POM配置
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spark.version>1.6.0</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<hbase.version>1.2.0-cdh5.8.2</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
</dependencies>
代码示例
引入包
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ ConnectionFactory, Admin, Put, Delete, Get, Table, ResultScanner, Scan, Result }
import org.apache.hadoop.hbase.{ TableName, HTableDescriptor, HColumnDescriptor }
import org.apache.hadoop.hbase.util.Bytes
初始准备
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "sparkmaster1");
val conn = ConnectionFactory.createConnection(conf)
val admin = conn.getAdmin
val wetagTable = TableName.valueOf("wetag")
删除表
if (admin.tableExists(wetagTable)) {
println("drop table `wetag` ")
admin.disableTable(wetagTable)
admin.deleteTable(wetagTable)
println("done")
}
创建表
val tableDescr = new HTableDescriptor(wetagTable)
tableDescr.addFamily(new HColumnDescriptor("attr".getBytes))
tableDescr.addFamily(new HColumnDescriptor("bhvr".getBytes))
if (!admin.tableExists(wetagTable)) {
println("Creating table `wetag` ")
admin.createTable(tableDescr)
println("Table `wetag` created ")
}
插入(更新)记录
val key="12345678"
val family="attr"
val column="population.age"
val value="30"
val table=conn.getTable(wetagTable)
val p=new Put(key.getBytes)
p.addColumn(family.getBytes,column.getBytes,value.getBytes())
table.put(p)
val family="attr"
val column="population.sex"
val value="male"
val table=conn.getTable(wetagTable)
val p=new Put(key.getBytes)
p.addColumn(family.getBytes,column.getBytes,value.getBytes())
table.put(p)
val family="attr"
val column="population.age"
val value="45"
val table=conn.getTable(wetagTable)
val p=new Put(key.getBytes)
p.addColumn(family.getBytes,column.getBytes,value.getBytes())
table.put(p)
删除记录
val key="12345678"
val family="attr"
val column="population.age"
val value="30"
val table=conn.getTable(wetagTable)
val d = new Delete(key.getBytes)
d.addColumn(family.getBytes,column.getBytes)
table.delete(d)
查询某条记录
val key="12345678"
val family="attr"
val column="population.sex"
val table=conn.getTable(wetagTable)
val g = new Get(key.getBytes)
val result = table.get(g)
val value = Bytes.toString(result.getValue(family.getBytes, column.getBytes))
println("GET: " + key + " " + value)
扫描记录
var table:Table=null
var scanner:ResultScanner=null
val family="attr"
val column="population.sex"
try{
table=conn.getTable(wetagTable)
val s=new Scan()
s.addColumn(family.getBytes(),column.getBytes())
scanner=table.getScanner(s)
println("scan...for...")
var result:Result=scanner.next()
while(result!=null) {
println("Found row:" + result)
println("Found value: "+Bytes.toString(result.getValue(family.getBytes(),column.getBytes())))
result=scanner.next()
}
} finally {
if(table!=null)
table.close()
scanner.close()
}
删除列
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
val conn = ConnectionFactory.createConnection(conf)
var table:Table=null
var scanner:ResultScanner=null
val family="info"
val column="uuid"
val wetagTable = TableName.valueOf("tb")
try{
table=conn.getTable(wetagTable)
val s=new Scan()
s.addColumn(family.getBytes(),column.getBytes())
scanner=table.getScanner(s)
println("scan...for...")
var result:Result=scanner.next
while(result!=null) {
val rowkey = result.getRow
println("Found rowkey: " + Bytes.toString(rowkey) + " " + family +":"+column +": " + Bytes.toString(result.getValue(family.getBytes, column.getBytes)))
val delCell = new Delete(rowkey)
delCell.addColumns(family.getBytes, column.getBytes)
table.delete(delCell)
result=scanner.next
}
} finally {
if(table!=null)
table.close()
scanner.close()
}