1.前言
zookeeper是一个用于维护配置信息、命名、提供分布式同步和提供组服务。它自身是高可用的,只要宕机节点不达到半数,zookeeper服务都不会离线。zookeeper为实现分布式锁,分布式栅栏,分布式队列,安全的配置存储交换,在线状态监控,选举提供了坚实的基础。
在hadoop环境中,zookeeper被广泛应用。hadoop 高可用是依赖zookeeper的实现的。Hbase,storm,kafka都强依赖zookeeper,没有zookeeper根本都运行不起来。阿里的微服务治理框架dubbo也是依赖zookeeper的。
python访问zookeeper使用的的模块是kazoo。
本文主要介绍kazoo操作zookeeper,并不会讨论zookeeper的原理和使用方法。
2.环境
- zookeeper版本:3.4.5-cdh5.14.0
- kazoo 2.4.0
3.示例代码
3.1 连接到zookeeper并获取节点及数据
#!/usr/bin/python
# -*- coding: UTF-8 -*-
''' Created on 2018.2.27 @author: laofeng @note: kazoo 访问 zookeeper '''
import sys
from kazoo.client import KazooClient,KazooState
import logging
logging.basicConfig(
level=logging.DEBUG
,stream=sys.stdout
,format='%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')
#创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
hosts='172.16.21.23:2181,172.16.21.24:2181,172.16.21.25:2181'
,timeout=10.0 #连接超时时间
, logger=logging #传一个日志对象进行,方便 输出debug日志
)
#开始心跳
zk.start()
#获取根节点数据和状态
data, stat = zk.get('/')
print data #这行没有输出,‘/’根节点,并没有数据
print stat
''' 这个是stat的输出: ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8448, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295036257L) ZnodeState的属性列表: czxid : 创建这个节点时的zxid mzxid : 修改这个节点时的zxid ctime : 创建时间 mtime : 修改时间 version : 数据被修改的次数 cversion: 子节点被修改的次数 aversion: acl被改变的次数 ephemeralOwner:临时节点创建的用户,如果不是临时节点值为0 dataLength:节点数据长度 numChildren:子节点的数量 pzxid:子节点被修改的zxid '''
#获取根节点的所有子节点,返回的是一个列表,只有子节点的名称
children = zk.get_children("/");
print children
#下面是根节点的返回值
#[u'rmstore', u'kazoo', u'yarn-leader-election', u'zookeeper']
#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()
3.2 遍历所有子节点的函数,
#!/usr/bin/python
# -*- coding: UTF-8 -*-
''' Created on 2018.2.27 @author: laofeng @note: kazoo 访问 zookeeper '''
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from kazoo.client import KazooClient,KazooState
''' import logging logging.basicConfig( level=logging.DEBUG ,stream=sys.stdout ,format='%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s') '''
#递归遍历所有节点的子节点函数,_zk是KazooClient的对象,node是节点名称字符串,func是回调函数
def zk_walk(_zk, node, func):
data, stat = _zk.get(node)
children = _zk.get_children(node)
func(node, data, stat, children);
if len(children) > 0:
for sub in children:
sub_node = ''
if node != '/':
sub_node = node + '/' + sub
else:
sub_node = '/' + sub
zk_walk(_zk, sub_node, func)
#测试zk_walk的打印回调函数,只是把所有数据都打印出来
def printZNode(node, data, stat, children):
print("node : " + node)
print("data : " + str(data))
print("stat : " + str(stat))
print("child : " + str(children))
print '\n'
#创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
hosts='172.16.21.23:2181,172.16.21.24:2181,172.16.21.25:2181'
,timeout=10.0 #连接超时间
)
#开始心跳
zk.start()
#遍历谋个节点的所有子节点
zk_walk(zk, '/', printZNode)
#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()
打印结果:因为有二进制数据,所以一些乱码在
node : /
data :
stat : ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8622, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295037672L)
child : [u'rmstore', u'kazoo', u'yarn-leader-election', u'zookeeper']
node : /rmstore
data : None
stat : ZnodeStat(czxid=4295013289L, mzxid=4295013289L, ctime=1519892893473L, mtime=1519892893473L, version=0, cversion=1, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=1, pzxid=4295013291L)
child : [u'ZKRMStateRoot']
node : /rmstore/ZKRMStateRoot
data : None
stat : ZnodeStat(czxid=4295013291L, mzxid=4295013291L, ctime=1519892893488L, mtime=1519892893488L, version=0, cversion=2573, aversion=2, ephemeralOwner=0, dataLength=0, numChildren=5, pzxid=4295034705L)
child : [u'AMRMTokenSecretManagerRoot', u'RMAppRoot', u'EpochNode', u'RMVersionNode', u'RMDTSecretManagerRoot']
node : /rmstore/ZKRMStateRoot/AMRMTokenSecretManagerRoot
data :
••�ˣ������•••:
���•3
stat : ZnodeStat(czxid=4295013299L, mzxid=4295013303L, ctime=1519892893729L, mtime=1519892893882L, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=23, numChildren=0, pzxid=4295013299L)
child : []
node : /rmstore/ZKRMStateRoot/RMAppRoot
data : None
stat : ZnodeStat(czxid=4295013294L, mzxid=4295013294L, ctime=1519892893677L, mtime=1519892893677L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=4295013294L)
child : []
node : /rmstore/ZKRMStateRoot/EpochNode
data : ••
stat : ZnodeStat(czxid=4295013302L, mzxid=4295034703L, ctime=1519892893824L, mtime=1519969389707L, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=2, numChildren=0, pzxid=4295013302L)
child : []
node : /rmstore/ZKRMStateRoot/RMVersionNode
data : ••••
stat : ZnodeStat(czxid=4295013301L, mzxid=4295013301L, ctime=1519892893761L, mtime=1519892893761L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=4, numChildren=0, pzxid=4295013301L)
child : []
node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot
data : None
stat : ZnodeStat(czxid=4295013295L, mzxid=4295013295L, ctime=1519892893687L, mtime=1519892893687L, version=0, cversion=3, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=3, pzxid=4295013298L)
child : [u'RMDTSequentialNumber', u'RMDTMasterKeysRoot', u'RMDelegationTokensRoot']
node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTSequentialNumber
data : None
stat : ZnodeStat(czxid=4295013298L, mzxid=4295013298L, ctime=1519892893721L, mtime=1519892893721L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=4295013298L)
child : []
node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot
data : None
stat : ZnodeStat(czxid=4295013296L, mzxid=4295013296L, ctime=1519892893696L, mtime=1519892893696L, version=0, cversion=4, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295034705L)
child : [u'DelegationKey_1', u'DelegationKey_3', u'DelegationKey_2', u'DelegationKey_4']
node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_1
data : •�•b ��••"��>L�� stat : ZnodeStat(czxid=4295013304L, mzxid=4295013304L, ctime=1519892893907L, mtime=1519892893907L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295013304L) child : [] node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_3 data : •�•b•o,�•#���•�V� stat : ZnodeStat(czxid=4295034704L, mzxid=4295034704L, ctime=1519969389826L, mtime=1519969389826L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295034704L) child : [] node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_2 data : •�•b ���•••jd••�� stat : ZnodeStat(czxid=4295013305L, mzxid=4295013305L, ctime=1519892893929L, mtime=1519892893929L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295013305L) child : [] node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_4 data : •�•b•o-••wM��@�W
3.3 监控节点的变化
两种方式,使用注解和在代码中调用
3.3.1 使用注解的方式
@zk.DataWatch("/kazoo") #当节点kazoo的数据变化时这个函数会被调用
def watch_node(data, stat):
#如果节点被删除这个函数也会被调用,但是data和stat都是None
if stat and data:
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
else:
print "节点已经被删除"
@zk.ChildrenWatch("/kazoo") #观察子节点的变化
def watch_children(children):
print("Children are now: %s" % children)
# Above function called immediately, and from then on
3.3.2编程的方式监听节点变化
def monitor(event):
print type(event)
print event
#当rm_node产生某些变化时会出发这个函数调用,但是只能出发一次
if not zk.exists(rm_node, monitor):
zk.create(rm_node, "123", makepath=True)
#输出信息如下,event的类型和属性
''' <class 'kazoo.protocol.states.WatchedEvent'> WatchedEvent(type='CHANGED', state='CONNECTED', path=u'/kazoo') '''
4.其他API
创建节点,删除节点,修改节点的数据,事务api
''' create 节点 递归创建所有层级的节点,只能设置acl,不能设置data zk.ensure_path("/my/favorite") 创建节点并设置data zk.create("/my/favorite/node", b"a value") 读取数据 zk.exists() zk.get() zk.get_children() 修改数据 zk.set() 删除节点 zk.delete() 重试&自定义重试 retry,可以多次重复执行一个方法,直到成功,这个函数真是赞 result = zk.retry(zk.get, "/path/to/node") from kazoo.retry import KazooRetry kr = KazooRetry(max_tries=3, ignore_expire=False) result = kr(client.get, "/some/path") 事务 transaction = zk.transaction() transaction.check('/node/a', version=3) transaction.create('/node/b', b"a value") results = transaction.commit() '''