Python与ZooKeeper集群连接

    xiaoxiao2021-03-25  168

    由于项目的需要,需要学习Python客户端连接ZooKeeper集群,并实现创建临时节点、获得指定的路径下的信息、监听子节点变化的功能。

    环境配置

    ZooKeeper集群的安装可以参考http://blog.csdn.net/mrbcy/article/details/54767484

    使用下面的命令安装kazoo

    pip install kazoo

    基本使用

    这一部分可参考官方文档:http://kazoo.readthedocs.io/en/latest/basic_usage.htm

    监听子节点变化

    下面的代码实现了创建一个临时、顺序的节点,并且可以监听子节点的变化。

    #-*- coding: utf-8 -*- import time from kazoo.client import KazooClient from kazoo.recipe.watchers import ChildrenWatch class ValidatorDetector: def __init__(self): self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181') self.validator_children_watcher = ChildrenWatch(client=self.zk,path='/mproxy/validators',func=self.validator_watcher_fun) self.zk.start() def validator_watcher_fun(self,children): print "The children now are:", children def create_node(self): self.zk.create('/mproxy/validators/validator',b'validator_huabei_1',ephemeral=True,sequence=True,makepath=True) def __del__(self): self.zk.close() if __name__ == '__main__': detector = ValidatorDetector() detector.create_node() time.sleep(10)

    ZooKeeper原生提供了监听节点变化及值的变化的API。关于这一部分可以参考http://blog.csdn.net/mrbcy/article/details/54790758。但是这些API只能生效一次,一旦被触发过一次以后就不会再触发了,除非再次注册。而kazoo则在这个基础上封装了更上层的API,可以持续的触发。这就是上面的ChildrenWatch,除此之外kazoo还封装了一个DataWatch,用于监听数据的变化。下面我们也会用到。

    kazoo还实现了自动续订功能,使得在会话过期后我们不需要再次初始化ZooKeeper客户端(这里可以参考http://blog.csdn.net/mrbcy/article/details/55062713),也是非常方便的。

    注册验证器

    有了上面的知识就可以做一个注册类和一个监测类了。

    #-*- coding: utf-8 -*- import threading import time from kazoo.client import KazooClient from kazoo.protocol.states import KazooState class InfoKeeper(threading.Thread): def __init__(self,register): threading.Thread.__init__(self) self.register=register def run(self): time.sleep(0.25) if self.register.zk_node is None: print "create method has not been called" return check_result = self.register.zk.exists(self.register.validator_path) if check_result is None: # redo the regist print "redo the regist" self.register.regist() else: print "the path remain exists" class ValidatorRegister: def __init__(self): self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181') self.zk_node = None self.validator_path = '/mproxy/validators/' self.zk.add_listener(self.conn_state_watcher) self.zk.start() def __del__(self): self.zk.close() def regist(self): self.zk_node = self.zk.create(self.validator_path + 'validator',bytes('validator_huabei_1'),ephemeral=True,sequence=True,makepath=True) def close(self): self.zk.stop() self.zk.close() def conn_state_watcher(self, state): if state == KazooState.CONNECTED: print "Now connected" if self.zk_node is None: print "create method has not been called" return info_keeper = InfoKeeper(self) info_keeper.start() elif state == KazooState.LOST: print "Now lost" else: print "Now suspended"

    监测类:

    #-*- coding: utf-8 -*- import time from kazoo.client import KazooClient from kazoo.recipe.watchers import ChildrenWatch class ValidatorDetector: def __init__(self): self.validator_path = '/mproxy/validators/' self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181') self.validator_children_watcher = ChildrenWatch(client=self.zk,path=self.validator_path,func=self.validator_watcher_fun) self.zk.start() def validator_watcher_fun(self,children): for child in children: validator_name = self.zk.get(path=self.validator_path + str(child)) print validator_name[0] print "The children now are:", children def __del__(self): self.zk.close()

    注册类这里稍微复杂了一点,做了一个在会话过期后重新注册的机制,如果会话过期,重新注册之前的注册信息。

    监听子节点值的变化

    嗯,这个需求仔细想过后可以通过监听子节点的变化来代替,所以暂时不实现了。

    转载请注明原文地址: https://ju.6miu.com/read-677.html

    最新回复(0)