Mproxy项目实录第4天

    xiaoxiao2021-03-25  125

    关于这个系列

    这个项目实录系列是记录Mproxy项目的整个开发流程。项目最终的目标是开发一套代理服务器的API。这个系列中会记录项目的需求、设计、验证、实现、升级等等,包括设计决策的依据,开发过程中的各种坑。希望和大家共同交流,一起进步。

    项目的源码我会同步更新到GitHub,项目地址:https://github.com/mrbcy/Mproxy。

    系列地址:

    Mproxy项目实录第1天

    Mproxy项目实录第2天

    Mproxy项目实录第3天

    今日计划

    到目前为止,我们已经有了一个爬虫和一个验证器,能够爬取代理服务器并进行验证,然后将验证结果提交到Kafka集群中。接下来我们要先开发调度器的一部分功能:根据验证器当前的在线情况,动态更新ZooKeeper集群中的配置,从而控制验证器的运行。为此,验证器必须与ZooKeeper进行集成,能够向ZooKeeper注册自己,并监听控制配置的值。

    ZooKeeper集群注册器和监听器的封装

    接下来,我们要开发两个工具类,分别负责向ZooKeeper集群注册一个节点,以及监听指定节点的子节点的情况。这一部分我做了技术验证。详情可以参考:http://blog.csdn.net/mrbcy/article/details/60869079

    然后在技术验证代码的基础上又做了一点封装:ZooKeeper连接串和节点路径都作为参数传入,让同一套工具类可以在调度器和验证器中复用。

    # -*- coding: utf-8 -*-
import threading
import time

from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState


class InfoKeeper(threading.Thread):
    """Re-register a NodeRegister's node if it vanished while disconnected.

    Started by NodeRegister.conn_state_watcher on every CONNECTED event, so
    the ZooKeeper calls below run on this thread and the kazoo state
    listener itself returns immediately.
    """

    def __init__(self, register):
        threading.Thread.__init__(self)
        # The NodeRegister whose node should be checked.
        self.register = register

    def run(self):
        # Short pause before querying, presumably to let the reconnect settle.
        time.sleep(0.25)
        if self.register.zk_node is None:
            print("create method has not been called")
            return
        check_result = self.register.zk.exists(self.register.zk_node)
        if check_result is None:
            # The ephemeral node is gone (e.g. the old session ended): register
            # again with the name/value remembered from the last regist() call.
            print("redo the regist")
            self.register.regist()
        else:
            print("the path remain exists")


class NodeRegister:
    """Register an ephemeral, sequential ZooKeeper node and keep it registered.

    The node lives only as long as this client's session; close() ends the
    session so ZooKeeper removes the node.
    """

    def __init__(self, zkconn_str, node_path):
        """Connect to ZooKeeper.

        :param zkconn_str: comma separated ``host:port`` list for KazooClient.
        :param node_path: path prefix; regist() creates ``node_path + node_name``.
        """
        self.zk = KazooClient(hosts=zkconn_str)
        # Full path actually created by regist() (includes the sequence suffix).
        self.zk_node = None
        self.node_path = node_path
        # Arguments of the last successful regist(), reused when re-registering.
        self._node_name = None
        self._value = None
        self.zk.add_listener(self.conn_state_watcher)
        self.zk.start()

    def __del__(self):
        # Best effort only; getattr guards against a partially built instance.
        if getattr(self, 'zk', None) is not None:
            self.close()

    def regist(self, node_name=None, value=b""):
        """Create the ephemeral sequential node ``node_path + node_name``.

        :param node_name: child name prefix; ZooKeeper appends a sequence
            number. When omitted, the name and value of the previous
            successful call are reused (InfoKeeper relies on this to
            re-register after a session loss).
        :param value: node data (bytes).
        :raises ValueError: if called without node_name before any registration.
        """
        if node_name is None:
            if self._node_name is None:
                raise ValueError("regist() needs a node_name on its first call")
            node_name, value = self._node_name, self._value
        self.zk_node = self.zk.create(self.node_path + node_name, value,
                                      ephemeral=True, sequence=True, makepath=True)
        # Remember the arguments only once create() has succeeded.
        self._node_name, self._value = node_name, value

    def close(self):
        """End the ZooKeeper session (dropping the ephemeral node) and free the client.

        kazoo documents that close() is meant for an already stopped client,
        so stop() must come first; stop() and close() tolerate repeated calls.
        """
        self.zk.stop()
        self.zk.close()

    def conn_state_watcher(self, state):
        """kazoo connection-state listener: log the state, re-check the node on CONNECTED."""
        if state == KazooState.CONNECTED:
            print("Now connected")
            if self.zk_node is None:
                print("create method has not been called")
                return
            info_keeper = InfoKeeper(self)
            info_keeper.start()
        elif state == KazooState.LOST:
            print("Now lost")
        else:
            print("Now suspended")

    上面的部分是注册器,负责向ZooKeeper集群注册一个临时顺序节点(ephemeral + sequence);每次与ZooKeeper重新建立连接时,它会检查该节点是否仍然存在,不存在则重新注册。

    # -*- coding: utf-8 -*-
import time

from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch


class NodeDetector:
    """Watch the children of ``node_path`` and report every change.

    ``node_change_listener(zk, node_path, children)``, if given, is called
    with the kazoo client, the watched path and the current list of child
    names each time kazoo's ChildrenWatch fires.
    """

    def __init__(self, zkconn_str, node_path, node_change_listener=None):
        """Connect to ZooKeeper and start watching ``node_path``.

        :param zkconn_str: comma separated ``host:port`` list for KazooClient.
        :param node_path: node whose children are watched (created if missing).
        :param node_change_listener: optional callback, see the class docstring.
        """
        self.node_path = node_path
        self.zk = KazooClient(hosts=zkconn_str)
        self.node_change_listener = node_change_listener
        self.zk.start()
        # kazoo's ChildrenWatch stops for good when the watched node does not
        # exist, and registrations under this path are created lazily (with
        # makepath=True), possibly after this watcher starts. Create the path
        # first; the trailing "/" is stripped so parent creation sees a clean path.
        self.zk.ensure_path(self.node_path.rstrip('/') or '/')
        self.children_watch = ChildrenWatch(client=self.zk, path=self.node_path,
                                            func=self.node_watcher_fun)

    def node_watcher_fun(self, children):
        """ChildrenWatch callback: forward the child list to the user listener."""
        if self.node_change_listener is not None:
            self.node_change_listener(self.zk, self.node_path, children)

    def close(self):
        """Stop the ZooKeeper session (which also ends the watch) and free the client."""
        self.zk.stop()
        self.zk.close()

    def __del__(self):
        # Best effort only; getattr guards against a partially built instance.
        if getattr(self, 'zk', None) is not None:
            self.close()

    上面的代码是监测器,负责监听某个节点下子节点的变化情况,并在子节点列表发生变化时调用回调函数。

    验证器控制单元的开发

    接下来我们需要在调度器中开发验证器的控制单元。它主要做两件事:一是监听验证器的上线情况,二是根据验证器的上线情况动态地创建或删除ZooKeeper上的控制节点。

    使用上面的工具类,代码如下:

    # -*- coding: utf-8 -*-
import time
import traceback

from conf.configloader import ConfigLoader
from zkutil.nodedetector import NodeDetector
from zkutil.noderegister import NodeRegister


class Dispatcher:
    """Switch the validators on and off through a ZooKeeper control node.

    The dispatcher watches the validator registration path. When every
    validator named in the configuration is online, it registers one
    ephemeral ``dispatcher`` node under the control path (the start signal).
    When any of them goes offline, it closes that registration, which is
    meant to remove the ephemeral node again (the stop signal).
    """

    def __init__(self):
        self.conf_loader = ConfigLoader()
        # NodeRegister holding the control node while validators should work;
        # None while they should be idle.
        self.validator_controller = None
        self.validator_detector = None

    def validator_node_change_listener(self, zk, node_path, children):
        """NodeDetector callback: compare online validators with the configured list.

        :param zk: kazoo client of the detector, used to read each child's data.
        :param node_path: watched validator path (children are ``node_path + child``).
        :param children: names of the currently registered validator nodes.
        """
        try:
            # The data of each registration node is the validator's name.
            online_validator_names = []
            for child in children:
                data, _stat = zk.get(path=node_path + str(child))
                online_validator_names.append(data)
            print(online_validator_names)

            online = set(online_validator_names)
            is_all_online = all(name in online
                                for name in self.conf_loader.get_validator_list())

            if is_all_online:
                # Register the control node only once; repeated events while
                # everything stays online must not open extra sessions/nodes.
                if self.validator_controller is None:
                    # NOTE(review): the validator side calls get_control_node_path();
                    # confirm this ConfigLoader really names it control_node_path().
                    controller = NodeRegister(zkconn_str=self.conf_loader.get_zk_conn_str(),
                                              node_path=self.conf_loader.control_node_path())
                    try:
                        controller.regist('dispatcher', b'working')
                    except Exception:
                        controller.close()
                        raise
                    self.validator_controller = controller
            elif self.validator_controller is not None:
                # Forget the controller before closing so a failing close()
                # cannot leave a half-closed controller that is reused later.
                controller = self.validator_controller
                self.validator_controller = None
                controller.close()
        except Exception:
            traceback.print_exc()

    def start_work(self):
        """Start watching the validator path, then keep the process alive forever."""
        # NOTE(review): see above; confirm validator_node_path() vs get_validator_node_path().
        self.validator_detector = NodeDetector(zkconn_str=self.conf_loader.get_zk_conn_str(),
                                               node_path=self.conf_loader.validator_node_path(),
                                               node_change_listener=self.validator_node_change_listener)
        while True:
            time.sleep(1)


if __name__ == '__main__':
    dispatcher = Dispatcher()
    dispatcher.start_work()

    config文件内容如下:

[zookeeper]
zk_conn_str = amaster:2181,anode1:2181,anode2:2181
control_node_path = /mproxy/validator_control_signal/
validator_node_path = /mproxy/validators/

[log]
log_file_name = dispatcher.log

[validator]
validator_list = validator_huabei_test_1

    验证器的改造

    需要对验证器稍作改造,使其能够监听调度器的控制指令。这里同样使用了上面封装的两个工具类,代码如下:

    # -*- coding: utf-8 -*-
import logging
import time
from logging.handlers import RotatingFileHandler

from conf.configloader import ConfigLoader
from kafkaproxylistener import KafkaProxyListener
from util.kafkaproxysubmitutil import KafkaProxySubmitUtil
from util.proxyqueue import ProxyQueue
from validator import ProxyValidator
from zkutil.nodedetector import NodeDetector
from zkutil.noderegister import NodeRegister


class Scheduler:
    """Run a pool of proxy validators, gated by the dispatcher's control signal.

    The scheduler registers this validator under the ZooKeeper validator path
    and watches the control path. While the control path has at least one
    child, a KafkaProxyListener feeds ``self.queue`` and finished validators
    are replaced by new ones. While it is empty, the listener is stopped and
    the pool idles.
    """

    def __init__(self):
        # State shared between the main loop and the ZooKeeper callback;
        # start_working() (re)initialises it before the callback is installed.
        self.work_flag = False
        self.queue = None
        self.proxy_listener = None
        self.init_log()

    def init_log(self):
        """Log DEBUG and above to the console and INFO and above to a rotating file."""
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)
        console.setFormatter(formatter)
        root_logger.addHandler(console)

        # add log rotate: 10 MB per file, up to 100 backups
        rt_handler = RotatingFileHandler('proxy_validator.log', maxBytes=10 * 1024 * 1024,
                                         backupCount=100, encoding="gbk")
        rt_handler.setLevel(logging.INFO)
        rt_handler.setFormatter(formatter)
        root_logger.addHandler(rt_handler)

    def _stop_proxy_listener(self):
        """Signal the current KafkaProxyListener (if any) to stop and drop the reference."""
        if self.proxy_listener is not None:
            self.proxy_listener.raiseExc(Exception)
            # Cleared right away so an already signalled listener is never
            # signalled again, even if creating its replacement fails.
            self.proxy_listener = None

    def control_signal_change_listener(self, zk, node_path, children):
        """NodeDetector callback for the control path.

        Any child under the control path means "work"; no child means "stop".
        ``zk`` and ``node_path`` are part of the NodeDetector callback
        signature and are not used here.
        """
        try:
            if children:
                # now working
                logging.debug("接到控制器指令:开始工作")
                self.work_flag = True
                self._stop_proxy_listener()
                listener = KafkaProxyListener(queue=self.queue)
                listener.start()
                # Only remember a listener that actually started.
                self.proxy_listener = listener
            else:
                logging.debug("接到控制器指令:停止工作")
                self.work_flag = False
                self._stop_proxy_listener()
        except Exception:
            logging.exception("Exception happens")

    def start_working(self):
        """Register with ZooKeeper, listen for control signals and run the pool forever."""
        self.work_flag = False  # will not do the work unless receive the remote control signal
        validator_num = 10
        self.queue = ProxyQueue()
        config_loader = ConfigLoader()
        self.proxy_listener = None

        # Both ZooKeeper helpers are kept in locals that live as long as the
        # endless loop below.
        validator_register = NodeRegister(zkconn_str=config_loader.get_zk_conn_str(),
                                          node_path=config_loader.get_validator_node_path())
        validator_register.regist('validator', bytes(config_loader.get_validator_name()))
        control_receiver = NodeDetector(zkconn_str=config_loader.get_zk_conn_str(),
                                        node_path=config_loader.get_control_node_path(),
                                        node_change_listener=self.control_signal_change_listener)
        submit_util = KafkaProxySubmitUtil(bootstrap_server=config_loader.get_kafka_bootstrap_servers())

        validators = []
        for _ in range(validator_num):
            validators.append(ProxyValidator(queue=self.queue, submit_util=submit_util))
            validators[-1].start()

        while True:
            for i, validator in enumerate(validators):
                # Replace a finished validator only while there is work to do
                # and the dispatcher has sent the start signal.
                if validator.is_finish and self.queue.get_proxy_count() > 0 and self.work_flag:
                    validators[i] = ProxyValidator(queue=self.queue, submit_util=submit_util)
                    validators[i].start()
                    logging.debug("分配一个新的验证器开始工作")
            if self.work_flag:
                logging.debug("当前任务列表长度:%s", self.queue.get_proxy_count())
            else:
                logging.debug("正在等待调度器的启动指令")
            time.sleep(1)
        # NOTE(review): unreachable, because the loop above never exits.
        print("代理服务器验证完毕。")


if __name__ == '__main__':
    scheduler = Scheduler()
    scheduler.start_working()
    转载请注明原文地址: https://ju.6miu.com/read-8736.html

    最新回复(0)