需求:绘制各渠道用户的每日趋势。每分钟一组数据,一天共 1440 组;渠道数 2000+,且区分新/老用户,因此每天约 2 × 1440 × 2000 ≈ 576 万+ 行数据。数据需保存 90 天(共约 5.2 亿+ 行)。
查询条件:渠道号、新/老用户、日期。
rowkey 设计:渠道号_日期_新/老用户标识_小时分钟(hhmm)。查询条件全部位于 rowkey 前缀中,因此某渠道某天某类用户的 1440 条数据在 rowkey 上是连续的,可按前缀范围扫描。
连接 HBase(通过 Thrift):
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport
from hbase import Hbase


def hbase_connect(host='*', port=9090):
    """Open a connection to the HBase Thrift server and return a client.

    Args:
        host: Thrift server host. The default '*' is the placeholder kept
            from the original snippet; callers should pass a real host/IP.
        port: Thrift server port (9090 by default, as in the original).

    Returns:
        A ``(transport, client)`` tuple. The transport is already open;
        the caller is responsible for calling ``transport.close()``.
    """
    sock = TSocket.TSocket(host, port)
    hbase_transport = TTransport.TBufferedTransport(sock)
    # Bind the protocol to the *buffered* transport. The original bound it to
    # the raw socket, so the buffered transport opened/returned here was
    # never actually used for I/O and every small write hit the socket.
    protocol = TBinaryProtocol.TBinaryProtocol(hbase_transport)
    hbase_client = Hbase.Client(protocol)
    hbase_transport.open()
    return hbase_transport, hbase_client
创建表:
def create_hbase_table(table_name='client_rt_pv', column_family='default'):
    """Create the HBase table holding per-minute channel PV/UV data.

    Prints the server's table list before and after the ``createTable``
    call. Any error raised by ``createTable`` (for example, if the table
    already exists) propagates to the caller; the connection is closed
    either way.

    Args:
        table_name: name of the table to create (default 'client_rt_pv').
        column_family: the single column family to create (default
            'default'; the insert code writes columns as 'default:<name>').
    """
    transport, client = hbase_connect()
    try:
        print(client.getTableNames())
        client.createTable(table_name,
                           [Hbase.ColumnDescriptor(name=column_family)])
        print(client.getTableNames())
    finally:
        # The original never closed the connection it opened.
        transport.close()
插入数据:
def build_rt_pv_batch(pub, day_str, user_type, hhmm, pv, uv, pub_names):
    """Build one HBase ``BatchMutation`` (one row) of per-minute channel stats.

    Row key layout: ``<pub>_<day_str>_<user_type>_<hhmm>``.

    Args:
        pub: channel id (string; it is joined into the row key).
        day_str: date string used in the row key.
        user_type: new/old-user flag segment of the row key (the snippet
            below passes 'ac'; its exact meaning is not shown here).
        hhmm: hour-minute string; used in the row key and stored in
            ``default:tm``.
        pv: page-view count for the minute.
        uv: unique-visitor count for the minute.
        pub_names: mapping of channel id -> display name; ids missing from
            it are stored as 'unknown'.

    Returns:
        An ``Hbase.BatchMutation`` suitable for ``mutateRows``.
    """
    # pv/uv ratio with two decimals; '0.00' when uv is 0 (avoids ZeroDivisionError).
    ratio = pv / float(uv) if uv != 0 else 0
    mutations = [
        Hbase.Mutation(column="default:pv", value=str(pv)),
        Hbase.Mutation(column="default:uv", value=str(uv)),
        Hbase.Mutation(column="default:pvdivuv", value='%.2f' % ratio),
        Hbase.Mutation(column="default:tm", value=str(hhmm)),
        # NOTE(review): a trailing space is appended to the channel id; the
        # reason is not visible in this snippet -- kept as in the original.
        Hbase.Mutation(column="default:pub", value=str(pub + ' ')),
        Hbase.Mutation(column="default:pubname",
                       value=pub_names.get(pub, 'unknown')),
    ]
    rowkey = '_'.join([pub, day_str, user_type, hhmm])
    return Hbase.BatchMutation(row=rowkey, mutations=mutations)


mutationsbatch = []
# ---- begin loop (loop header elided in the original snippet; it binds
# tmp_pub, tmp_ct, tmp_pv, tmp_uv for each channel/minute record) ----
mutationsbatch.append(
    build_rt_pv_batch(tmp_pub, dayStr, 'ac', tmp_ct, tmp_pv, tmp_uv, pub_id))
# ---- end loop ----
try:
    hbase_client.mutateRows("client_rt_pv", mutationsbatch, None)
finally:
    # The original closed an undefined `hive_transport` (NameError); the
    # transport returned by hbase_connect() is the one to close.
    hbase_transport.close()