python操作zookeeper

1.前言

zookeeper是一个用于维护配置信息、命名、提供分布式同步和提供组服务。它自身是高可用的,只要宕机节点不达到半数,zookeeper服务都不会离线。zookeeper为实现分布式锁,分布式栅栏,分布式队列,安全的配置存储交换,在线状态监控,选举提供了坚实的基础。

在hadoop环境中,zookeeper被广泛应用。hadoop 高可用是依赖zookeeper的实现的。Hbase,storm,kafka都强依赖zookeeper,没有zookeeper根本都运行不起来。阿里的微服务治理框架dubbo也是依赖zookeeper的。

python访问zookeeper使用的的模块是kazoo。

本文主要介绍kazoo操作zookeeper,并不会讨论zookeeper的原理和使用方法。

2.环境

  1. zookeeper版本:3.4.5-cdh5.14.0
  2. kazoo 2.4.0

3.示例代码

3.1 连接到zookeeper并获取节点及数据

#!/usr/bin/python
# -*- coding: UTF-8 -*-
''' Created on 2018.2.27 @author: laofeng @note: kazoo 访问 zookeeper '''
import sys
from kazoo.client import KazooClient,KazooState
import logging
logging.basicConfig(
    level=logging.DEBUG
    ,stream=sys.stdout
    ,format='%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s')

#创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    hosts='172.16.21.23:2181,172.16.21.24:2181,172.16.21.25:2181'
    ,timeout=10.0  #连接超时时间
    , logger=logging #传一个日志对象进行,方便 输出debug日志
    )
#开始心跳
zk.start()

#获取根节点数据和状态
data, stat = zk.get('/')

print data   #这行没有输出,‘/’根节点,并没有数据
print stat   
''' 这个是stat的输出: ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8448, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295036257L) ZnodeState的属性列表: czxid : 创建这个节点时的zxid mzxid : 修改这个节点时的zxid ctime : 创建时间 mtime : 修改时间 version : 数据被修改的次数 cversion: 子节点被修改的次数 aversion: acl被改变的次数 ephemeralOwner:临时节点创建的用户,如果不是临时节点值为0 dataLength:节点数据长度 numChildren:子节点的数量 pzxid:子节点被修改的zxid '''

#获取根节点的所有子节点,返回的是一个列表,只有子节点的名称
children = zk.get_children("/");
print children
#下面是根节点的返回值
#[u'rmstore', u'kazoo', u'yarn-leader-election', u'zookeeper']

#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

3.2 遍历所有子节点的函数,

#!/usr/bin/python
# -*- coding: UTF-8 -*-
''' Created on 2018.2.27 @author: laofeng @note: kazoo 访问 zookeeper '''
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

from kazoo.client import KazooClient,KazooState
''' import logging logging.basicConfig(  level=logging.DEBUG  ,stream=sys.stdout  ,format='%(asctime)s %(pathname)s %(funcName)s%(lineno)d %(levelname)s: %(message)s') '''
#递归遍历所有节点的子节点函数,_zk是KazooClient的对象,node是节点名称字符串,func是回调函数
def zk_walk(_zk, node, func):
    data, stat = _zk.get(node)
    children = _zk.get_children(node)
    func(node, data, stat, children);
    if len(children) > 0:
        for sub in children:
            sub_node = ''
            if node != '/':
                sub_node = node + '/' + sub
            else:
                sub_node = '/' + sub
            zk_walk(_zk, sub_node, func)
            
#测试zk_walk的打印回调函数,只是把所有数据都打印出来
def printZNode(node,  data, stat, children):
    print("node : " + node)
    print("data : " + str(data))
    print("stat : " + str(stat))
    print("child : " + str(children))
    print '\n'
    
#创建一个客户端,可以指定多台zookeeper,
zk = KazooClient(
    hosts='172.16.21.23:2181,172.16.21.24:2181,172.16.21.25:2181'
    ,timeout=10.0  #连接超时间
    )
#开始心跳
zk.start()

#遍历谋个节点的所有子节点
zk_walk(zk, '/', printZNode)

#执行stop后所有的临时节点都将失效
zk.stop()
zk.close()

打印结果:因为有二进制数据,所以一些乱码在

node  : /
data  : 
stat  : ZnodeStat(czxid=0, mzxid=0, ctime=0, mtime=0, version=0, cversion=8622, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295037672L)
child : [u'rmstore', u'kazoo', u'yarn-leader-election', u'zookeeper']

node  : /rmstore
data  : None
stat  : ZnodeStat(czxid=4295013289L, mzxid=4295013289L, ctime=1519892893473L, mtime=1519892893473L, version=0, cversion=1, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=1, pzxid=4295013291L)
child : [u'ZKRMStateRoot']

node  : /rmstore/ZKRMStateRoot
data  : None
stat  : ZnodeStat(czxid=4295013291L, mzxid=4295013291L, ctime=1519892893488L, mtime=1519892893488L, version=0, cversion=2573, aversion=2, ephemeralOwner=0, dataLength=0, numChildren=5, pzxid=4295034705L)
child : [u'AMRMTokenSecretManagerRoot', u'RMAppRoot', u'EpochNode', u'RMVersionNode', u'RMDTSecretManagerRoot']

node  : /rmstore/ZKRMStateRoot/AMRMTokenSecretManagerRoot
data  : 
••�ˣ������•••:
���•3
stat  : ZnodeStat(czxid=4295013299L, mzxid=4295013303L, ctime=1519892893729L, mtime=1519892893882L, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=23, numChildren=0, pzxid=4295013299L)
child : []

node  : /rmstore/ZKRMStateRoot/RMAppRoot
data  : None
stat  : ZnodeStat(czxid=4295013294L, mzxid=4295013294L, ctime=1519892893677L, mtime=1519892893677L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=4295013294L)
child : []

node  : /rmstore/ZKRMStateRoot/EpochNode
data  : ••
stat  : ZnodeStat(czxid=4295013302L, mzxid=4295034703L, ctime=1519892893824L, mtime=1519969389707L, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=2, numChildren=0, pzxid=4295013302L)
child : []

node  : /rmstore/ZKRMStateRoot/RMVersionNode
data  : ••••
stat  : ZnodeStat(czxid=4295013301L, mzxid=4295013301L, ctime=1519892893761L, mtime=1519892893761L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=4, numChildren=0, pzxid=4295013301L)
child : []

node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot
data  : None
stat  : ZnodeStat(czxid=4295013295L, mzxid=4295013295L, ctime=1519892893687L, mtime=1519892893687L, version=0, cversion=3, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=3, pzxid=4295013298L)
child : [u'RMDTSequentialNumber', u'RMDTMasterKeysRoot', u'RMDelegationTokensRoot']

node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTSequentialNumber
data  : None
stat  : ZnodeStat(czxid=4295013298L, mzxid=4295013298L, ctime=1519892893721L, mtime=1519892893721L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=0, pzxid=4295013298L)
child : []

node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot
data  : None
stat  : ZnodeStat(czxid=4295013296L, mzxid=4295013296L, ctime=1519892893696L, mtime=1519892893696L, version=0, cversion=4, aversion=0, ephemeralOwner=0, dataLength=0, numChildren=4, pzxid=4295034705L)
child : [u'DelegationKey_1', u'DelegationKey_3', u'DelegationKey_2', u'DelegationKey_4']

node  : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_1
data  : •�•b    ��••"��>L�� stat : ZnodeStat(czxid=4295013304L, mzxid=4295013304L, ctime=1519892893907L, mtime=1519892893907L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295013304L) child : []  node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_3 data : •�•b•o,�•#���•�V� stat : ZnodeStat(czxid=4295034704L, mzxid=4295034704L, ctime=1519969389826L, mtime=1519969389826L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295034704L) child : []  node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_2 data : •�•b ���•••jd••�� stat : ZnodeStat(czxid=4295013305L, mzxid=4295013305L, ctime=1519892893929L, mtime=1519892893929L, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=17, numChildren=0, pzxid=4295013305L) child : []  node : /rmstore/ZKRMStateRoot/RMDTSecretManagerRoot/RMDTMasterKeysRoot/DelegationKey_4 data : •�•b•o-••wM��@�W

3.3 监控节点的变化

两种方式,使用注解和在代码中调用

3.3.1 使用注解的方式

@zk.DataWatch("/kazoo") #当节点kazoo的数据变化时这个函数会被调用
def watch_node(data, stat):
    #如果节点被删除这个函数也会被调用,但是data和stat都是None
    if stat and data:
        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
    else:
        print "节点已经被删除"
        
        
@zk.ChildrenWatch("/kazoo") #观察子节点的变化
def watch_children(children):
    print("Children are now: %s" % children)
# Above function called immediately, and from then on

        

3.3.2编程的方式监听节点变化

def monitor(event):
    print type(event)
    print event
#当rm_node产生某些变化时会出发这个函数调用,但是只能出发一次 
if not zk.exists(rm_node, monitor):
    zk.create(rm_node, "123",  makepath=True)
#输出信息如下,event的类型和属性
''' <class 'kazoo.protocol.states.WatchedEvent'> WatchedEvent(type='CHANGED', state='CONNECTED', path=u'/kazoo') '''

4.其他API

创建节点,删除节点,修改节点的数据,事务api

''' create 节点 递归创建所有层级的节点,只能设置acl,不能设置data zk.ensure_path("/my/favorite")  创建节点并设置data zk.create("/my/favorite/node", b"a value")  读取数据  zk.exists()  zk.get()  zk.get_children()  修改数据  zk.set()  删除节点  zk.delete()  重试&自定义重试 retry,可以多次重复执行一个方法,直到成功,这个函数真是赞 result = zk.retry(zk.get, "/path/to/node")  from kazoo.retry import KazooRetry  kr = KazooRetry(max_tries=3, ignore_expire=False) result = kr(client.get, "/some/path")  事务  transaction = zk.transaction() transaction.check('/node/a', version=3) transaction.create('/node/b', b"a value") results = transaction.commit() '''

    原文作者:老冯
    原文地址: https://zhuanlan.zhihu.com/p/34177172
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞