封装一下 pyhs2,让其使用起来和 torndb 一样简单好用。
提供 `db.query`、`db.get`、`db.execute` 三个方法，用法和 torndb 基本一样（注意：不支持 torndb 的 SQL 参数占位符，只接受完整的 SQL 字符串）。
`query` 和 `get` 的返回值数据类型也完全一样；唯一不同的是 `execute` 方法：torndb 中会返回 `lastrowid`，本文中返回 `None`。
完整代码:https://github.com/Shu-Ji/pyhs2-torndb
# coding: u8
import pyhs2
from pyhs2.error import Pyhs2Exception
class Row(dict):
    """Dictionary whose keys can also be read as attributes (``row.col``)."""

    def __getattr__(self, name):
        # Python only calls __getattr__ after normal lookup fails, so real
        # dict attributes (keys, items, ...) still win; otherwise fall back
        # to the stored column value.
        if name in self:
            return self[name]
        raise AttributeError(name)
class Connection(object):
    """torndb-style wrapper around a pyhs2 (HiveServer2) connection.

    Provides ``query``, ``get`` and ``execute``.  ``execute`` returns None.
    The instance can also be used as a context manager, which closes the
    connection on exit.
    """

    def __init__(self, db_host, user, database, port=10000,
                 authMechanism="PLAIN"):
        """Create a connection to HiveServer2 via ``pyhs2.connect``.

        :param db_host: host of the HiveServer2 server.
        :param user: user name passed to pyhs2.
        :param database: database passed to pyhs2.
        :param port: server port (default 10000).
        :param authMechanism: pyhs2 auth mechanism (default "PLAIN").
        """
        self.conn = pyhs2.connect(host=db_host,
                                  port=port,
                                  authMechanism=authMechanism,
                                  user=user,
                                  database=database,
                                  )

    def query(self, sql):
        """Run *sql* and return every result row as a list of Row objects.

        Each Row maps the ``columnName`` entries of the cursor schema to the
        values of that row.
        """
        with self.conn.cursor() as cursor:
            self._execute(cursor, sql)
            column_names = [i['columnName'] for i in cursor.getSchema()]
            return [Row(zip(column_names, row)) for row in cursor]

    def _execute(self, cursor, sql):
        """Execute *sql* on *cursor* and return the cursor's result.

        On Pyhs2Exception the whole connection is closed and the original
        exception is re-raised.
        """
        try:
            return cursor.execute(sql)
        except Pyhs2Exception:
            self.close()
            # Bare ``raise`` keeps the original traceback; ``raise e`` would
            # discard it on Python 2.
            raise

    def get(self, sql):
        """Returns the (singular) row returned by the given query.

        If the query has no results, returns None. If it has
        more than one result, raises an exception.
        """
        rows = self.query(sql)
        if not rows:
            return None
        elif len(rows) > 1:
            raise Exception("Multiple rows returned for get() query")
        else:
            return rows[0]

    def execute(self, sql):
        """Execute *sql* without fetching results; always returns None."""
        with self.conn.cursor() as cursor:
            self._execute(cursor, sql)

    def close(self):
        """Close the underlying connection; safe to call more than once.

        ``self.conn`` is cleared *before* closing.  An error-triggered close
        from ``_execute``, an explicit close and the one from ``__del__``
        therefore never close the same connection twice.  After this call
        the instance can no longer run queries.
        """
        # getattr default covers __init__ having failed before self.conn
        # was assigned.
        conn = getattr(self, 'conn', None)
        if conn is None:
            return
        self.conn = None
        conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None lets any in-flight exception propagate.
        self.close()

    def __del__(self):
        self.close()
def test_query(db):
    """Demo: print up to three (platform, dt) rows of mydb.pv_log for 20170827."""
    statement = ("\n"
                 "SELECT platform, dt\n"
                 "FROM mydb.pv_log\n"
                 "WHERE dt='20170827'\n"
                 "LIMIT 3\n")
    for record in db.query(statement):
        print(record)
def test_get(db):
    """Demo: fetch one (platform, dt) row of mydb.pv_log for 20170829.

    ``db.get()`` returns None when the query matches no rows and raises
    when it matches more than one; ``LIMIT 1`` rules out the latter.
    """
    sql = ("\n"
           "SELECT platform, dt\n"
           "FROM mydb.pv_log\n"
           "WHERE dt='20170829'\n"
           "LIMIT 1\n")
    row = db.get(sql)
    print(row)
    if row is None:
        # No data for that day; attribute/key access below would fail on None.
        return
    # row is a dict-like object: columns are readable as attributes or keys.
    print(row.dt)
    print(row['dt'])
def test_execute(db):
    """Demo placeholder for ``db.execute()``; intentionally runs nothing.

    Statements such as ``UPDATE`` or ``DELETE`` are DANGEROUS, so the
    example is kept here only as documentation::

        db.execute('''
        UPDATE xxx
        ''')
    """
def main():
    """Open a Hive connection with the demo settings and run each demo."""
    db = Connection(
        db_host='127.0.0.1',
        port=10000,
        user='myuser',
        database='mydb',
        authMechanism='PLAIN',
    )
    # Same order as before: query, then get, then execute.
    for demo in (test_query, test_get, test_execute):
        demo(db)
if __name__ == "__main__":
    # Run the demos only when executed as a script, not when imported.
    main()