Python通過zookeeper實現分散式服務程式碼解析
阿新 • • 發佈:2020-07-24
藉助zookeeper可以實現伺服器的註冊與發現,有需求的時候呼叫zookeeper來發現可用的伺服器,將任務均勻分配到各個伺服器上去.
這樣可以方便的隨任務的繁重程度對伺服器進行彈性擴容,客戶端和服務端是非耦合的,也可以隨時增加客戶端.
zk_server.py
import threading import json import socket import sys from kazoo.client import KazooClient # TCP服務端繫結埠開啟監聽,同時將自己註冊到zk class ZKServer(object): def __init__(self,host,port): self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) self.host = host self.port = port self.sock.bind((host,port)) self.zk = None def serve(self): """ 開始服務,每次獲取得到一個資訊,都新建一個執行緒處理 """ self.sock.listen(128) self.register_zk() print("開始監聽") while True: conn,addr = self.sock.accept() print("建立連結%s" % str(addr)) t = threading.Thread(target=self.handle,args=(conn,addr)) t.start() # 具體的處理邏輯,只要接收到資料就立即投入工作,下次沒有資料本次連結結束 def handle(self,conn,addr): while True: data=conn.recv(1024) if not data or data.decode('utf-8') == 'exit': break print(data.decode('utf-8')) conn.close() print('My work is done!!!') # 將自己註冊到zk,臨時節點,所以連線不能中斷 def register_zk(self): """ 註冊到zookeeper """ self.zk = KazooClient(hosts='127.0.0.1:2181') self.zk.start() self.zk.ensure_path('/rpc') # 建立根節點 value = json.dumps({'host': self.host,'port': self.port}) # 建立服務子節點 self.zk.create('/rpc/server',value.encode(),ephemeral=True,sequence=True) if __name__ == '__main__': if len(sys.argv) < 3: print("usage:python server.py [host] [port]") exit(1) host = sys.argv[1] port = sys.argv[2] server = ZKServer(host,int(port)) server.serve()
zk_client.py
import random import sys import time import json import socket from kazoo.client import KazooClient # 客戶端連線zk,並從zk獲取可用的伺服器列表 class ZKClient(object): def __init__(self): self._zk = KazooClient(hosts='127.0.0.1:2181') self._zk.start() self._get_servers() def _get_servers(self,event=None): """ 從zookeeper獲取伺服器地址資訊列表 """ servers = self._zk.get_children('/rpc',watch=self._get_servers) # print(servers) self._servers = [] for server in servers: data = self._zk.get('/rpc/' + server)[0] if data: addr = json.loads(data.decode()) self._servers.append(addr) def _get_server(self): """ 隨機選出一個可用的伺服器 """ return random.choice(self._servers) def get_connection(self): """ 提供一個可用的tcp連線 """ sock = None while True: server = self._get_server() print('server:%s' % server) try: sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.connect((server['host'],server['port'])) except ConnectionRefusedError: time.sleep(1) continue else: break return sock if __name__ == '__main__': # 模擬多個客戶端批量生成任務,推送給伺服器執行 client = ZKClient() for i in range(40): sock = client.get_connection() sock.send(bytes(str(i),encoding='utf8')) sock.close() time.sleep(1)
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。