elasticsearch原始碼分析之服務端(四)
一、服務端接收
1.1接收訊息
在客戶端分析中已經提到,netty中通訊的處理類是MessageChannelHandler,其中messageReceived方法用來處理訊息。
1.1.1解析資料流
1.1.2呼叫handleRequest,此處讀取出來action,跟不同的action生成具體的TransportRequest子類和不同的HandledTransportAction。
其中生成reg和request解析如下:
最終執行到了TransportBulkAction類的doExecute方法,處理bulkRequest。
1.2主要類UML圖
server端接收message,解析協議,根據action生成不同request和transportAction,進而執行transportAction.execute(request,listener)。
此處的結構會有很多具體的TransportAction類實現了HandeldTransportAction,實現了不同的響應介面。
二、服務端處理
2.1索引的處理
2.1.1索引分類處理
根據上面的分析,bulk的處理執行到TransportBulkAction類的doExecute(),程式碼如下:
此處會分析bulkRequest中的所有請求中的index和type,生成 Map<String, Set<String>> indicesAndTypes,然後遍歷indicesAndTypes,分index執行不同的bulk。
2.1.2生成_id
遍歷所有的request,對其做一些加工,主要包括:獲取routing(如果mapping裡有的話)、指定的timestamp(如果沒有帶timestamp會使用當前時間),如果沒有指定id欄位,在action.bulk.action.allow_id_generation
配置為true的情況下,會自動生成一個base64UUID
2.1.3生成shardid
再次遍歷所有的request,獲取獲取每個request應該傳送到的shardId,獲取的過程是這樣的:request有routing(上面獲取到的)就直接返回,如果沒有,會先對id求一個hash
分片演算法如下:
hash函式預設使用的是Murmur3,也可以通過配置index.legacy.routing.hash.type
來決定使用的hash函式。
其實上面就是要對request按shard來分組,進而進行分片處理。
2.1.4分片處理requst
遍歷,對不同的分組建立一個bulkShardRequest,包含配置consistencyLevel和timeout。執行BrokerTransportShardBulkAction.execute(bulkShardRequest,listener),
建立一個brokerPhase執行緒,並執行。
brokerPhase首先解析處理分片主節點,然後請求主節點上執行。
此時請求會轉發到node節點。
2.2寫入primary
2.2.1PrimaryPhase
請求轉發到node節點的主節點的TransportReplicationAction.execute(),建立一個PrimaryPhase執行緒,並執行。呼叫routeRequestOrPerformLocally方法,呼叫performOnPrimary方法
其中第一個紅框表示主分片上執行索引操作,第二個紅框生成ReplicationPhase執行緒,供後期完成執行副本任務,副本任務暫不分析。
2.2.2生成document
在shardOperationOnPrimary-->shardIndexOperation-->executeIndexRequestOnPrimary中有
prepareIndexOperationOnPrimary方法,其中生成了doc
TransportReplicationAction類
IndexShard類
2.2.3生成索引
在shardOperationOnPrimary-->shardIndexOperation-->executeIndexRequestOnPrimary-->Engine.execute()-->indexWriter.add/updateDocument
至此,索引資料寫入完成,但是隻是將資料寫入了buffer和translog裡面。後面還有refresh和flush,保證資料可查和安全性。
資料doc的格式如下: