对于数据操作,HBase支持四类主要的数据操作,分别是:
- Put :增加一行,修改一行
- Delete :删除一行,删除指定列族,删除指定column的多个版本,删除指定column的制定版本等
- Get :获取指定行的所有信息,获取指定行和指定列族的所有colunm,获取指定column,获取指定column的几个版本, 获取指定column的指定版本等
- Scan :获取所有行,获取指定行键范围的行,获取从某行开始的几行,获取满足过滤条件的行等
1. 命名空间NameSpace
在关系数据库系统中,命名空间NameSpace指的是一个表的逻辑分组 ,同一分组中的各个表有类似的用途。命名空间的概念为即将到来的多租户特性打下基础:
配额管理(Quota Management (HBASE-8410)):限制一个NameSpace可以使用的资源,资源包括region和table等
命名空间安全管理(Namespace Security Administration (HBASE-9206)):提供了另一个层面的多租户安全管理
Region服务器组(Region server groups (HBASE-6721)):一个命名空间或一张表,可以被固定到一组 RegionServers上,从而保证了数据隔离性。
1.1.命名空间管理
命名空间可以被创建、移除、修改。表和命名空间的隶属关系 在在创建表时决定,通过以下格式指定:<namespace>:<table>
Example:hbase shell中相关命令:
//Create a namespace create_namespace 'my_ns' //create my_table in my_ns namespace create ' my_ns:my_table', 'fam' //drop namespace drop_namespace 'my_ns' //alter namespace alter_namespace 'my_ns', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'}
1.2. 预定义的命名空间
有两个系统内置的预定义命名空间:
hbase:系统命名空间,用于包含hbase的内部表
default:所有未指定命名空间的表都自动进入该命名空间(默认的)
Example:指定命名空间和默认命名空间
//namespace=foo and table qualifier=bar create 'foo:bar', 'fam' //namespace=default and table qualifier=bar create 'bar', 'fam'
2.创建表
private static void createTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException { /** * create()方法有两个静态方法,无参和有参数 * 无参数:hadoop的core-site.xml配置的参数会被后面的覆盖 * 有参数:hadoop的core-site.xml配置的参数无法修改 */ Configuration conf = HBaseConfiguration.create();
// the location of hbase on hdfs conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); // the node of zookeeper conf.set("hbase.zookeeper.quorum", "ncst");
HBaseAdmin hba = new HBaseAdmin(conf);
// create namespace named 'ns' hba.createNamespace(NamespaceDescriptor.create("ns").build());
// create table named 'ns:t1' TableName tname = TableName.valueOf("ns:t1");
// Analyzing table exists if(!hba.tableExists(tname)){ HTableDescriptor htd = new HTableDescriptor(tname);
// add column 'f1' to table 't1' HColumnDescriptor hcd = new HColumnDescriptor("f1"); htd.addFamily(hcd);
//create table now hba.createTable(htd); } hba.close(); }
注意:
1). 必须将HBase集群的hbase-site.xml文件添加进工程的classpath中,否则 Configuration conf = HBaseConfiguration. create()代码获取不到需要的集群相关信息,也就无法找到集群,运行程序时会报错。即conf.set(“hbase.rootdir”, “hdfs://ncst:9000/hbase”); 和 conf.set(“hbase.zookeeper.quorum”, “ncst”); 不可缺少!
2). 代码HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(“ns:t1”));是描述表t1,并将t1添加到ns命名空间中,前提是该命名空间已存在,如果不存在则会报错NamespaceNotFoundException
3). 命名空间一般在建模阶段通过命令行创建,在java代码中通过hba.createNamespace(NamespaceDescriptor.create(“ns”).build());创建的机会不多
4). 创建 HBaseAdmin 对象时就已经建立了客户端程序与HBase集群的connection,所以在程序执行完成后,务必通过 hba.close(); 关闭connection
5). 可以通过 HTableDescriptor 对象设置Table的相关特性 ,比如:
//日志flush的时候是同步写,还是异步写 htd.setDurability(Durability.SYNC_WAL);
//region size大小,当一个region中的最大store文件达到这个size时,region就开始分裂 htd.setMaxFileSize(1024*1024*1024);
//MemStore大小,当memstore达到这个值时,开始往磁盘中刷数据 htd.setMemStoreFlushSize(256*1024*1024);
6). 由于HBase的数据是先写入内存,数据累计达到内存阀值时才往磁盘中flush数据,所以,如果在数据还没有flush进硬盘时,RegionServer down掉了,内存中的数据将丢失。想解决这个场景的问题就需要用到WAL(Write-Ahead-Log),htd.setDurability(Durability.SYNC_WAL); 就是设置写WAL日志的级别,示例中设置的是同步写WAL,该方式安全性较高,但无疑会一定程度影响性能,请根据具体场景选择使用
同步交互:指发送一个请求,需要等待返回,然后才能够发送下一个请求,有个等待过程
异步交互:指发送一个请求,不需要等待返回,随时可以再发送下一个请求,即不需要等待
两者区别:一个需要等待,一个不需要等待,在部分情况下,我们的项目开发中都会优先选择不需要等待的异步交互方式。
7). setDurability(Durability d)方法可以在相关的三个对象中使用,分别是:HTableDescriptor,Delete,Put。其中Delete和Put的该方法都是继承自父类org.apache.hadoop.hbase.client.Mutation。分别针对表、插入操作、删除操作设定WAL日志写入级别。需要注意的是,Delete和Put并不会继承Table的Durability级别(已实测验证)。Durability是一个枚举变量,如果不通过该方法指定WAL日志级别,则为默认USE_DEFAULT级别
8). 可以通过 HColumnDescriptor 对象设置ColumnFamily的特性 ,比如:
//压缩内存中和存储文件中的数据,默认NONE(不压缩) //PREFIX-TREE算法以后详述 hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX); //bloom过滤器:NONE,ROW(默认值)和ROWCOL.ROWCOL除了过滤ROW还要过滤列族 hcd.setBloomFilterType(BloomType.ROW); //集群间复制的时候,如果被设置成REPLICATION_SCOPE_LOCAL(默认值)就不能被复制了 hcd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); //数据保存的最大版本数.默认是Long.MAX hcd.setMaxVersions(3); //数据保存的最小版本数.默认是1.配合TTL使用 hcd.setMinVersions(1); //数据保存的最长时间,即TTL,单位是ms hcd.setTimeToLive(18000); //设定数据存储的压缩类型.默认无压缩(NONE) hcd.setCompressionType(Algorithm.SNAPPY); //是否保存那些已经删除掉的cell hcd.setKeepDeletedCells(false); //设置数据保存在内存中以提高响应速度 hcd.setInMemory(true); //块缓存,保存着每个HFile数据块的startKey hcd.setBlockCacheEnabled(true); //块的大小,默认值是65536 hcd.setBlocksize(64*1024);
2.1.列出所有的表
//listTableNames例子 private static void listTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HBaseAdmin hba = new HBaseAdmin(conf);
TableName[] listTableNames = hba.listTableNames(); for (TableName tableName : listTableNames) { System.out.println(tableName.toString()); } hba.close(); }
3.删除表
说明:删除表前必须先disable表
private static void dropTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HBaseAdmin hba = new HBaseAdmin(conf);
TableName tableName = TableName.valueOf("t1"); if(hba.tableExists(tableName)){
//Must disable table at first hba.disableTable(tableName);
//delete table hba.deleteTable(tableName); } hba.close(); }
4.修改表
4.1.删除列族 and 新增列族
通过 HTableDescriptor htd = hba.getTableDescriptor(tablename.getBytes()); 取得目标表的描述对象,通过 hba.modifyTable(tablename, htd); 将修改后的描述对象应用到目标表
private static void modifyCF() throws MasterNotRunningException, ZooKeeperConnectionException, IOException, TableNotFoundException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HBaseAdmin hba = new HBaseAdmin(conf);
String tablename = "nc:t1"; if(hba.tableExists(tablename)){ //disable table hba.disableTable(tablename); HTableDescriptor htd = hba.getTableDescriptor(tablename.getBytes()); //delete column family named 'f1' htd.removeFamily("f1".getBytes()); //add new column family HColumnDescriptor hcd = new HColumnDescriptor("info"); hcd.setMaxVersions(3); hcd.setInMemory(true); htd.addFamily(hcd); //modify target table hba.modifyTable(tablename, htd); //enable table hba.enableTable(tablename); } hba.close(); }
4.2.修改现有列族的属性(setMaxVersions)
private static void ModifyCFAttribute() throws MasterNotRunningException, ZooKeeperConnectionException, IOException, TableNotFoundException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HBaseAdmin hba = new HBaseAdmin(conf);
String tablename = "nc:t1"; if(hba.tableExists(tablename)){ hba.disableTable(tablename); HTableDescriptor htd = hba.getTableDescriptor(tablename.getBytes()); //get target column family HColumnDescriptor family = htd.getFamily("info".getBytes()); family.setMaxVersions(5); //modify table hba.modifyTable(tablename, htd); } hba.close(); }
5.插入数据Put
5.1.常用构造函数:
指定行键:public Put(byte[] row)
指定行键和时间戳:public Put(byte[] row, long ts)
从目标字符串中提取子串,作为行键:Put(byte[] rowArray, int rowOffset, int rowLength)
从目标字符串中提取子串,作为行键,并加上时间戳:Put(byte[] rowArray, int rowOffset, int rowLength, long ts)
5.2.Put常用方法:
指定列族和限定符,添加值:add(byte[] family, byte[] qualifier, byte[] value)
指定列族,限定符和时间戳,添加值:add(byte[] family, byte[] qualifier, long ts, byte[] value)
设置写WAL(Write-Ahead-Log)的级别:public void setDurability(Durability d)
参数是一个枚举值,可以有以下几种选择:
- ASYNC_WAL : 当数据变动时,异步写WAL日志
- SYNC_WAL : 当数据变动时,同步写WAL日志
- FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘
- SKIP_WAL : 不写WAL日志
- USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即 SYNC_WAL
private static void putOneRecord() throws IOException, InterruptedIOException, RetriesExhaustedWithDetailsException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HTable ht = new HTable(conf, "nc:t1"); //RowKey Put put = new Put("xiaoming".getBytes()); //column family, column, value put.add("info".getBytes(), "age".getBytes(), "18".getBytes()); put.setDurability(Durability.SYNC_WAL); //put a record ht.put(put); ht.close(); }
注意:
1). Put的构造函数都需要指定行键,如果是全新的行键,则新增一行;如果是已有的行键,则更新现有行
2). 创建Put对象及put.add(***)过程都是在构建一行的数据:创建Put对象时相当于创建了行对象,add的过程就是往目标行里添加cell,直到table.put(***)才将数据插入表格
3). Put的其他构造函数 Put put = new Put(“100001_100002”.getBytes(),7,6); 第二个参数是偏移量,也就是从第一个参数的第几个字符开始截取RowKey,第三个参数是截取长度。这个代码实际是从 100001_100002 中截取了100002子串作为目标行的RowKey。
6.删除数据Delete
Delete类用于删除表中的一行数据,通过HTable.delete来执行该动作。
在执行Delete操作时,HBase并不会立即删除数据,而是对需要删除的数据打上一个“墓碑”标记,直到当Storefile合并时,再清除这些被标记上“墓碑”的数据。
如果希望删除整行,用行键来初始化一个Delete对象即可。如果希望进一步定义删除的具体内容,可以使用以下这些Delete对象的方法:
- 为了删除指定的列族,可以使用 deleteFamily
- 为了删除指定列的多个版本,可以使用 deleteColumns
- 为了删除指定列的指定版本 ,可以使用 deleteColumn,这样的话就只会删除版本号与指定版本相同的列。如果不指定时间戳,默认只删除最新的版本
6.1.构造函数
1). 指定要删除的行键:Delete(byte[] row)
删除行键指定行的数据。如果没有进一步的操作,使用该构造函数将删除行键指定的行中所有列族中所有列的所有版本 !
2). 指定要删除的行键和时间戳:Delete(byte[] row, long timestamp)
删除行键和时间戳共同确定行的数据。如果没有进一步的操作,使用该构造函数将删除行键指定的行中,所有列族中所有列的时间戳【小于等于】指定时间戳的数据版本 !
注意 :该时间戳仅仅和删除行有关,如果需要进一步指定列族或者列,你必须分别为它们指定时间戳。
3). 给定一个字符串,目标行键的偏移,截取的长度:Delete(byte[] rowArray, int rowOffset, int rowLength)
4).给定一个字符串,目标行键的偏移,截取的长度,时间戳:Delete(byte[] rowArray, int rowOffset, int rowLength, long ts)
6.2.常用方法
1).删除指定列的 最新版本 的数据:Delete deleteColumn (byte[] family, byte[] qualifier)
2).删除指定列的 指定版本 的数据:Delete deleteColumn (byte[] family, byte[] qualifier, long timestamp )
3).删除指定列的 所有版本 的数据:Delete deleteColumns (byte[] family, byte[] qualifier)
4).删除指定列的,时间戳 小于等于 给定时间戳的 所有版本 的数据:Delete deleteColumns (byte[] family, byte[] qualifier, long timestamp )
5).删除指定列族的所有列的 所有版本 数据:Delete deleteFamily (byte[] family)
6).删除指定列族的所有列中 时间戳 小于等于 指定时间戳 的所有数据:Delete deleteFamily (byte[] family, long timestamp)
7).删除指定列族中 所有列的时间戳 等于 指定时间戳的版本数据:Delete deleteFamilyVersion (byte[] family, long timestamp)
8).为Delete对象设置时间戳:void setTimestamp (long timestamp)
private static void deleteColumn() throws IOException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst"); HTable ht = new HTable(conf, "nc:t1"); //RowKey Delete delete = new Delete("xiaoming".getBytes()); //Column delete.deleteColumn("info".getBytes(), "age".getBytes()); ht.delete(delete); ht.close(); }
7.获取单行Get
如果希望获取整行数据,用行键初始化一个Get对象就可以,如果希望进一步缩小获取的数据范围,可以使用Get对象的的方法.
7.1.构造函数
Get的构造函数很简单,只有一个构造函数: Get(byte[] row) 参数是行键。
7.2.Get对象常用方法
1). Get addFamily(byte[] family) 指定希望获取的列族
2). Get addColumn(byte[] family, byte[] qualifier) 指定希望获取的列
3). Get setTimeRange(long minStamp, long maxStamp) 设置获取数据的 时间戳范围
4). Get setTimeStamp(long timestamp) 设置获取数据的时间戳
5). Get setMaxVersions(int maxVersions) 设定获取数据的版本数
6). Get setMaxVersions() 设定获取数据的所有版本
7). Get setFilter(Filter filter) 为Get对象添加过滤器,过滤器详解请参见:HBase API Filter过滤器
8). void setCacheBlocks(boolean cacheBlocks) 设置该Get获取的数据是否缓存在内存中
//获取指定RowKey,指定列 的最新版本数据 private static void getColumn() throws IOException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HTable ht = new HTable(conf, "users"); Get get = new Get("xiaoming".getBytes()); // Column Family and Qualifier get.addColumn("address".getBytes(), "city".getBytes()); Result result = ht.get(get); for(Cell cell : result.rawCells()){ System.out.println(new String(CellUtil.cloneRow(cell))+"\t" +new String(CellUtil.cloneFamily(cell))+"\t" +new String(CellUtil.cloneQualifier(cell))+"\t" +new String(CellUtil.cloneValue(cell))+"\t" +cell.getTimestamp()); } ht.close(); }
//获取指定RowKey,指定 时间戳 的数据 private static void getTimeStamp() throws IOException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst"); HTable ht = new HTable(conf, "users"); Get get = new Get("xiaoming".getBytes()); // TimeStamp get.setTimeStamp(1441997498939L); Result result = ht.get(get); for(Cell cell : result.rawCells()){ System.out.println(new String(CellUtil.cloneRow(cell))+"\t" +new String(CellUtil.cloneFamily(cell))+"\t" +new String(CellUtil.cloneQualifier(cell))+"\t" +new String(CellUtil.cloneValue(cell))+"\t" +cell.getTimestamp()); } ht.close(); }
8.获取多行Scan
Scan对象可以返回满足给定条件的多行数据。如果希望获取所有的行,直接初始化一个Scan对象即可。如果希望限制扫描的行范围,可以使用Scan对象的方法
8.1.Scan构造函数
1). 创建扫描所有行的Scan:Scan()
2). 创建Scan,从指定行开始扫描:Scan(byte[] startRow)
注意:如果指定行不存在,从下一个最近的行开始
3). 创建Scan,指定起止行:Scan(byte[] startRow, byte[] stopRow)
注意: startRow <= 结果集 < stopRow
4). 创建Scan,指定起始行和过滤器:Scan(byte[] startRow, Filter filter)
注意:过滤器的功能和构造参见 http://blog.csdn.net/u010967382/article/details/37653177
8.2.Scan对象常用方法
- Scan setStartRow (byte[] startRow) 设置Scan的开始行,默认 结果集 包含该行。如果希望结果集不包含该行,可以在行键末尾加上0。
- Scan setStopRow (byte[] stopRow) 设置Scan的结束行,默认 结果集 不包含该行。如果希望结果集包含该行,可以在行键末尾加上0。
- Scan setBatch(int batch) 指定最多返回的Cell数目.用于防止一行中有过多的数据,导致OutofMemory错误
- Scan setTimeRange (long minStamp, long maxStamp) 扫描指定 时间范围 的数据
- Scan setTimeStamp (long timestamp) 扫描 指定时间 的数据
- Scan addColumn (byte[] family, byte[] qualifier) 指定扫描的列
- Scan addFamily (byte[] family) 指定扫描的列族
- Scan setFilter (Filter filter) 为Scan设置过滤器,详见HBase API Filter过滤器
- Scan setReversed (boolean reversed) 设置Scan的扫描顺序,默认是正向扫描(false),可以设置为逆向扫描(true)。注意:该方法0.98版本以后才可用!!
- Scan setMaxVersions () 获取所有版本的数据
- Scan setMaxVersions (int maxVersions) 设置获取的最大版本数! 不调用上下两个setMaxVersions() 方法,只返回最新版本数据
- void setCaching (int caching) 设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存
- void setRaw (boolean raw) 激活或者禁用raw模式。如果raw模式被激活,Scan将返回 所有已经被打上删除标记但尚未被真正删除 的数据。该功能仅用于激活了 KEEP_DELETED_ROWS的列族,即列族开启了 hcd.setKeepDeletedCells(true)
- Scan激活raw模式后,只能浏览所有的列,而不能指定任意的列,否则会报错
//扫描表中的 所有行 的最新版本数据 private static void scanAll() throws IOException, UnsupportedEncodingException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HTable ht = new HTable(conf, "users"); // Scan All Scan scan = new Scan(); ResultScanner rs = ht.getScanner(scan); for(Result result : rs){ for(Cell cell : result.rawCells()){ System.out.println(new String(CellUtil.cloneRow(cell))+"\t" +new String(CellUtil.cloneFamily(cell))+"\t" +new String(CellUtil.cloneQualifier(cell))+"\t" +new String(CellUtil.cloneValue(cell),"UTF-8")+"\t" +cell.getTimestamp()); } }
ht.close(); }
//扫描指定RowKey范围,通过末尾加0,使得结果集包含StopRow
private static void scanRange() throws IOException, UnsupportedEncodingException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HTable ht = new HTable(conf, "users"); // Range Scan scan = new Scan("xiaoming".getBytes(),"xiaoming030".getBytes()); ResultScanner rs = ht.getScanner(scan); for(Result result : rs){ for(Cell cell : result.rawCells()){ System.out.println(new String(CellUtil.cloneRow(cell))+"\t" +new String(CellUtil.cloneFamily(cell))+"\t" +new String(CellUtil.cloneQualifier(cell))+"\t" +new String(CellUtil.cloneValue(cell),"UTF-8")+"\t" +cell.getTimestamp()); } } ht.close(); }
//返回 所有已经被打上删除标记但尚未被真正删除 的数据 private static void scanRaw() throws IOException, UnsupportedEncodingException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HTable ht = new HTable(conf, "users"); Scan scan = new Scan();
//开启raw模式 scan.setRaw(true);
//默认值long.Max scan.setMaxVersions(); ResultScanner rs = ht.getScanner(scan); for(Result result : rs){ for(Cell cell : result.rawCells()){ System.out.println(new String(CellUtil.cloneRow(cell))+"\t" +new String(CellUtil.cloneFamily(cell))+"\t" +new String(CellUtil.cloneQualifier(cell))+"\t" +new String(CellUtil.cloneValue(cell),"UTF-8")+"\t" +cell.getTimestamp()); } } ht.close(); }
//结合过滤器,获取所有age在15到30之间的行 private static void scanFilter() throws IOException, UnsupportedEncodingException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://ncst:9000/hbase"); conf.set("hbase.zookeeper.quorum", "ncst");
HTable ht = new HTable(conf, "users"); // And FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); // >= SingleColumnValueFilter filter1 = new SingleColumnValueFilter("info".getBytes(), "age".getBytes(), CompareOp.GREATER_OR_EQUAL, "15".getBytes()); // =< SingleColumnValueFilter filter2 = new SingleColumnValueFilter("info".getBytes(), "age".getBytes(), CompareOp.LESS_OR_EQUAL, "30".getBytes()); filterList.addFilter(filter1); filterList.addFilter(filter2); Scan scan = new Scan(); // set Filter scan.setFilter(filterList); ResultScanner rs = ht.getScanner(scan); for(Result result : rs){ for(Cell cell : result.rawCells()){ System.out.println(new String(CellUtil.cloneRow(cell))+"\t" +new String(CellUtil.cloneFamily(cell))+"\t" +new String(CellUtil.cloneQualifier(cell))+"\t" +new String(CellUtil.cloneValue(cell),"UTF-8")+"\t" +cell.getTimestamp()); } } ht.close(); }