Python 中使用 pyhs2 连接 HiveServer 2 查询数据

封装一下 pyhs2,让其使用起来和 torndb 一样简单好用。
提供 db.querydb.getdb.execute 三个方法,用法和 torndb 完全一样。
queryget 返回值数据类型也完全一样; 惟一不同的是 execute 方法,torndb 中会返回 lastrowid,本文中返回 None

完整代码:https://github.com/Shu-Ji/pyhs2-torndb

# coding: u8

import pyhs2
from pyhs2.error import Pyhs2Exception


class Row(dict):
    """A dict that allows for object-like property access syntax."""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


class Connection(object):
    def __init__(self, db_host, user, database, port=10000,
            authMechanism="PLAIN"):
        """
        create connection to hive server2
        """

        self.conn = pyhs2.connect(host=db_host,
                port=port,
                authMechanism=authMechanism,
                user=user,
                database=database,
                )

    def query(self, sql):
        """Returns a row list for the given query and parameters."""
        with self.conn.cursor() as cursor:
            self._execute(cursor, sql)
            column_names = [i['columnName'] for i in cursor.getSchema()]
            return [Row(zip(column_names, row)) for row in cursor]

    def _execute(self, cursor, sql):
        try:
            return cursor.execute(sql)
        except Pyhs2Exception as e:
            self.close()
            raise(e)

    def get(self, sql):
        """Returns the (singular) row returned by the given query.
        If the query has no results, returns None.  If it has
        more than one result, raises an exception.
        """

        rows = self.query(sql)
        if not rows:
            return None
        elif len(rows) > 1:
            raise Exception("Multiple rows returned for get() query")
        else:
            return rows[0]

    def execute(self, sql):
        """Executes the given query, returning None."""

        with self.conn.cursor() as cursor:
            self._execute(cursor, sql)

    def close(self):
        if hasattr(self, 'conn'):
            self.conn.close()

    def __del__(self):
        self.close()


def test_query(db):
    sql = """
    SELECT platform, dt
    FROM  mydb.pv_log
    WHERE dt='20170827'
    LIMIT 3
    """
    rows = db.query(sql)
    for row in rows:
        print(row)


def test_get(db):
    # get() should only return one row, else will raise an error
    sql = """
    SELECT platform, dt
    FROM  mydb.pv_log
    WHERE dt='20170829'
    LIMIT 1
    """
    row = db.get(sql)
    print(row)
    # row is an dict-like object
    print(row.dt)
    print(row['dt'])


def test_execute(db):
    # `UPDATE` or `DELETE` is DANGER!!!
    # sql = """
    # UPDATE xxx
    # """
    # db.execute(sql)
    pass


def main():
    host = '127.0.0.1'
    db = Connection(db_host=host, port=10000, user='myuser',
            database='mydb', authMechanism='PLAIN')

    test_query(db)
    test_get(db)
    test_execute(db)


if __name__ == '__main__':
    main()
    原文作者:书SHU
    原文地址: https://www.jianshu.com/p/b8d748cf1ce2
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞