(12)自定義資料流(實戰Docker事件推送的REST API)——響應式Spring的道法術器
2.2 自定義資料流
這一小節介紹如何通過定義相應的事件(onNext
、onError
和onComplete
) 建立一個 Flux 或 Mono。Reactor提供了generate
、create
、push
和handle
等方法,所有這些方法都使用 sink(池)來生成資料流。
sink,顧名思義,就是池子,可以想象一下廚房水池的樣子。如下圖所示:
下面介紹到的方法都有一個sink提供給方法使用者,通常至少會暴露三個方法給我們,next
、error
和complete
。next和error相當於兩個下水口,我們不斷將自定義的資料放到next口,Reactor就會幫我們串成一個Publisher資料流,直到有一個錯誤資料放到error口,或按了一下complete
本文測試原始碼。
2.2.1 generate
generate
是一種同步地,逐個地發出資料的方法。因為它提供的sink是一個SynchronousSink
, 而且其next()
方法在每次回撥的時候最多隻能被呼叫一次。
generate
方法有三種簽名:
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
public static <T, S> Flux<T> generate (Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
1)使用SynchronousSink生成資料流
@Test
public void testGenerate1() {
final AtomicInteger count = new AtomicInteger(1); // 1
Flux.generate(sink -> {
sink.next(count.get() + " : " + new Date()); // 2
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count.getAndIncrement() >= 5) {
sink.complete(); // 3
}
}).subscribe(System.out::println); // 4
}
- 用於計數;
- 向“池子”放自定義的資料;
- 告訴
generate
方法,自定義資料已發完; - 觸發資料流。
輸出結果為每1秒鐘列印一下時間,共列印5次。
2)增加一個伴隨狀態
對於上邊的例子來說,count
用於記錄狀態,當值達到5之後就停止計數。由於在lambda內部使用,因此必須是final型別的,且不能是原生型別(如int
)或不可變型別(如Integer
)。
如果使用第二個方法簽名,上邊的例子可以這樣改:
@Test
public void testGenerate2() {
Flux.generate(
() -> 1, // 1
(count, sink) -> { // 2
sink.next(count + " : " + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count >= 5) {
sink.complete();
}
return count + 1; // 3
}).subscribe(System.out::println);
}
- 初始化狀態值;
- 第二個引數是
BiFunction
,輸入為狀態和sink; - 每次迴圈都要返回新的狀態值給下次使用。
3)完成後處理
第三個方法簽名除了狀態、sink外,還有一個Consumer
,這個Consumer
在資料流發完後執行。
Flux.generate(
() -> 1,
(count, sink) -> {
sink.next(count + " : " + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count >= 5) {
sink.complete();
}
return count + 1;
}, System.out::println) // 1
.subscribe(System.out::println);
}
- 最後將count值打印出來。
如果 state 使用了資料庫連線或者其他需要進行清理的資源,這個 Consumer lambda 可以用來在最後完成資源清理任務。
2.2.2 create
create
是一個更高階的建立Flux的方法,其生成資料流的方式既可以是同步的,也可以是非同步的,並且還可以每次發出多個元素。
create
用到了FluxSink
,後者同樣提供 next,error 和 complete 等方法。 與generate不同的是,create不需要狀態值,另一方面,它可以在回撥中觸發多個事件(即使事件是發生在未來的某個時間)。
create 常用的場景就是將現有的 API 轉為響應式,比如監聽器的非同步方法。
先編寫一個事件源:
public class MyEventSource {
private List<MyEventListener> listeners;
public MyEventSource() {
this.listeners = new ArrayList<>();
}
public void register(MyEventListener listener) { // 1
listeners.add(listener);
}
public void newEvent(MyEvent event) {
for (MyEventListener listener :
listeners) {
listener.onNewEvent(event); // 2
}
}
public void eventStopped() {
for (MyEventListener listener :
listeners) {
listener.onEventStopped(); // 3
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class MyEvent { // 4
private Date timeStemp;
private String message;
}
}
- 註冊監聽器;
- 向監聽器發出新事件;
- 告訴監聽器事件源已停止;
- 事件類,使用了lombok註解。
準備一個監聽器介面,它可以監聽上邊第2和3的兩種事件:(1)新的MyEvent
到來;(2)事件源停止。如下:
public interface MyEventListener {
void onNewEvent(MyEventSource.MyEvent event);
void onEventStopped();
}
下面的測試方法邏輯是:建立一個監聽器註冊到事件源,這個監聽器再收到事件回撥的時候通過Flux.create
的sink將一系列事件轉換成非同步的事件流:
@Test
public void testCreate() throws InterruptedException {
MyEventSource eventSource = new MyEventSource(); // 1
Flux.create(sink -> {
eventSource.register(new MyEventListener() { // 2
@Override
public void onNewEvent(MyEventSource.MyEvent event) {
sink.next(event); // 3
}
@Override
public void onEventStopped() {
sink.complete(); // 4
}
});
}
).subscribe(System.out::println); // 5
for (int i = 0; i < 20; i++) { // 6
Random random = new Random();
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));
}
eventSource.eventStopped(); // 7
}
- 事件源;
- 向事件源註冊用匿名內部類建立的監聽器;
- 監聽器在收到事件回撥的時候通過sink將事件再發出;
- 監聽器再收到事件源停止的回撥的時候通過sink發出完成訊號;
- 觸發訂閱(這時候還沒有任何事件產生);
- 迴圈產生20個事件,每個間隔不超過1秒的隨機時間;
- 最後停止事件源。
執行一下這個測試方法,20個MyEvent
陸續打印出來。
如果將上邊的create
方法換成generate
方法,則會報出異常:
java.lang.IllegalStateException: The generator didn't call any of the SynchronousSink method
證明generate
並不支援非同步的方式。
create
方法還有一個變體方法push
,適合生成事件流。與 create類似,
push 也可以是非同步地, 並且能夠使用以上各種回壓策略。所以上邊的例子可以替換為push
方法。區別在於,push
方法中,呼叫next
、complete
或error
的必須是同一個執行緒。
除了next
、complete
或error
方法外,FluxSink
還有onRequest
方法,這個方法可以用來響應下游訂閱者的請求事件。從而不僅可以像上一個例子那樣,上游在資料就緒的時候將其推送到下游,同時下游也可以從上游拉取已經就緒的資料。這是一種推送/拉取混合的模式。比如:
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
for(String s : messages) {
sink.next(s); // 1
}
}
});
sink.onRequest(n -> { // 2
List<String> messages = myMessageProcessor.request(n); // 3
for(String s : message) {
sink.next(s);
}
});
...
}
- push方式,主動向下游發出資料;
- 在下游發出請求時被呼叫;
- 響應下游的請求,查詢是否有可用的message。
2.2.3 實戰Docker事件推送API
Docker提供了一個用來監聽事件的命令:docker events
,執行這個命令後,會監聽docker daemon的事件並打印出來,執行是持續進行的,就像top
或前邊介紹的mongostat
命令一樣。Docker的java開發包的DockerClient
也提供了相應的API,這個API是基於回撥的,因此我們就可以使用Reactor的create
方法,將這個基於回撥的API轉換為響應式流,流中的資料就是一個一個的docker事件。如下圖所示:
1)測試DockerClient
首先,我們先啟動docker。
然後,我們繼續用第一章的webflux-demo
maven專案模組,在pom.xml
中新增Docker開發相關的依賴:
<!--docker client begin-->
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>3.0.14</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.26</version>
</dependency>
<!--docker client end-->
最後編寫測試方法:
public class DockerEventTest {
@Test
public void dockerEventToFlux() throws InterruptedException {
collectDockerEvents().subscribe(System.out::println); // 5
TimeUnit.MINUTES.sleep(1); // 6
}
private Flux<Event> collectDockerEvents() {
DockerClient docker = DockerClientBuilder.getInstance().build(); // 1
return Flux.create((FluxSink<Event> sink) -> {
EventsResultCallback callback = new EventsResultCallback() { // 2
@Override
public void onNext(Event event) { // 3
sink.next(event);
}
};
docker.eventsCmd().exec(callback); // 4
});
}
}
- 建立DockerClient,預設會連線
tcp://localhost:2375
,2375是docker預設的埠號,可以通過指定的IP和埠連線docker daemon:DockerClientBuilder.getInstance("tcp://192.168.0.123:2375").build()
,不過要注意docker daemon監聽介面和防火牆的配置。 - 自定義回撥類。
- 當有docker事件產生時,會回撥
onNext
,這時候通過FluxSink
的next
方法將Event
物件發出。 - 開始對docker事件進行監聽。
- 通過訂閱的方式打印出來。
- 主執行緒會立刻返回,因此等待1分鐘。
OK,看一下效果。
為了方便對比,我們首先在終端執行docker events
命令,然後在另一個終端進行docker操作,比如本例:
docker run -it -m 200M --memort-swap=200M progrium/stress --vm 1 --vm-bytes 300M
progrium/stress
是一個用於壓力測試的容器,通過-m 200M
指定為該容器的執行最多分配200M記憶體,然後在壓力測試的時候,通過--vm-bytes 300M
使其執行時嘗試分配300M的記憶體,此時會出現記憶體不足(OOM)的錯誤並導致容器被殺死(single 9)。
如圖所示,上方是分別執行兩個命令的終端視窗,可以看到docker events
命令打印出了一系列事件,如果是第一個執行progrium/stress
應該回先有一個pull映象的事件。下方是我們的測試程式碼的輸出,除了一些日誌之外,可以看到這些事件也被輸出了。
2)REST API推送到前端
下面,我們更進一步將Event事件通過REST API推送到瀏覽器端,看過第1.3.3節的話,對這一塊兒應該是輕車熟路了。
(一)首先定義一個我們自己的DockerEvent
,這一步不是必須的哈,不過DockerClient
返回的Event
本身欄位比較多,通常前端展示的話會轉換為dvo,“戲要做足”嘛,哈哈。
DockerEvent.java
@Data
@Document(collection = "docker-event")
public class DockerEvent {
@Indexed
private String status;
@Id
private String id;
private String from;
private Node node;
private EventType type;
private String action;
private String actorId;
private Long time;
private Long timeNano;
}
(二)然後就是DAO層了,建立一個DockerEventMongoRepository
,增加三個@Tailable
的查詢方法,分別用於查詢全部、按照狀態查詢和按型別+名稱查詢(比如查詢某某容器的事件):
DockerEventMongoRepository.java
public interface DockerEventMongoRepository extends ReactiveMongoRepository<DockerEvent, String> {
@Tailable
Flux<DockerEvent> findBy();
@Tailable
Flux<DockerEvent> findByStatus(String status);
@Tailable
Flux<DockerEvent> findByTypeAndFrom(String type, String from);
}
(三)定義一個CommandLineRunner
,用於在應用啟動後即開始監聽docker事件:
DockerEventsCollector.java
@Slf4j
@Component
public class DockerEventsCollector implements CommandLineRunner {
private DockerEventMongoRepository dockerEventMongoRepository;
private MongoTemplate mongo; // 1
public DockerEventsCollector(DockerEventMongoRepository dockerEventMongoRepository, MongoTemplate mongo) { // 1
this.dockerEventMongoRepository = dockerEventMongoRepository;
this.mongo= mongo;
}
@Override
public void run(String... args) {
mongo.dropCollection(DockerEvent.class); // 2
mongo.createCollection(DockerEvent.class, CollectionOptions.empty().maxDocuments(200).size(100000).capped()); // 2
dockerEventMongoRepository.saveAll(collect()).subscribe(); // 6
}
private Flux<DockerEvent> collect() { // 3
DockerClient docker = DockerClientBuilder.getInstance().build();
return Flux.create((FluxSink<Event> sink) -> {
EventsResultCallback callback = new EventsResultCallback() {
@Override
public void onNext(Event event) {
sink.next(event);
}
};
docker.eventsCmd().exec(callback);
})
.map(this::trans) // 4
.doOnNext(e -> log.info(e.toString())); // 5
}
private DockerEvent trans(Event event) { // 4
DockerEvent dockerEvent = new DockerEvent();
dockerEvent.setAction(event.getAction());
dockerEvent.setActorId(Objects.requireNonNull(event.getActor()).getId());
dockerEvent.setFrom(event.getFrom() == null ? null : event.getFrom().replace("//", "_"));
dockerEvent.setId(UUID.randomUUID().toString());
dockerEvent.setNode(event.getNode());
dockerEvent.setStatus(event.getStatus());
dockerEvent.setTime(event.getTime());
dockerEvent.setTimeNano(event.getTimeNano());
dockerEvent.setType(event.getType());
return dockerEvent;
}
}
- 這裡使用的是
MongoTemplate
,Spring 4.3 之後,如果有構造方法,Spring會自動注入,不需要@Autowired
註解了。 - 每次啟動應用針對
DockerEvent
建立“capped”的collection,方便測試,如果提前手動建立好的話可以不加這兩句。如果在//1處使用的是響應式的ReactiveMongoTemplate
,因為是非同步的,所以要用then()
或thenMany()
將後續的所有操作連線起來,如mongo.dropCollection(...).then(mongo.createCollection(...)).thenMany(dockerEventMongoRepository.saveAll(collect()))
,保證能先後依次執行。 - 監聽docker事件的方法。
- 將返回的
Event
轉換為我們定義的DockerEvent
,其中DockerEvent.from
欄位是事件主體名稱,比如容器名,可能有/
,因此進行一個字元替換,否則在URL中會有問題。 - 列印個日誌(可選)。
- 將收集的
DockerEvent
儲存到MongoDB,用subscribe()
觸發執行。
(四)Service層沒有啥邏輯,我們直接寫Controller:
DockerEventController.java
@Slf4j
@RestController
@RequestMapping(value = "/docker/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1
public class DockerEventController {
private DockerEventMongoRepository dockerEventMongoRepository;
public DockerEventController(DockerEventMongoRepository dockerEventMongoRepository) {
this.dockerEventMongoRepository = dockerEventMongoRepository;
}
@GetMapping
public Flux<DockerEvent> dockerEventStream() { // 2
return dockerEventMongoRepository.findBy();
}
@GetMapping("/{type}/{from}")
public Flux<DockerEvent> dockerEventStream(@PathVariable("type") String type, @PathVariable("from") String from) { // 3
return dockerEventMongoRepository.findByTypeAndFrom(type, from);
}
@GetMapping("/{status}")
public Flux<DockerEvent> dockerEventStream(@PathVariable String status) { // 4
return dockerEventMongoRepository.findByStatus(status);
}
}
OK了,啟動試一下:
可以看到,右側的瀏覽器的小圖示一直在旋轉,表示持續接收推送中,當在終端中進行docker操作的時候,所產生的事件就立刻出現在瀏覽器中了。如果請求/docker/events/oom
將只推送OOM事件,如果請求/docker/events/container/progrium_stress
將只推送來自容器progrium/stress的事件。
再次提醒,當capped 的 Collection中一條資料都沒有的時候,
@Tailable
的API也會立刻返回,所以需要等到資料庫中有至少一條資料之後(比如先執行以下pull),再在瀏覽器中請求docker/events
API。