1. 程式人生 > 實用技巧 >grpc python 原始碼分析(1):server 的建立和啟動

grpc python 原始碼分析(1):server 的建立和啟動

from concurrent import futures
import time
import grpc
from example import helloworld_pb2_grpc, helloworld_pb2


# 實現 proto 檔案中定義的 GreeterServicer
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Servicer implementing the Greeter service declared in the proto file."""

    # Implements the rpc calls declared in the proto file.
    def SayHello(self, request, context):
        """Return a HelloReply whose message greets ``request.name``."""
        return helloworld_pb2.HelloReply(
            message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        """Return a HelloReply built the same way as in ``SayHello``."""
        return helloworld_pb2.HelloReply(
            message='hello {msg}'.format(msg=request.name))


def serve():
    """Start the rpc service on port 50051 and block until Ctrl+C.

    A 10-worker thread pool is handed to ``grpc.server``. After ``start()``
    the function sleeps in one-day intervals, and on ``KeyboardInterrupt``
    it calls ``server.stop(0)``.
    """
    # Start the rpc service.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(60 * 60 * 24)  # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

1️⃣ 建立 server
這裡我們傳了一個執行緒池給 grpc 的 server ,這個執行緒池用來處理請求。
經過重重呼叫,最後我們得到的 server 是_Server的例項

class _Server(grpc.Server):
    """Server object that delegates every operation to module-level helpers.

    All state is kept in one ``_ServerState`` stored on ``self._state``; each
    method passes that state to the corresponding private function.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression):
        """Create the core server and completion queue, then bundle the state."""
        cq = cygrpc.CompletionQueue()
        core_server = cygrpc.Server(_augment_options(options, compression))
        core_server.register_completion_queue(cq)
        pipeline = _interceptor.service_pipeline(interceptors)
        self._state = _ServerState(cq, core_server, generic_handlers, pipeline,
                                   thread_pool, maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the given handlers, then add them to the server state."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Bind ``address`` without credentials; return the validated result."""
        encoded_address = _common.encode(address)
        binding_result = _add_insecure_port(self._state, encoded_address)
        return _common.validate_port_binding_result(address, binding_result)

    def add_secure_port(self, address, server_credentials):
        """Bind ``address`` with credentials; return the validated result."""
        encoded_address = _common.encode(address)
        binding_result = _add_secure_port(self._state, encoded_address,
                                          server_credentials)
        return _common.validate_port_binding_result(address, binding_result)

    def start(self):
        """Start the server by passing its state to ``_start``."""
        _start(self._state)

    def wait_for_termination(self, timeout=None):
        """Wait on the termination event, optionally bounded by ``timeout``."""
        # NOTE(https://bugs.python.org/issue35935)
        # This goes through _common.wait as a workaround; drop it once
        # threading.Event.wait() works with CTRL+C on every platform.
        terminated = self._state.termination_event
        return _common.wait(terminated.wait, terminated.is_set,
                            timeout=timeout)

    def stop(self, grace):
        """Stop the server with the given grace value; return ``_stop``'s result."""
        return _stop(self._state, grace)

    def __del__(self):
        # Nothing to signal when `_state` was never assigned.
        if not hasattr(self, '_state'):
            return
        # A lock can not be taken inside __del__(), so only raise a flag that
        # tells the serving daemon thread (if it exists) to begin shutdown.
        self._state.server_deallocated = True

  

cygrpc.CompletionQueue 和 cygrpc.Server 都是呼叫底層的 C++ core,我們不去管它。
再來看看這個_ServerState的程式碼

class _ServerState(object):
    """Mutable record holding everything tracked for one server instance."""

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        """Store the supplied collaborators and initialise the bookkeeping."""
        # Re-entrant lock stored alongside the state; `_start` holds it while
        # it changes `stage`.
        self.lock = threading.RLock()

        # Objects handed in by the caller.
        self.completion_queue = completion_queue
        self.server = server
        # Copied into a new list, so later additions do not touch the
        # caller's iterable.
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool

        # Lifecycle: a fresh state begins in the STOPPED stage.
        self.stage = _ServerStage.STOPPED
        terminated = threading.Event()
        self.termination_event = terminated
        self.shutdown_events = [terminated]

        # Concurrency accounting.
        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # "Volatile" flag used to interrupt the daemon serving thread.
        self.server_deallocated = False

  

從這裡我們可以看到,python 的 server 只是對底層的簡單封裝,關於網路IO的處理完全是底層的 c++ core 負責,python 主要負責呼叫開發者的介面處理請求。

2️⃣ 註冊介面方法
這步負責將我們開發好的介面註冊到伺服器上,呼叫的是編譯 proto 檔案生成的 _pb2_grpc 字尾檔案的函式。

def add_GreeterServicer_to_server(servicer, server):
    """Register the Greeter rpc methods of ``servicer`` on ``server``.

    Builds a name -> handler dict for 'SayHello' and 'SayHelloAgain', wraps it
    in a generic handler for the 'Greeter' service, and adds that handler to
    the server.
    """
    # Deserialization method for incoming requests.
    parse_request = helloworld__pb2.HelloRequest.FromString
    # Serialization method for outgoing replies.
    dump_reply = helloworld__pb2.HelloReply.SerializeToString

    rpc_method_handlers = {}
    for rpc_name in ('SayHello', 'SayHelloAgain'):
        # The servicer method with the same name is the interface call.
        rpc_method_handlers[rpc_name] = grpc.unary_unary_rpc_method_handler(
            getattr(servicer, rpc_name),
            request_deserializer=parse_request,
            response_serializer=dump_reply,
        )

    greeter_handler = grpc.method_handlers_generic_handler(
        'Greeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((greeter_handler,))

  

請求的路由分發使用的是字典,key 是我們定義的介面名,value 則是一個命名元組,裡面儲存了我們的介面方法、序列化方法和反序列化方法。

3️⃣ 繫結監聽埠
這個最後是呼叫 c++ core 的程式碼,直接忽略

4️⃣ 服務啟動
server 的 start 方法只是呼叫 _start 函式

    def start(self):
        """Start serving by handing this server's state to ``_start``."""
        server_state = self._state
        _start(server_state)

  

def _start(state):
    """Start the core server and launch the daemon serving thread.

    Runs entirely under ``state.lock``.

    Raises:
        ValueError: if ``state.stage`` is not ``_ServerStage.STOPPED``.
    """
    with state.lock:
        already_started = state.stage is not _ServerStage.STOPPED
        if already_started:
            raise ValueError('Cannot start already-started server!')

        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        # The serving thread is a daemon, so it does not keep the process
        # alive on its own.
        serving_thread = threading.Thread(target=_serve, args=(state,))
        serving_thread.daemon = True
        serving_thread.start()