1. 程式人生 > 程式設計 >高效能佇列 Disruptor 使用教程

高效能佇列 Disruptor 使用教程

Disruptor 是什麼

Disruptor 是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題(在效能測試中發現竟然與I/O操作處於同樣的數量級)。基於 Disruptor 開發的系統單執行緒能支撐每秒 600 萬訂單,2010 年在 QCon 演講後,獲得了業界關注。2011年,企業應用軟體專家 Martin Fowler 專門撰寫長文介紹。同年它還獲得了 Oracle 官方的 Duke 大獎。 從資料結構上來看,Disruptor 是一個支援 生產者 -> 消費者 模式的 環形佇列。能夠在 無鎖 的條件下進行並行消費,也可以根據消費者之間的依賴關係進行先後消費次序。本文將演示一些經典的場景如何通過 Disruptor 去實現。

新增依賴

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
複製程式碼

單生產者單消費者模式

首先建立一個 OrderEvent 類,這個類將會被放入環形佇列中作為訊息內容。

@Data
public class OrderEvent {
    private String id;
}
複製程式碼

建立 OrderEventProducer

類,它將作為生產者使用。

public class OrderEventProducer {
    private final RingBuffer<OrderEvent> ringBuffer;
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(String orderId) {
        long sequence = ringBuffer.next();
        try
{ OrderEvent orderEvent = ringBuffer.get(sequence); orderEvent.setId(orderId); } finally { ringBuffer.publish(sequence); } } } 複製程式碼

建立 OrderEventHandler 類,並實現 EventHandler<T>WorkHandler<T> 介面,作為消費者。

@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent>,WorkHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event,long sequence,boolean endOfBatch) {
        log.info("event: {},sequence: {},endOfBatch: {}",event,sequence,endOfBatch);
    }
    @Override
    public void onEvent(OrderEvent event) {
        log.info("event: {}",event);
    }
}
複製程式碼

建立完上面三個類,我們就已經具備了 事件類生產者消費者 這三個要素了。接下來我們通過一個主方法來演示這一系列流程。

@Slf4j
public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE,new YieldingWaitStrategy()
        );
        disruptor.handleEventsWith(new OrderEventHandler());
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        eventProducer.onData(UUID.randomUUID().toString());
    }
}
複製程式碼

單生產者多消費者

如果消費者是多個,只需要在呼叫 handleEventsWith 方法時將多個消費者傳遞進去。下面的程式碼傳遞了兩個消費者。

- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
複製程式碼

上面傳入的兩個消費者會重複消費每一條訊息,如果想實現一條訊息在有多個消費者的情況下,只會被一個消費者消費,那麼需要呼叫 handleEventsWithWorkerPool 方法。

- disruptor.handleEventsWith(new OrderEventHandler());
+ disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
複製程式碼

多生產者多消費者

在實際開發中,多個生產者傳送訊息,多個消費者處理訊息才是常態。這一點,Disruptor 也是支援的。多生產者多消費者的程式碼如下:

@Slf4j
public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,// 這裡的列舉修改為多生產者
                ProducerType.MULTI,new YieldingWaitStrategy()
        );
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        // 建立一個執行緒池,模擬多個生產者
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; i++) {
            fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString()));
        }
    }
}
複製程式碼

消費者優先順序

Disruptor 可以做到的事情遠遠不止上面的內容。在實際場景中,我們通常會因為業務邏輯而形成一條消費鏈。比如一個訊息必須由 消費者A -> 消費者B -> 消費者C 的順序依次進行消費。在配置消費者時,可以通過 .then 方法去實現。如下:

disruptor.handleEventsWith(new OrderEventHandler())
         .then(new OrderEventHandler())
         .then(new OrderEventHandler());
複製程式碼

當然,handleEventsWithhandleEventsWithWorkerPool 都是支援 .then 的,它們可以結合使用。比如可以按照 消費者A -> (消費者B 消費者C) -> 消費者D 的消費順序

disruptor.handleEventsWith(new OrderEventHandler())
         .thenHandleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler())
         .then(new OrderEventHandler());
複製程式碼

總結

以上就是 Disruptor 高效能佇列的常用方法。其實 生成者 -> 消費者 模式是很常見的,通過一些訊息佇列也可以輕鬆做到上述的效果。不同的地方在於,Disruptor 是在記憶體中以佇列的方式去實現的,而且是無鎖的。這也是 Disruptor 為什麼高效的原因。

參考連結