1. 程式人生 > >支援流式處理ACID事務!Flink團隊開源新作Streaming Ledger

支援流式處理ACID事務!Flink團隊開源新作Streaming Ledger

開發十年,就只剩下這套架構體系了! >>>   

流式處理的下一個演化步驟

 

在 data Artisans,我們目睹了資料流式處理的瘋狂式增長,從早期階段到快速增長的市場,預計到 2025 年將達到近 500 億美元的規模。從 Apache Flink 建立之初,我們就堅信流式處理是一項可以為企業的任務關鍵型應用程式提供強大支援的技術。

隨著越來越多的行業採用流式處理,併為越來越多的任務關鍵型應用程式提供支援,這項技術本身也在不斷髮展,以便為資料和計算的正確性提供更好的保證。

 

 第 1 步:資料分析的分散式流式處理(“至少一次保證”)

 

第一個分散式流式處理器的主要目標是資料分析應用程式,提供了一種實時非精確和非同步精確的分析方法。這種方式被稱為“Lambda 架構”,流式處理器基於到達的資料提供不精確的分析結果,同時批處理器以小時或天為單位提供精確的分析結果。這種流式處理保證被稱為“至少一次處理”。這是流式處理系統能夠提供的最弱的正確性保證,是流式處理技術的第一步。

 

 第 2 步:單鍵應用程式的分散式流式處理(“恰好一次保證”)

 

Apache Flink 率先採用了真正的有狀態流式處理,提供了大規模的“恰好一次保證”。這樣就可以通過能過提供強正確性保證的流式處理技術來構建分析型別和事務型別的應用程式,減少對 Lambda 架構和批處理器的需求。

如今,市場上有很多流式處理器都提供了強大的一致性保證,但僅適用於特定型別的應用程式,這些應用程式每次只更新單個鍵。也就是說,如果一個應用程式每次只更新單個銀行賬戶餘額,那就可以通過流式處理來實現,但如果應用程式要將資金從一個銀行賬戶轉移到另一個銀行賬戶,就難以實現強一致性。

 

 第 3 步:一般應用程式的分散式流式處理(“ACID 保證”)

 

隨著 Streaming Ledger 成為 data Artisans 平臺的一部分,使用者現在可以構建同時讀取或更新多條記錄和多張表的應用程式,並實現 ACID 事務支援。

Streaming Ledger 在提供這些保證的同時,還能保持流式處理(恰好一次保證)的伸縮能力,而且不會影響應用程式的速度、效能、可伸縮性或可用性。

我們可以將 Lambda 架構的至少一次保證視為最終一致性的一種形式(因為批處理系統最終會趕上來)。Flink 提供的“恰好一次保證”類似於分散式鍵值儲存系統為單鍵操作提供的一致性保證,而 Streaming Ledger 提供的保證類似於關係型資料庫提供的 ACID 保證。

我們相信這是流式處理的下一個演化步驟,它為以正確、可伸縮和靈活的方式實現基於流式架構的應用程式打開了大門。

 

多鍵和多表事務簡介

 

大部分關係型資料庫管理系統會執行 ACID 事務,每個事務通過 ACID 語義在序列化的隔離級別下修改資料表,以此來實現完整的資料一致性。根據 ACID 語義,所有事務都是 Atomic、Consistent、Isolated 和 Durable 的。ACID 語義在金融服務或電子商務等行業中扮演著重要的角色。

讓我們舉一個經典的例子,假設有一個帳戶餘額表,基於這個表將一個帳戶的錢轉到另一個帳戶上。為了確保事務的正確性,轉賬操作必須同時修改兩個帳戶或都不做修改(原子性),而且只有在源帳戶有足夠資金時才能進行轉賬(一致性),並且不存在其他可能導致錯誤結果的操作(隔離、無異常)。任何違反這些條件的操作都會導致資金丟失,最終導致賬戶餘額不正確。

上圖顯示瞭如何在有狀態流式處理器中實現這個示例。其中的帳戶 ID 就是鍵,而且兩個帳戶位於不同的分片中。這兩個分片對對方都沒有訪問許可權或者對對方的狀態持有一致的檢視,這使得實現這一的框架變得相當複雜,因為需要通過下列方式在兩個分片之間傳遞狀態:

  1. 提供一致的狀態檢視

  2. 能夠管理併發修改

  3. 確保修改的原子性

 

超越流式處理框架的恰好一次語義

 

data Artisans Streaming Ledger 基於 Apache Flink 構建,為跨多個表和單表多行的多個數據流提供執行序列化事務的能力。它可以被視為等效於鍵值儲存系統(甚至是跨多個鍵值儲存系統)的多行事務。Streaming Ledger 使用 Flink 的狀態來儲存表,所以就不需要額外的儲存或系統配置。Streaming Ledger 應用程式的構建塊由表、事務事件流、事務函式和可選的結果流組成。

有關更多資訊,請下載白皮書(https://data-artisans.com/download-the-data-artisans-streaming-ledger-whitepaper )。

data Artisans Streaming Ledger 為使用流式處理構建新的應用程式型別打開了一個大門,這類應用程式在以前只能依賴關係型資料庫。現在,資料密集型實時應用程式(如欺詐檢測、機器學習和實時交易定價)可以毫不費力地遷移到流式處理平臺上。在下一節中,我們將通過具體的示例演示使用 data Artisans Streaming Ledger 進行開發的必要步驟。

 

Streaming Ledger 用例演示

 

data Artisans Streaming Ledger 非常適合用於處理涉及多個狀態的用例,它支援對多個狀態進行事務性的修改,這些修改彼此隔離,並遵循序列化一致性原則。

我們假設有一個實時應用程式,它負責在帳戶和分類帳條目之間識別匯款模式。

這個應用程式需要維護兩張 Flink 狀態表:第一張表叫作“Accounts”,第二張叫作“Asset Ledger”。應用程式消費事務事件流,例如帳戶之間、分類帳條目之間或二者之間的轉賬。當有事件進入時,不同的事務型別會被應用在每個事件型別上,然後訪問相關行,檢查前置條件,並決定是處理還是拒絕轉賬操作。對於賬戶之間的轉賬,它會更新表中的各個行。對於賬戶和分類賬之間的轉賬,它會生成結果事件,表示轉賬是被接受還是被拒絕。下圖顯示了架構細節:

Streaming Ledger 公開了一個易於使用的 API,對於有過流式處理使用經驗的使用者和熟悉關係型資料庫的使用者來說,這個 API 可以輕鬆上手。在我們的例子中,我們使用了以下假設:

  • 兩個表:Accounts 和 Asset Ledger

  • 三個事件流:存款、轉賬和餘額查詢

  • 存款時將值寫入 Accounts 和 Asset Ledger 表中

  • 轉賬原子操作在 Accounts 和 Asset Ledger 之間轉移值。

接下來,我們將逐步使用 Streaming Ledger API 來完成這個示例。Ledger 的 API 是開源的,還包含了一個序列(單節點)的實現(https://github.com/dataArtisans/da-streamingledger )。

以下是通過 Flink DataStream 來建立事件源的方法:

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<DepositEvent> deposits = env.addSource(…); 
DataStream<TransferEvent> transfers = env.addSource(…);

And this is how you can define the scope and tables of your programme:

// 定義事務範圍
TransactionalStreams txStreams =
TransactionalStreams.create(“simple example”);

// 定義事務表
TransactionalStreams.State<String, Long> accounts = txStreams.declareState(“accounts”)
        .withKeyType(String.class)
        .withValueType(Long.class);

TransactionalStreams.State<String, Long> asset ledger = 
txStreams.declareState(“AssetLedger”)
       .withKeyType(String.class)
       .withValueType(Long.class);

接下來,你可以為每個流和表指定事務函式,以及選擇將要訪問到的資料行的鍵。

.apply(…) 函式本身包含事務的業務邏輯。

對於被訪問的每個資料行,為它們新增一個額外的呼叫:.on(table,key,name,type):

  • 'table'表示要訪問的表

  • 'key'是一個函式,可以通過這個函式從輸入事件中獲取這一行資料的鍵

  • 'name'是該行資料的邏輯名稱(稍後可能會用到)

  • 'type’限定對該行的訪問屬性,如只讀、只寫或讀寫訪問。這是一種優化,其中 READ_WRITE 是最通用的選項。

 

// 定義存款事務
txStreams.usingStream(deposits, “deposits”)
  .apply(new DepositsFunction())
  .on(accounts, DepositEvent::getAccountId, “account”, READ_WRITE)
  .on(assetledger,DepositEvent::getassetledgerEntryId, “asset”, READ_WRITE);

// 定義轉賬

// 將控制代碼儲存在結果流中,以備後用
OutputTag<TransferResult> result = txStreams.usingStream(transfers, “transfers”)
  .apply(new TransferFunction())
  .on(accounts, TransferEvent::getSourceAccountId, “source-account”, READ_WRITE)
  .on(accounts, TransferEvent::getTargetAccountId, “target-account”, READ_WRITE)
  .on(assetledger, TransferEvent::getSourceAssetledgerEntryId, “source-asset”, READ_WRITE)
  .on(Assetledger, TransferEvent::getTargetAssetledgerEntryId, “target-asset”, READ_WRITE)
  .output();

 

然後實現了包含業務邏輯的事務,決定是否以及如何更新資料行,以及生成哪些結果。

我們傳給這些事務函式一個狀態訪問物件,訪問物件負責讀取或跟新每一行資料。為了將狀態訪問與資料行和鍵關聯起來,對這些函式進行了與前一步相同的註解。

為簡單起見,我們只給出'TransferFunction'的實現。

 

 

public class TransferFunction extends 
TransactionProcessFunction<TransferEvent, TransferResult> { 
@ProcessTransaction 
public void process(
   TransferEvent txn,
   Context<TransferResult> ctx,
   @State(“source-account”) StateAccess<Long> sourceAccount,
   @State(“target-account”) StateAccess<Long> targetAccount,
   @State(“source-asset”) StateAccess<Long> sourceAsset,
   @State(“target-asset”) StateAccess<Long> targetAsset) {

// 訪問當前值的資料行
long sourceBalance = sourceAccount.read();
long sourceAssetValue = sourceAsset.read();
long targetBalance = targetAccount.read();
long targetAssetValue = targetAsset.read();

// 檢查前置條件: 正餘額和最小余額
if (sourceBalance > txn.getMinAccountBalance()
&& sourceBalance > txn.getAccountTransfer()
&& sourceAssetValue > txn.getAssetledgerEntryTransfer()) {

// 計算新的餘額
long newSourceBalance = sourceBalance - 
txn.getAccountTransfer();
long newTargetBalance = targetBalance + 
txn.getAccountTransfer();
long newSourceAssets = sourceAssetValue - 
txn.getAssetledgerEntryTransfer();
long newTargetAssets = targetAssetValue + 
txn.getAssetledgerEntryTransfer();

// 寫入更新過的值
sourceAccount.write(newSourceBalance);
targetAccount.write(newTargetBalance);
sourceAsset.write(newSourceAssets);
targetAsset.write(newTargetAssets);

// 觸發包含新餘額的正結果事件
ctx.emit(new TransferResult(txn, SUCCESS, 
newSourceBalance, newTargetBalance));
}
else {
// 觸發包含未更新餘額的負結果事件
ctx.emit(new TransferResult(txn, REJECT, 
sourceBalance, targetBalance));
   } 
 }
}

 

結    論

data Artisans Streaming Ledger 將之前依賴關係型資料庫的應用程式帶入到了流式處理時代,進一步擴充套件了流式處理技術的應用範圍!藉助 Streaming Ledger,我們正在開啟流式處理的新篇章,我們很高興現在越來越多的任務關鍵型應用程式可以充分利用流式處理的實時、非同步、靈活等優勢。

開源專案地址

https://github.com/dataArtisa