Cat原始碼分析(二):Server端
初始化
服務端消費客戶端發來的訊息進行分析和展示,所以這個的初始化指的是CatHomeModule的初始化
CatHomeModule依賴TcpSocketReceiver和MessageConsumer,前者用來接收客戶端傳送的訊息,後者用來消費訊息。
TcpSocketReceiver通過Messagecodec對MessageQueue中的MessageTree進行解碼,還原成為MessageTree,然後通過MessageHandler呼叫Consumer對訊息進行消費。(這個消費的過程其實是一個訊息分發的過程。訊息有不同的訊息分析器)
消費的過程是一個週期性的過程,對應上圖右邊部分。一個Period代表一個週期,每個週期對應一個持續時間(duration),預設為一小時。
RealTimeConsumer是MessageConsumer的實現類,他的作用是進行實時的消費,如何實現週期性消費呢?他需要依賴PeriodManager,進行週期管理。所以在初始化MessageConsumer的過程中會初始化PeriodManager。並且開啟periodmanager的守護執行緒,進行週期開始和結束的控制。
通過m_strategy.next(now)方法進行時間對比,返回大於零或小於零的值,來決定是開始新的週期還是結束舊的週期。這個執行緒是每隔一秒執行一次的。
public long next(long now) { long startTime = now - now % m_duration;//得到一個整點的時間,作為開始時間,如果now是10:05,startTime就是10:00 Date nowTime = new Date(startTime); // for current period 當前週期返回開始時間, //第一次進入的時候m_lastStartTime初始化為週期開始時間 if (startTime > m_lastStartTime) { m_lastStartTime = startTime; return startTime; } // prepare next period ahead //下一個時期 返回大於0的值,則開始新的週期 if (now - m_lastStartTime >= m_duration - m_aheadTime) { m_lastStartTime = startTime + m_duration; return startTime + m_duration; } // last period is over 上一個週期已經結束了 //返回小於零的值,銷燬上一個週期 if (now - m_lastEndTime >= m_duration + m_extraTime) { long lastEndTime = m_lastEndTime; m_lastEndTime = startTime; return -lastEndTime; }
在初始化週期管理器的時候,會執行startPeriod方法,來開啟一個週期。例項化Period,在這個過程中,會通過analyzerManager得到12種分析器,並宣告一個m_task的HashMap<String,List> key就是分析器名稱,value就是List,分析器不同對應的list的size也不同,例如transaction分析比較耗時,就會分配兩個PeriodTask,如下圖
private void startPeriod(long startTime) { long endTime = startTime + m_strategy.getDuration(); Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger); m_periods.add(period); period.start(); }
訊息消費
例項化完成後,將period加入到m_periods中,然後呼叫period.start方法,這個方法迴圈m_task每一種分析器,啟動periodTask的執行緒,進行analyze,這個方法會一直對佇列進行輪詢,從佇列中取出tree,進行process處理, process是抽象方法,具體會由重寫了該方法的子類去執行process方法
for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
List<PeriodTask> taskList = tasks.getValue();
for (int i = 0; i < taskList.size(); i++) {
PeriodTask task = taskList.get(i);
task.setIndex(i);
Threads.forGroup("Cat-RealtimeConsumer").start(task);
}
}
@Override
public void run() {
try {
m_analyzer.analyze(m_queue);
} catch (Exception e) {
Cat.logError(e);
}
}
訊息分發
講完有關週期的初始化過程,我們在回頭看看server接收到訊息是如何放到消費佇列中的
tcp接收到訊息---->MessageDocoder: decode—>DefaulMessageHandler : handle(MessageTree tree) m_consumer.consume(tree)----> RealtimeConsumer consume方法
@Override
public void consume(MessageTree tree) {
long timestamp = tree.getMessage().getTimestamp();
Period period = m_periodManager.findPeriod(timestamp);
if (period != null) {
period.distribute(tree);
} else {
m_serverStateManager.addNetworkTimeError(1);
}
}
- 根據當前時間,找到當前時間對應的週期Period
- 然後呼叫週期的distribute方法進行訊息的分發
a) 迴圈m_tasks
i. 獲得list中的某一個PeriodTask
ii. 將訊息放到佇列(m_queue)中
可以看出,一個訊息將被放到所有型別的訊息分析器進行分析
寫在後邊
分析器將與後邊做具體的分析
說明:
本文涉及到的UML圖片來源於大眾點評Cat–Server模組架構分析這篇文章,部分內容也有參考