1.工具类
1 package com.lixin.stuty.hbase; 2 3 import java.io.IOException; 4 5 import org.apache.commons.configuration.ConfigurationUtils; 6 import org.apache.commons.lang.StringUtils; 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.hbase.Cell; 9 import org.apache.hadoop.hbase.HBaseConfiguration; 10 import org.apache.hadoop.hbase.HColumnDescriptor; 11 import org.apache.hadoop.hbase.HTableDescriptor; 12 import org.apache.hadoop.hbase.TableName; 13 import org.apache.hadoop.hbase.client.Admin; 14 import org.apache.hadoop.hbase.client.Connection; 15 import org.apache.hadoop.hbase.client.ConnectionFactory; 16 import org.apache.hadoop.hbase.client.Get; 17 import org.apache.hadoop.hbase.client.Put; 18 import org.apache.hadoop.hbase.client.Result; 19 import org.apache.hadoop.hbase.client.ResultScanner; 20 import org.apache.hadoop.hbase.client.Scan; 21 import org.apache.hadoop.hbase.client.Table; 22 /** 23 * hbase for version 1.1.1 24 * @author Administrator 25 * 26 */ 27 public class HBaseUtil { 28 public static final String ZK_QUORUM = "hbase.zookeeper.quorum"; 29 public static final String ZK_CLIENTPORT = "hbase.zookeeper.property.clientPort"; 30 private Configuration conf = HBaseConfiguration.create(); 31 private Connection connection ; 32 private Admin admin; 33 34 public HBaseUtil(String zk_quorum) { 35 conf.set(ZK_QUORUM, zk_quorum); 36 init(); 37 } 38 39 public HBaseUtil(String zk_quorum,String zk_clientPort) { 40 conf.set(ZK_QUORUM, zk_quorum); 41 conf.set(ZK_CLIENTPORT, zk_clientPort); 42 init(); 43 } 44 45 private void init(){ 46 try { 47 //Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口 48 connection = ConnectionFactory.createConnection(conf); 49 admin = connection.getAdmin(); 50 } catch (IOException e) { 51 e.printStackTrace(); 52 } 53 } 54 public void close(){ 55 try { 56 if(admin != null) admin.close(); 57 if(connection!=null) connection.close(); 58 } catch (IOException e) { 59 e.printStackTrace(); 60 } 61 } 62 /** 63 * 创建一个表 64 * @param table_name 表名称 65 * @param family_names 列族名称集合 66 * @throws IOException 67 */ 68 public void create(String table_name,String... family_names) throws IOException{ 69 //获取TableName 70 TableName tableName = TableName.valueOf(table_name); 71 //table 描述 72 HTableDescriptor htabledes = new HTableDescriptor(tableName); 73 for(String family_name : family_names){ 74 //column 描述 75 HColumnDescriptor family = new HColumnDescriptor(family_name); 76 htabledes.addFamily(family); 77 } 78 admin.createTable(htabledes); 79 } 80 /** 81 * 增加一条记录 82 * @param table_name 表名称 83 * @param row rowkey 84 * @param family 列族名称 85 * @param qualifier 列族限定符(可以为null) 86 * @param value 值 87 * @throws IOException 88 */ 89 public void addColumn(String table_name,String row, String family,String qualifier,String value) throws IOException{ 90 //表名对象 91 TableName tableName = TableName.valueOf(table_name); 92 //表对象 93 Table table = connection.getTable(tableName); 94 // put对象 负责录入数据 95 Put put = new Put(row.getBytes()); 96 put.addColumn(family.getBytes(), qualifier.getBytes(), value.getBytes()); 97 table.put(put); 98 } 99 /** 100 * 判断表是否存在 101 */ 102 public boolean tableExist(String table_name) throws IOException{ 103 return admin.tableExists(TableName.valueOf(table_name)); 104 } 105 /**删除表*/ 106 public void deleteTable(String table_name) throws IOException{ 107 TableName tableName = TableName.valueOf(table_name); 108 if(admin.tableExists(tableName)){ 109 admin.disableTable(tableName); 110 admin.deleteTable(tableName); 111 } 112 } 113 /** 114 * 查询单个row的记录 115 * @param table_name 表明 116 * @param row 行键 117 * @param family 列族 118 * @param qualifier 列族成员 119 * @return 120 * @throws IOException 121 */ 122 public Cell[] getRow(String table_name,String row,String family,String qualifier) throws IOException{ 123 Cell[] cells = null; 124 //check 125 if(StringUtils.isEmpty(table_name)||StringUtils.isEmpty(row)){ 126 return null; 127 } 128 //Table 129 Table table = connection.getTable(TableName.valueOf(table_name)); 130 Get get = new Get(row.getBytes()); 131 //判断在查询记录时,是否限定列族和子列(qualifier). 132 if(StringUtils.isNotEmpty(family)&&StringUtils.isNotEmpty(qualifier)){ 133 get.addColumn(family.getBytes(), qualifier.getBytes()); 134 } 135 if(StringUtils.isNotEmpty(family)&&StringUtils.isEmpty(qualifier)){ 136 get.addFamily(family.getBytes()); 137 } 138 Result result = table.get(get); 139 cells = result.rawCells(); 140 return cells; 141 } 142 /** 143 * 获取表中的所有记录,可以指定列族,列族成员,开始行键,结束行键. 144 * @param table_name 145 * @param family 146 * @param qualifier 147 * @param startRow 148 * @param stopRow 149 * @return 150 * @throws IOException 151 */ 152 public ResultScanner getScan(String table_name,String family,String qualifier,String startRow,String stopRow) throws IOException{ 153 ResultScanner resultScanner = null; 154 155 //Table 156 Table table = connection.getTable(TableName.valueOf(table_name)); 157 Scan scan = new Scan(); 158 if(StringUtils.isNotBlank(family)&& StringUtils.isNotEmpty(qualifier)){ 159 scan.addColumn(family.getBytes(), qualifier.getBytes()); 160 } 161 if(StringUtils.isNotEmpty(family)&& StringUtils.isEmpty(qualifier)){ 162 scan.addFamily(family.getBytes()); 163 } 164 if(StringUtils.isNotEmpty(startRow)){ 165 scan.setStartRow(startRow.getBytes()); 166 } 167 if(StringUtils.isNotEmpty(stopRow)){ 168 scan.setStopRow(stopRow.getBytes()); 169 } 170 resultScanner = table.getScanner(scan); 171 172 return resultScanner; 173 } 174 }
2.测试:
1 package com.lixin.stuty.hbase; 2 3 import static org.junit.Assert.*; 4 5 import java.io.IOException; 6 import java.util.Iterator; 7 8 import org.apache.hadoop.hbase.Cell; 9 import org.apache.hadoop.hbase.CellUtil; 10 import org.apache.hadoop.hbase.client.Result; 11 import org.apache.hadoop.hbase.client.ResultScanner; 12 import org.apache.hadoop.hbase.util.Bytes; 13 import org.junit.Before; 14 import org.junit.Test; 15 16 public class HBaseUtilTest { 17 private HBaseUtil hu = null; 18 @Before 19 public void init(){ 20 String zk_quorum = "172.21.135.148"; 21 String zk_clientPort = "2181"; 22 hu = new HBaseUtil(zk_quorum, zk_clientPort); 23 } 24 @Test 25 public void testCreate() throws IOException { 26 String table_name = "users"; 27 String[] fanily_names = new String[]{"user_id","address","info"}; 28 hu.create(table_name, fanily_names); 29 hu.close(); 30 } 31 @Test 32 public void testIsExist() throws IOException{ 33 String table_name = "sitech"; 34 System.out.println(hu.tableExist(table_name)); 35 hu.close(); 36 } 37 @Test 38 public void testDelete() throws IOException{ 39 String table_name = "person1"; 40 hu.deleteTable(table_name); 41 hu.close(); 42 } 43 @Test 44 public void testGetRow() throws IOException{ 45 String table_name = "users"; 46 String row = "xiaoming"; 47 String family = "address"; 48 String qualifier = ""; 49 Cell[] cells = hu.getRow(table_name, row, family, qualifier); 50 for(Cell cell : cells){ 51 String recode_row = Bytes.toString(CellUtil.cloneRow(cell)); 52 String family1 = Bytes.toString(CellUtil.cloneFamily(cell)); 53 String qualifier1 = Bytes.toString(CellUtil.cloneQualifier(cell)); 54 String value = Bytes.toString(CellUtil.cloneValue(cell)); 55 System.out.println(recode_row+"\t"+family1+"\t"+qualifier1+"\t"+value); 56 } 57 hu.close(); 58 } 59 @Test 60 public void testGetScanner() throws IOException{ 61 String table_name = "users"; 62 String family = "address"; 63 String qualifier = "city"; 64 String startRow = "xiaoming"; 65 String stopRow = "xiaoming"; 66 67 ResultScanner resultScanner = hu.getScan(table_name, family, qualifier, startRow, stopRow); 68 Iterator<Result> iterator = resultScanner.iterator(); 69 while(iterator.hasNext()){ 70 Result result = iterator.next(); 71 Cell[] rawCells = result.rawCells(); 72 for(Cell cell : rawCells){ 73 String recode_row = Bytes.toString(CellUtil.cloneRow(cell)); 74 String family1 = Bytes.toString(CellUtil.cloneFamily(cell)); 75 String qualifier1 = Bytes.toString(CellUtil.cloneQualifier(cell)); 76 String value = Bytes.toString(CellUtil.cloneValue(cell)); 77 System.out.println(recode_row+"\t"+family1+"\t"+qualifier1+"\t"+value); 78 } 79 } 80 hu.close(); 81 } 82 }