1. 程式人生 > >Cat原始碼分析(二):Server端

Cat原始碼分析(二):Server端

初始化

服務端消費客戶端發來的訊息進行分析和展示,所以這個的初始化指的是CatHomeModule的初始化

在這裡插入圖片描述

CatHomeModule依賴TcpSocketReceiver和MessageConsumer,前者用來接收客戶端傳送的訊息,後者用來消費訊息。
TcpSocketReceiver通過Messagecodec對MessageQueue中的MessageTree進行解碼,還原成為MessageTree,然後通過MessageHandler呼叫Consumer對訊息進行消費。(這個消費的過程其實是一個訊息分發的過程。訊息有不同的訊息分析器)
消費的過程是一個週期性的過程,對應上圖右邊部分。一個Period代表一個週期,每個週期對應一個持續時間(duration),預設為一小時。
RealTimeConsumer是MessageConsumer的實現類,他的作用是進行實時的消費,如何實現週期性消費呢?他需要依賴PeriodManager,進行週期管理。所以在初始化MessageConsumer的過程中會初始化PeriodManager。並且開啟periodmanager的守護執行緒,進行週期開始和結束的控制。

在這裡插入圖片描述

通過m_strategy.next(now)方法進行時間對比,返回大於零或小於零的值,來決定是開始新的週期還是結束舊的週期。這個執行緒是每隔一秒執行一次的。

public long next(long now) {
   long startTime = now - now % m_duration;//得到一個整點的時間,作為開始時間,如果now是10:05,startTime就是10:00
   Date nowTime = new Date(startTime);

   // for current period 當前週期返回開始時間,
//第一次進入的時候m_lastStartTime初始化為週期開始時間
   if (startTime > m_lastStartTime) {
      m_lastStartTime = startTime;
      return startTime;
   }

   // prepare next period ahead  
//下一個時期  返回大於0的值,則開始新的週期
   if (now - m_lastStartTime >= m_duration - m_aheadTime) {
      m_lastStartTime = startTime + m_duration;
      return startTime + m_duration;
   }

   // last period is over 上一個週期已經結束了
//返回小於零的值,銷燬上一個週期
   if (now - m_lastEndTime >= m_duration + m_extraTime) {
      long lastEndTime = m_lastEndTime;
      m_lastEndTime = startTime;
      return -lastEndTime;
   }

在初始化週期管理器的時候,會執行startPeriod方法,來開啟一個週期。例項化Period,在這個過程中,會通過analyzerManager得到12種分析器,並宣告一個m_task的HashMap<String,List> key就是分析器名稱,value就是List,分析器不同對應的list的size也不同,例如transaction分析比較耗時,就會分配兩個PeriodTask,如下圖

在這裡插入圖片描述

private void startPeriod(long startTime) {
   long endTime = startTime + m_strategy.getDuration();
   Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);

   m_periods.add(period);
   period.start();
}

訊息消費

例項化完成後,將period加入到m_periods中,然後呼叫period.start方法,這個方法迴圈m_task每一種分析器,啟動periodTask的執行緒,進行analyze,這個方法會一直對佇列進行輪詢,從佇列中取出tree,進行process處理, process是抽象方法,具體會由重寫了該方法的子類去執行process方法

在這裡插入圖片描述

for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
      List<PeriodTask> taskList = tasks.getValue();

      for (int i = 0; i < taskList.size(); i++) {
         PeriodTask task = taskList.get(i);

         task.setIndex(i);

         Threads.forGroup("Cat-RealtimeConsumer").start(task);
      }
   }

@Override
public void run() {
   try {
      m_analyzer.analyze(m_queue);
   } catch (Exception e) {
      Cat.logError(e);
   }
}

訊息分發

講完有關週期的初始化過程,我們在回頭看看server接收到訊息是如何放到消費佇列中的

tcp接收到訊息---->MessageDocoder: decode—>DefaulMessageHandler : handle(MessageTree tree) m_consumer.consume(tree)----> RealtimeConsumer consume方法
在這裡插入圖片描述

@Override
public void consume(MessageTree tree) {
   long timestamp = tree.getMessage().getTimestamp();
   Period period = m_periodManager.findPeriod(timestamp);

   if (period != null) {
      period.distribute(tree);
   } else {
      m_serverStateManager.addNetworkTimeError(1);
   }
}

  1. 根據當前時間,找到當前時間對應的週期Period
  2. 然後呼叫週期的distribute方法進行訊息的分發
    a) 迴圈m_tasks
    i. 獲得list中的某一個PeriodTask
    ii. 將訊息放到佇列(m_queue)中

可以看出,一個訊息將被放到所有型別的訊息分析器進行分析

寫在後邊

分析器將與後邊做具體的分析

說明:
本文涉及到的UML圖片來源於大眾點評Cat–Server模組架構分析這篇文章,部分內容也有參考