1. 程式人生 > >skynet源碼分析之socketchannel

skynet源碼分析之socketchannel

之一 方式 地址 ... closed pad 兩種模式 out sse

請求回應模式是與外部交互最常用的模式之一。通常協議設計方式有兩種:1.每個請求包對應一個回應包,有tcp保證時序,先請求的先回應,但不必收到回應才發送下一個請求,redis的協議就是這種類型;2.每個請求帶一個唯一的session標識,回應包也帶這個標識。這樣每個請求不一定都需要回應,且不用遵循先請求先回應的時序。mongodb的協議就是這種類型。skynet提供socketchannel庫封裝內部細節,支持上面兩種模式。詳情參考官方wiki https://github.com/cloudwu/skynet/wiki/SocketChannel

調用socketchannel.channel創建一個channel對象,必須提供ip地址(可以是域名)和端口。采用第一種還是第二種模式依據是否提供response參數,redis沒有提供說明用的第一種模式,mongo提供了(第13行)說明用第二種模式。

 1 -- lualib/skynet/db/redis.lua
 2  local channel = socketchannel.channel {
 3      host = db_conf.host,
 4      port = db_conf.port or 6379,
 5      auth = redis_login(db_conf.auth, db_conf.db),
 6      nodelay = true,
 7  }
 8  
 9  -- lualib/skynet/db/mongo.lua
10  obj.__sock = socketchannel.channel {
11 host = obj.host, 12 port = obj.port, 13 response = dispatch_reply, 14 auth = mongo_auth(obj), 15 backup = backup, 16 nodelay = true, 17 } 18 19 -- lualib/skynet/socketchannel.lua 20 function socket_channel.channel(desc) 21 local c = { 22 __host = assert
(desc.host), 23 __port = assert(desc.port), 24 __backup = desc.backup, 25 __auth = desc.auth, 26 __response = desc.response, -- It‘s for session mode 27 __request = {}, -- request seq { response func or session } -- It‘s for order mode 28 __thread = {}, -- coroutine seq or session->coroutine map 29 __result = {}, -- response result { coroutine -> result } 30 __result_data = {}, 31 __connecting = {}, 32 __sock = false, 33 __closed = false, 34 __authcoroutine = false, 35 __nodelay = desc.nodelay, 36 } 37 38 return setmetatable(c, channel_meta) 39 end

創建完對象後,可以手動調用connect連接對端,如果不connect,在第一次發送請求的時候會嘗試去連接。最終調用到connect_once,

第7行,調用socket庫api連接對端

第11行,fork一個協程專門處理收到回應包

15-21行,如果是模式1,收到回應包後的處理函數是dispatch_by_order,模式2則是dispatch_by_session

 1 -- lualib/skynet/socketchannel.lua
 2 local function connect_once(self)
 3     if self.__closed then
 4         return false
 5     end
 6     assert(not self.__sock and not self.__authcoroutine)
 7     local fd,err = socket.open(self.__host, self.__port)
 8     ...
 9 
10     self.__sock = setmetatable( {fd} , channel_socket_meta )
11     self.__dispatch_thread = skynet.fork(dispatch_function(self), self)
12     ...
13 end
14 
15 local function dispatch_function(self)
16     if self.__response then
17         return dispatch_by_session
18     else
19         return dispatch_by_order
20     end
21 end

接下來先介紹發送請求包的流程,之後再介紹如何處理回應包。調用者通過channel:request發送請求包,該接口有三個參數:參數request請求包數據;參數response在模式1下是一個function用來接收回應包,模式2下是一個唯一的session值;參數padding可選,表示將巨大消息拆分成多個小包發送出去。

第2行,檢測是否已連接,如果未連接,會嘗試去連接

第8行,調用socket庫把發送請求包。

第13-16行,不需要回應直接返回。

第18,23,35-48行,保存當前co。如果是模式2,保留session-co映射關系在self.__thread裏(38行);如果是模式1,保留response函數在self.__request裏,co在self.__threaad裏(41,42行)。

43-46行,如果有暫停的co在等待回應包,重啟它。

第24行,暫停當前co,等待對方回應包。當收到回應包時,回應處理函數會重啟它。

25-32行,返回結果給調用者。

 1 function channel:request(request, response, padding)
 2     assert(block_connect(self, true))       -- connect once
 3     local fd = self.__sock[1]
 4 
 5     if padding then
 6         ...
 7     else
 8         if not socket_write(fd , request) then
 9             sock_err(self)
10         end
11     end
12 
13     if response == nil then
14         -- no response
15         return
16     end
17 
18     return wait_for_response(self, response)
19 end
20 
21 local function wait_for_response(self, response)
22     local co = coroutine.running()
23     push_response(self, response, co)
24     skynet.wait(co)
25 
26     local result = self.__result[co]
27     self.__result[co] = nil
28     local result_data = self.__result_data[co]
29     self.__result_data[co] = nil
30     ...
31 
32     return result_data
33 end
34 
35 local function push_response(self, response, co)
36     if self.__response then
37         -- response is session
38         self.__thread[response] = co
39     else
40         -- response is a function, push it to __request
41         table.insert(self.__request, response)
42         table.insert(self.__thread, co)
43         if self.__wait_response then
44             skynet.wakeup(self.__wait_response)
45             self.__wait_response = nil
46         end
47     end
48 end

對於模式1的回應處理函數dispatch_by_order,

第4行,調用pop_response獲取第一個未回應的請求包的response和co

第6行,調用response函數,response函數調用socket庫的readline/read(24行)來等待socket上的返回,是一個阻塞操作。等socket返回後,response函數返回

第11-16行,返回結果保存在self.__result_data

第17行,重啟調用者發送請求包的co,把結果返回給調用者(上面代碼的26-32行),至此完成一次與對端請求回應交互

 1 -- lualib/skynet/socketchannel.lua
 2 local function dispatch_by_order(self)
 3     while self.__sock do
 4         local func, co = pop_response(self)
 5         ...
 6         local ok, result_ok, result_data, padding = pcall(func, self.__sock)
 7         if ok then
 8             if padding and result_ok then
 9                 ...
10             else
11                 self.__result[co] = result_ok
12                 if result_ok and self.__result_data[co] then
13                     table.insert(self.__result_data[co], result_data)
14                 else
15                     self.__result_data[co] = result_data
16                 end
17                 skynet.wakeup(co)
18             end
19         end
20 end
21 
22 -- lualib/skynet/db/redis.lua
23 local function read_response(fd)
24     local result = fd:readline "\r\n"
25     local firstchar = string.byte(result)
26     local data = string.sub(result,2)
27     return redcmd[firstchar](fd,data)
28 end

對於模式2的回應處理函數dispatch_by_session,

第6行,調用response函數,response函數會調用socket庫的readline/read(30行)來等待socket上的返回,是一個阻塞操作。等socket返回後,response函數返回回應包(回應包包含唯一的session)

第8行,通過session獲取對應的co

第13-21行,接下來處理跟上面一樣,保存回應包內容,重啟co。

 1  -- lualib/skynet/socketchannel.lua
 2  local function dispatch_by_session(self)
 3      local response = self.__response
 4      -- response() return session
 5      while self.__sock do
 6          local ok , session, result_ok, result_data, padding = pcall(response, self.__sock)
 7          if ok and session then
 8              local co = self.__thread[session]
 9              if co then
10                  if padding and result_ok then
11                      ...
12                  else
13                      self.__thread[session] = nil
14                      self.__result[co] = result_ok
15                      if result_ok and self.__result_data[co] then
16                          table.insert(self.__result_data[co], result_data)
17                      else
18                          self.__result_data[co] = result_data
19                      end
20                      skynet.wakeup(co)
21                  end
22              else
23                  self.__thread[session] = nil
24                  skynet.error("socket: unknown session :", session)
25              end
26  end
27  
28  -- lualib/skynet/db/mongo.lua
29  local function dispatch_reply(so)
30      local len_reply = so:read(4)
31      local reply     = so:read(driver.length(len_reply))
32      ...
33      return reply_id, succ, result
34  end

skynet源碼分析之socketchannel