grpc python 原始碼分析(1):server 的建立和啟動
阿新 • • 發佈:2020-12-21
from concurrent import futures

import grpc

from example import helloworld_pb2_grpc, helloworld_pb2


class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Implements the GreeterServicer service defined in the proto file."""

    def SayHello(self, request, context):
        """Handle the SayHello RPC by greeting ``request.name``."""
        return helloworld_pb2.HelloReply(
            message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        """Handle the SayHelloAgain RPC; replies exactly like SayHello."""
        return helloworld_pb2.HelloReply(
            message='hello {msg}'.format(msg=request.name))


def serve():
    """Start the RPC service on port 50051 and block until interrupted.

    Pressing Ctrl+C (KeyboardInterrupt) stops the server immediately
    (grace period of 0).
    """
    # The thread pool handed to grpc.server() is what handles the requests.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        # start() returns right away, so the main thread has to be parked.
        # wait_for_termination() does that without the hand-rolled
        # "sleep for a day in a loop" keep-alive.
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()
1️⃣ 建立 server
這裡我們傳了一個執行緒池給 grpc 的 server ,這個執行緒池用來處理請求。
經過重重呼叫,最後我們得到的 server 是_Server
的例項
class _Server(grpc.Server):
    """``grpc.Server`` implementation that keeps its data in a ``_ServerState``."""

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression):
        """Create the core queue/server pair and bundle everything into state."""
        cq = cygrpc.CompletionQueue()
        core_server = cygrpc.Server(_augment_options(options, compression))
        core_server.register_completion_queue(cq)
        pipeline = _interceptor.service_pipeline(interceptors)
        self._state = _ServerState(cq, core_server, generic_handlers,
                                   pipeline, thread_pool,
                                   maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the given handlers, then add them to the server state."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Bind ``address`` without credentials and return the validated result."""
        binding_result = _add_insecure_port(self._state,
                                            _common.encode(address))
        return _common.validate_port_binding_result(address, binding_result)

    def add_secure_port(self, address, server_credentials):
        """Bind ``address`` with ``server_credentials`` and return the validated result."""
        binding_result = _add_secure_port(self._state,
                                          _common.encode(address),
                                          server_credentials)
        return _common.validate_port_binding_result(address, binding_result)

    def start(self):
        """Start the server by delegating to ``_start``."""
        _start(self._state)

    def wait_for_termination(self, timeout=None):
        """Wait on the state's termination event, optionally up to ``timeout``."""
        # NOTE(https://bugs.python.org/issue35935)
        # Remove this workaround once threading.Event.wait() is working with
        # CTRL+C across platforms.
        event = self._state.termination_event
        return _common.wait(event.wait, event.is_set, timeout=timeout)

    def stop(self, grace):
        """Stop the server by delegating to ``_stop`` with the given grace."""
        return _stop(self._state, grace)

    def __del__(self):
        # __init__ may have failed before ``_state`` was assigned.
        if not hasattr(self, '_state'):
            return
        # We can not grab a lock in __del__(), so set a flag to signal the
        # serving daemon thread (if it exists) to initiate shutdown.
        self._state.server_deallocated = True
cygrpc.CompletionQueue
和cygrpc.Server
都是呼叫底層的c++ core
,我們不去管它。
再來看看這個_ServerState
的程式碼
class _ServerState(object):
    """Mutable bookkeeping record created by ``_Server`` and passed to its helpers."""

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        """Store the collaborators and initialise the lifecycle fields."""
        self.lock = threading.RLock()

        # Objects supplied by the caller.
        self.completion_queue = completion_queue
        self.server = server
        # Copied so later additions do not touch the caller's iterable.
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool

        # Lifecycle: a new state begins in the STOPPED stage.
        self.stage = _ServerStage.STOPPED
        termination_event = threading.Event()
        self.termination_event = termination_event
        self.shutdown_events = [termination_event]

        # RPC accounting.
        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # A "volatile" flag to interrupt the daemon serving thread
        self.server_deallocated = False
從這裡我們可以看到,python 的 server
只是對底層的簡單封裝,網路 IO 的處理完全由底層的 c++ core
負責,python 主要負責呼叫開發者的介面處理請求。
2️⃣ 註冊介面方法
這步負責將我們開發好的介面註冊到伺服器上,呼叫的是編譯 proto 檔案生成的 _pb2_grpc
字尾檔案的函式。
def add_GreeterServicer_to_server(servicer, server):
    """Register the Greeter RPC methods of ``servicer`` on ``server``.

    Builds a dict that maps each RPC method name to a unary-unary method
    handler, wraps it in a generic handler for the 'Greeter' service and
    adds that handler to ``server``.
    """
    # Both RPCs share the same request and reply message types.
    deserialize_request = helloworld__pb2.HelloRequest.FromString
    serialize_response = helloworld__pb2.HelloReply.SerializeToString

    rpc_method_handlers = {
        'SayHello': grpc.unary_unary_rpc_method_handler(
            servicer.SayHello,  # the user's implementation to invoke
            request_deserializer=deserialize_request,  # bytes -> request
            response_serializer=serialize_response,  # reply -> bytes
        ),
        'SayHelloAgain': grpc.unary_unary_rpc_method_handler(
            servicer.SayHelloAgain,
            request_deserializer=deserialize_request,
            response_serializer=serialize_response,
        ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'Greeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
請求的路由分發使用的是字典:key 是我們定義的介面名,value 則是一個命名元組,裡面儲存了我們的介面方法、反序列化方法和序列化方法。
3️⃣ 繫結監聽埠
這個最後是呼叫 c++ core
的程式碼,直接忽略
4️⃣ 服務啟動
server
的 start
方法只是呼叫 _start
函式
def start(self):
    """Start the server; all the work is done by ``_start`` on the state."""
    state = self._state
    _start(state)
def _start(state):
    """Start the core server and launch the daemon serving thread.

    Raises:
        ValueError: if ``state.stage`` is anything other than STOPPED.
    """
    with state.lock:
        already_started = state.stage is not _ServerStage.STOPPED
        if already_started:
            raise ValueError('Cannot start already-started server!')

        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        # _serve runs on a daemon thread, so this function returns at once
        # and the thread does not keep the interpreter alive.
        serving_thread = threading.Thread(target=_serve, args=(state,))
        serving_thread.daemon = True
        serving_thread.start()