python连接hive(使用zk服务发现多个thrift ha的方式)

背景

在网上搜了一下,目前python连接hive的工具大概有pyhs2,impyla,pyhive。但是都没有找到有支持hiveserver2 ha的方式。但是目前集群需求是连接带ha方式的hive thrift服务,使得多个服务能够自动通过zk来被发现,实现高可用和负载均衡。

依赖

基于pyhive的DB-API开发,使用kazoo连接zookeeper,hive版本为1.x

代码

引用

from kazoo.client import KazooClient
from pyhive import hive
import random

在zk上发现thrift服务,并返回所有host:port

#discovery the thrfit service host list
def discoveryThriftSerivcehost(zkhost,znodeName,serviceKeyword):
    zkClient = KazooClient(hosts=zkhost)
    zkClient.start()
    #get the children name of zonde
    result = zkClient.get_children(znodeName)
    zkClient.stop()
    length = result.__len__()
    hostList = list()
    for i in range(0, length):
        resultHost = {}
        map(lambda x: resultHost.setdefault(x.split("=")[0], x.split("=")[1]), str(result[i]).split(";"))
        hostList.append(resultHost.get(serviceKeyword))
    return hostList

在返回的list中随机选一个host:port作为thrift server来尝试连接,并将此host:port从list中移除。
轮询此操作,直到连接成功。

#connect hive by pyhive and return cursor
def connection(zkhost, znodeName, serviceKeyword, username, passwd, database):
    hostList = discoveryThriftSerivcehost(zkhost, znodeName, serviceKeyword)
    hostLength = hostList.__len__()
    random.seed()
    isConnected=False
    while isConnected is False and hostLength > 0:
        index = random.randint(0, hostLength-1)
        hostStr = hostList.pop(index).split(":")
        try:
            cursor = hive.connect(host=hostStr[0], port=hostStr[1], username=username, password=passwd, auth='LDAP', database=database).cursor()
            isConnected = True
        except:
            isConnected = False
            if hostLength > 1:
                print("ERROR:Can not connect "+hostStr[0]+":"+hostStr[1]+" .try another thrift server...")
            else:
                print("ERROR:Can not connect hiveserver2, please check the connection config and the hiveserver")
                return 0
        hostLength -= 1
    return cursor

对于内部用户也可以将zk信息先行传入,这样连接时的参数输入会变少。

#for LUDP Connection
def LUDPConnect(username, passwd, database):
    # for LUDP test environment
    zkhost="xxxxx:2181,xxxxx:2181"
    znodeName="/ludp_hive_ha"
    serviceKeyword="serverUri"
    return connection(zkhost, znodeName, serviceKeyword, username, passwd, database)

使用方法

from pyhiveConnection import hiveConnector
cursor = hiveConnector.connection("node15.test:2181,node16.test:2181","/hiveserver2","serverUri","admin",None,"tmp")
cursor.execute("show tables")
print( curosr.fetchall() )

github地址:https://github.com/peng128/pyhiveConnection

    原文作者:Moon_Storm
    原文地址: https://www.jianshu.com/p/443f03226b97
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞