背景
在网上搜了一下,目前Python连接Hive的工具大概有pyhs2、impyla、pyhive,但都没有找到支持HiveServer2 HA的方式。而目前集群的需求是连接以HA方式部署的Hive Thrift服务,使多个服务实例能够通过ZooKeeper(zk)被自动发现,从而实现高可用和负载均衡。
依赖
基于pyhive的DB-API开发,使用kazoo连接zookeeper,hive版本为1.x
代码
引用
from kazoo.client import KazooClient
from pyhive import hive
import random
在zk上发现thrift服务,并返回所有host:port
# Discover the Thrift service host list registered under a ZooKeeper znode.
def discoveryThriftSerivcehost(zkhost,znodeName,serviceKeyword):
    """Return the ``serviceKeyword`` value of every child of ``znodeName``.

    Each child name is expected to look like ``key=value;key=value;...``
    (for example ``serverUri=host:port;version=...;sequence=...``).

    Args:
        zkhost: ZooKeeper connection string passed to ``KazooClient``.
        znodeName: Path of the znode whose children are listed.
        serviceKeyword: Key to extract from every child name.

    Returns:
        A list with one entry per child: the value stored under
        ``serviceKeyword``, or ``None`` when the child does not carry it.
    """
    zkClient = KazooClient(hosts=zkhost)
    zkClient.start()
    try:
        # Get the children names of the znode.
        children = zkClient.get_children(znodeName)
    finally:
        # Always release the ZooKeeper session, even if the lookup fails.
        zkClient.stop()
    return [_parseZnodeEntry(child).get(serviceKeyword) for child in children]


def _parseZnodeEntry(nodeName):
    """Parse a ``key=value;key=value`` znode child name into a dict."""
    fields = {}
    # An explicit loop is required: on Python 3 ``map`` is lazy, so using it
    # only for its side effects would leave the dict empty.
    for segment in str(nodeName).split(";"):
        key, separator, value = segment.partition("=")
        if separator:
            # Segments without "=" are skipped; the first occurrence of a
            # key wins.
            fields.setdefault(key, value)
    return fields
在返回的list中随机选一个host:port作为thrift server来尝试连接,并将此host:port从list中移除。
重复此操作,直到连接成功,或者list中所有的host:port都已尝试过为止。
# Connect to Hive through pyhive and return a cursor.
def connection(zkhost, znodeName, serviceKeyword, username, passwd, database):
    """Connect to a randomly chosen HiveServer2 instance found via ZooKeeper.

    The discovered ``host:port`` entries are tried in random order until one
    connection succeeds.

    Args:
        zkhost: ZooKeeper connection string.
        znodeName: Znode under which the Thrift servers are registered.
        serviceKeyword: Key whose value holds ``host:port`` in each child name.
        username: User name for the LDAP-authenticated connection.
        passwd: Password for the LDAP-authenticated connection.
        database: Database to use for the connection.

    Returns:
        A pyhive cursor on success, or ``0`` when no server could be reached
        (including when no server was discovered at all).
    """
    # Entries that lack the requested keyword come back as None; skip them.
    candidates = [
        hostPort
        for hostPort in discoveryThriftSerivcehost(zkhost, znodeName, serviceKeyword)
        if hostPort
    ]
    # A shuffled pass visits every candidate once, in random order.
    random.shuffle(candidates)
    for attempt, hostPort in enumerate(candidates, start=1):
        try:
            # Split on the last ":" so the port is always the final component.
            host, _, port = hostPort.rpartition(":")
            return hive.connect(host=host, port=int(port), username=username, password=passwd, auth='LDAP', database=database).cursor()
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            if attempt < len(candidates):
                print("ERROR:Can not connect "+hostPort+" .try another thrift server...")
    print("ERROR:Can not connect hiveserver2, please check the connection config and the hiveserver")
    # Kept as 0 (not an exception) so existing callers keep working.
    return 0
对于内部用户也可以将zk信息先行传入,这样连接时的参数输入会变少。
# Convenience entry point for LUDP users.
def LUDPConnect(username, passwd, database):
    """Connect with the ZooKeeper settings of the LUDP test environment.

    Only the credentials and the database are supplied by the caller; the
    result of ``connection`` is returned unchanged.
    """
    return connection(
        "xxxxx:2181,xxxxx:2181",  # zkhost
        "/ludp_hive_ha",  # znodeName
        "serviceUri".replace("service", "server") if False else "serverUri",  # serviceKeyword
        username,
        passwd,
        database,
    )
使用方法
from pyhiveConnection import hiveConnector
# Obtain a cursor through ZooKeeper discovery, then run a query on it.
cursor = hiveConnector.connection("node15.test:2181,node16.test:2181","/hiveserver2","serverUri","admin",None,"tmp")
cursor.execute("show tables")
print( cursor.fetchall() )