高效能佇列 Disruptor 使用教程
Disruptor 是什麼
Disruptor 是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題(在效能測試中發現竟然與I/O操作處於同樣的數量級)。基於 Disruptor 開發的系統單執行緒能支撐每秒 600 萬訂單,2010 年在 QCon 演講後,獲得了業界關注。2011年,企業應用軟體專家 Martin Fowler 專門撰寫長文介紹。同年它還獲得了 Oracle 官方的 Duke 大獎。 從資料結構上來看,Disruptor 是一個支援 生產者 -> 消費者 模式的 環形佇列。能夠在 無鎖 的條件下進行並行消費,也可以根據消費者之間的依賴關係進行先後消費次序。本文將演示一些經典的場景如何通過 Disruptor 去實現。
新增依賴
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
複製程式碼
單生產者單消費者模式
首先建立一個 OrderEvent
類,這個類將會被放入環形佇列中作為訊息內容。
@Data
public class OrderEvent {
private String id;
}
複製程式碼
建立 OrderEventProducer
public class OrderEventProducer {
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String orderId) {
long sequence = ringBuffer.next();
try {
OrderEvent orderEvent = ringBuffer.get(sequence);
orderEvent.setId(orderId);
} finally {
ringBuffer.publish(sequence);
}
}
}
複製程式碼
建立 OrderEventHandler
類,並實現 EventHandler<T>
和 WorkHandler<T>
介面,作為消費者。
@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent>,WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event,long sequence,boolean endOfBatch) {
log.info("event: {},sequence: {},endOfBatch: {}",event,sequence,endOfBatch);
}
@Override
public void onEvent(OrderEvent event) {
log.info("event: {}",event);
}
}
複製程式碼
建立完上面三個類,我們就已經具備了 事件類
、生產者
、消費者
這三個要素了。接下來我們通過一個主方法來演示這一系列流程。
@Slf4j
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE,new YieldingWaitStrategy()
);
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
eventProducer.onData(UUID.randomUUID().toString());
}
}
複製程式碼
單生產者多消費者
如果消費者是多個,只需要在呼叫 handleEventsWith
方法時將多個消費者傳遞進去。下面的程式碼傳遞了兩個消費者。
- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
複製程式碼
上面傳入的兩個消費者會重複消費每一條訊息,如果想實現一條訊息在有多個消費者的情況下,只會被一個消費者消費,那麼需要呼叫 handleEventsWithWorkerPool
方法。
- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
複製程式碼
多生產者多消費者
在實際開發中,多個生產者傳送訊息,多個消費者處理訊息才是常態。這一點,Disruptor 也是支援的。多生產者多消費者的程式碼如下:
@Slf4j
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,// 這裡的列舉修改為多生產者
ProducerType.MULTI,new YieldingWaitStrategy()
);
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 建立一個執行緒池,模擬多個生產者
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 100; i++) {
fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString()));
}
}
}
複製程式碼
消費者優先順序
Disruptor 可以做到的事情遠遠不止上面的內容。在實際場景中,我們通常會因為業務邏輯而形成一條消費鏈。比如一個訊息必須由 消費者A -> 消費者B -> 消費者C
的順序依次進行消費。在配置消費者時,可以通過 .then
方法去實現。如下:
disruptor.handleEventsWith(new OrderEventHandler())
.then(new OrderEventHandler())
.then(new OrderEventHandler());
複製程式碼
當然,handleEventsWith
與 handleEventsWithWorkerPool
都是支援 .then
的,它們可以結合使用。比如可以按照 消費者A -> (消費者B 消費者C) -> 消費者D
的消費順序
disruptor.handleEventsWith(new OrderEventHandler())
.thenHandleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler())
.then(new OrderEventHandler());
複製程式碼
總結
以上就是 Disruptor 高效能佇列的常用方法。其實 生成者 -> 消費者
模式是很常見的,通過一些訊息佇列也可以輕鬆做到上述的效果。不同的地方在於,Disruptor 是在記憶體中以佇列的方式去實現的,而且是無鎖的。這也是 Disruptor 為什麼高效的原因。