資料庫中介軟體 Sharding-JDBC 原始碼分析 —— 事務(一)之BED
本文主要基於 Sharding-JDBC 1.5.0 正式版
- 1. 概述
- 2. 最大努力送達型
- 3. 柔性事務管理器
- 3.3.1 建立柔性事務
- 3.1 概念
- 3.2 柔性事務配置
- 3.3 柔性事務
- 4. 事務日誌儲存器
- 4.1 #add()
- 4.2 #remove()
- 4.3 #findEligibleTransactionLogs()
- 4.4 #increaseAsyncDeliveryTryTimes()
- 4.5 #processData()
- 5. 最大努力送達型事務監聽器
- 6. 最大努力送達型非同步作業
- 6.1 BestEffortsDeliveryJob
- 6.2 AsyncSoftTransactionJobConfiguration
- 6.3 Elastic-Job 是否必須?
- 7. 適用場景
- 8. 開發指南 & 開發示例
1. 概述
資料庫表分庫後,業務場景下的單庫本地事務可能變成跨庫分散式事務。雖然我們可以通過合適的分庫規則讓操作的資料在同庫下,繼續保證單庫本地事務,這也是非常推崇的,但不是所有場景下都能適用。如果這些場景對事務的一致性有要求,我們就不得不解決分散式事務的“麻煩”。
分散式事務是個很大的話題,我們來看看 Sharding-JDBC 對她的權衡:
Sharding-JDBC由於效能方面的考量,決定不支援強一致性分散式事務。我們已明確規劃線路圖,未來會支援最終一致性的柔性事務。
Sharding-JDBC 提供了兩種 柔性事務
- 最大努力送達型 BED :已經實現
- 事務補償型 TCC :計劃中
本文分享 最大努力送達型 的實現。建議前置閱讀:《Sharding-JDBC 原始碼分析 —— SQL 執行》。
Sharding-JDBC 正在收集使用公司名單:傳送門。 ? 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門 Sharding-JDBC 也會因此,能夠覆蓋更多的業務場景。傳送門 登記吧,騷年!傳送門
2. 最大努力送達型
概念
在分散式資料庫的場景下,相信對於該資料庫的操作最終一定可以成功,所以通過最大努力反覆嘗試送達操作。
從概念看,可能不是很直白的理解是什麼意思,本文會最大努力讓你乾淨理解。
架構圖
執行過程有 四種 情況:
- 【紅線】執行成功
- 【棕線】執行失敗,同步重試成功
- 【粉線】執行失敗,同步重試失敗,非同步重試成功
- 【綠線】執行失敗,同步重試失敗,非同步重試失敗,事務日誌保留
整體成漏斗倒三角,上一個階段失敗,交給下一個階段重試:
整個過程通過如下 元件 完成:
- 柔性事務管理器
- 最大努力送達型柔性事務
- 最大努力送達型事務監聽器
- 事務日誌儲存器
- 最大努力送達型非同步作業
下面,我們逐節分享每個元件。
3. 柔性事務管理器
3.1 概念
柔性事務管理器,SoftTransactionManager 實現,負責對柔性事務配置( SoftTransactionConfiguration ) 、柔性事務( AbstractSoftTransaction )的管理。
3.2 柔性事務配置
呼叫 #init()
初始化柔性管理器:
// SoftTransactionManager.java
/**
* 柔性事務配置物件
*/
@Getter
private final SoftTransactionConfiguration transactionConfig;
// SoftTransactionManager.java
/**
* 初始化事務管理器.
*/
public void init() throws SQLException {
// 初始化 最大努力送達型事務監聽器
EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
// 初始化 事務日誌資料庫儲存表
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
createTable();
}
// 初始化 內嵌的最大努力送達型非同步作業
if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
}
}
- 將最大努力送達型事務監聽器( BestEffortsDeliveryListener )註冊到事務匯流排 ( EventBus )。在『最大努力送達型事務監聽器』小節會詳細分享
- 當使用資料庫儲存事務日誌( TransactionLog ) 時,若事務日誌表(
transaction_log
)不存在則進行建立。在『事務日誌儲存器』小節會詳細分享 - 當配置使用內嵌的最大努力送達型非同步作業( NestedBestEffortsDeliveryJob ) 時,進行初始化。在『最大努力送達型非同步作業』小節會詳細分享
SoftTransactionConfiguration
SoftTransactionConfiguration,柔性事務配置物件。
public class SoftTransactionConfiguration {
/**
* 事務管理器管理的資料來源.
*/
@Getter(AccessLevel.NONE)
private final DataSource targetDataSource;
/**
* 同步的事務送達的最大嘗試次數.
*/
private int syncMaxDeliveryTryTimes = 3;
/**
* 事務日誌儲存型別.
*/
private TransactionLogDataSourceType storageType = RDB;
/**
* 儲存事務日誌的資料來源.
*/
private DataSource transactionLogDataSource;
/**
* 內嵌的最大努力送達型非同步作業配置物件.
*/
private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent();
}
3.3 柔性事務
在 Sharding-JDBC 裡,目前柔性事務分成兩種:
- BEDSoftTransaction :最大努力送達型柔性事務
- TCCSoftTransaction :TCC型柔性事務
繼承 AbstractSoftTransaction
public abstract class AbstractSoftTransaction {
/**
* 分片連線原自動提交狀態
*/
private boolean previousAutoCommit;
/**
* 分片連線
*/
@Getter
private ShardingConnection connection;
/**
* 事務型別
*/
@Getter
private SoftTransactionType transactionType;
/**
* 事務編號
*/
@Getter
private String transactionId;
}
AbstractSoftTransaction 實現了開啟柔性事務、關閉柔性事務兩個方法提供給子類呼叫:
-
#beginInternal()
- 對異常處理的程式碼:ExecutorExceptionHandler#setExceptionThrown()
- 對於其他 SQL,不會因為 SQL 錯誤不執行,會繼續執行
- 對於上層業務,不會因為 SQL 錯誤終止邏輯,會繼續執行。這裡有一點要注意下,上層業務不能對該 SQL 執行結果有強依賴,因為 SQL 錯誤需要重試達到資料最終一致性
- 對於最大努力型事務( TCC暫未實現 ),會對執行錯誤的 SQL 進行重試
- 呼叫
ExecutorExceptionHandler.setExceptionThrown(false)
設定執行 SQL 錯誤時,也不丟擲異常。 - 呼叫
connection.setAutoCommit(true);
,設定執行自動提交。使用最大努力型事務時,上層業務執行 SQL 會馬上提交,即使呼叫Connection#rollback()
也是無法回滾的,這點一定要注意。
/**
* 開啟柔性
*
* @param conn 分片連線
* @param type 事務型別
* @throws SQLException
*/
protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
// TODO 判斷如果在傳統事務中,則拋異常
Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction.");
// 設定執行錯誤,不丟擲異常
ExecutorExceptionHandler.setExceptionThrown(false);
connection = (ShardingConnection) conn;
transactionType = type;
// 設定自動提交狀態
previousAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
// 生成事務編號
// TODO 替換UUID為更有效率的id生成器
transactionId = UUID.randomUUID().toString();
}
-
#end()
- 事務結束後,一定要記得呼叫
#end()
清理執行緒變數。否則,下次請求使用到該執行緒,會繼續在這個柔性事務內。
- 事務結束後,一定要記得呼叫
/**
* 結束柔性事務.
*/
public final void end() throws SQLException {
if (connection != null) {
ExecutorExceptionHandler.setExceptionThrown(true);
connection.setAutoCommit(previousAutoCommit);
SoftTransactionManager.closeCurrentTransactionManager();
}
}
// SoftTransactionManager.java
/**
* 關閉當前的柔性事務管理器.
*/
static void closeCurrentTransactionManager() {
ExecutorDataMap.getDataMap().put(TRANSACTION, null);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
}
BEDSoftTransaction
BEDSoftTransaction,最大努力送達型柔性事務。
public class BEDSoftTransaction extends AbstractSoftTransaction {
/**
* 開啟柔性事務.
*
* @param connection 資料庫連線物件
*/
public void begin(final Connection connection) throws SQLException {
beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
}
}
TCCSoftTransaction
TCCSoftTransaction,TCC 型柔性事務,暫未實現。實現後,會更新到 《Sharding-JDBC 原始碼分析 —— 分散式事務(二)之事務補償型》。
3.3.1 建立柔性事務
通過呼叫 SoftTransactionManager#getTransaction()
建立柔性事務物件:
/**
* {@link ExecutorDataMap#dataMap} 柔性事務物件 key
*/
private static final String TRANSACTION = "transaction";
/**
* {@link ExecutorDataMap#dataMap} 柔性事務配置 key
*/
private static final String TRANSACTION_CONFIG = "transactionConfig";
// SoftTransactionManager.java
/**
* 建立柔性事務.
*
* @param type 柔性事務型別
* @return 柔性事務
*/
public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
AbstractSoftTransaction result;
switch (type) {
case BestEffortsDelivery:
result = new BEDSoftTransaction();
break;
case TryConfirmCancel:
result = new TCCSoftTransaction();
break;
default:
throw new UnsupportedOperationException(type.toString());
}
// TODO 目前使用不支援巢狀事務,以後這裡需要可配置
if (getCurrentTransaction().isPresent()) {
throw new UnsupportedOperationException("Cannot support nested transaction.");
}
ExecutorDataMap.getDataMap().put(TRANSACTION, result);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
return result;
}
- 後續可以從 ExecutorDataMap 中獲取當前執行緒的柔性事務和柔性事務配置:
// SoftTransactionManager.java
/**
* 獲取當前執行緒的柔性事務配置.
*
* @return 當前執行緒的柔性事務配置
*/
public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {
Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
return (null == transactionConfig)
? Optional.<SoftTransactionConfiguration>absent()
: Optional.of((SoftTransactionConfiguration) transactionConfig);
}
/**
* 獲取當前的柔性事務.
*
* @return 當前的柔性事務
*/
public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);
return (null == transaction)
? Optional.<AbstractSoftTransaction>absent()
: Optional.of((AbstractSoftTransaction) transaction);
}
4. 事務日誌儲存器
柔性事務執行過程中,會通過事務日誌( TransactionLog ) 記錄每條 SQL 執行狀態:
- SQL 執行前,記錄一條事務日誌
- SQL 執行成功,移除對應的事務日誌
通過實現事務日誌儲存器介面( TransactionLogStorage ),提供儲存功能。目前有兩種實現:
- MemoryTransactionLogStorage :基於記憶體的事務日誌儲存器。主要用於開發測試,生產環境下不要使用。
- RdbTransactionLogStorage :基於資料庫的事務日誌儲存器。
本節只分析 RdbTransactionLogStorage。對 MemoryTransactionLogStorage 感興趣的同學可以點選連結傳送到達。
TransactionLogStorage 有五個介面方法,下文每個小標題都是一個方法。
4.1 #add()
// TransactionLogStorage.java
/**
* 儲存事務日誌.
*
* @param transactionLog 事務日誌
*/
void add(TransactionLog transactionLog);
// RdbTransactionLogStorage.java
@Override
public void add(final TransactionLog transactionLog) {
String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
try (
// ... 省略你熟悉的程式碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}
-
注意:如果插入事務日誌失敗,SQL 會繼續執行,如果此時 SQL 執行失敗,則該 SQL 會不見了。建議:
#add()
和下文的#remove()
異常時,都列印下異常日誌都檔案系統
TransactionLog (transaction_log) 資料庫表結構如下:
欄位 |
名字 |
資料庫型別 |
備註 |
---|---|---|---|
id |
事件編號 |
VARCHAR(40) |
EventBus 事件編號,非事務編號 |
transaction_type |
柔性事務型別 |
VARCHAR(30) |
|
data_source |
真實資料來源名 |
VARCHAR(255) |
|
sql |
執行 SQL |
TEXT |
已經改寫過的 SQL |
parameters |
佔位符引數 |
TEXT |
JSON 字串儲存 |
creation_time |
記錄時間 |
LONG |
|
asyncdeliverytry_times |
已非同步重試次數 |
INT |
4.2 #remove()
// TransactionLogStorage.java
/**
* 根據主鍵刪除事務日誌.
*
* @param id 事務日誌主鍵
*/
void remove(String id);
// RdbTransactionLogStorage.java
@Override
public void remove(final String id) {
String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";
try (
// ... 省略你熟悉的程式碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}
4.3 #findEligibleTransactionLogs()
// TransactionLogStorage.java
/**
* 讀取需要處理的事務日誌.
*
* <p>需要處理的事務日誌為: </p>
* <p>1. 非同步處理次數小於最大處理次數.</p>
* <p>2. 非同步處理的事務日誌早於非同步處理的間隔時間.</p>
*
* @param size 獲取日誌的數量
* @param maxDeliveryTryTimes 事務送達的最大嘗試次數
* @param maxDeliveryTryDelayMillis 執行送達事務的延遲毫秒數.
*/
List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis);
// RdbTransactionLogStorage.java
@Override
public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
List<TransactionLog> result = new ArrayList<>(size);
String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` "
+ "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";
try (Connection conn = dataSource.getConnection()) {
// ... 省略你熟悉的程式碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
return result;
}
4.4 #increaseAsyncDeliveryTryTimes()
// TransactionLogStorage.java
/**
* 增加事務日誌非同步重試次數.
*
* @param id 事務主鍵
*/
void increaseAsyncDeliveryTryTimes(String id);
// RdbTransactionLogStorage.java
@Override
public void increaseAsyncDeliveryTryTimes(final String id) {
String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
try (
// ... 省略你熟悉的程式碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}
4.5 #processData()
// TransactionLogStorage.java
/**
* 處理事務資料.
*
* @param connection 業務資料庫連線
* @param transactionLog 事務日誌
* @param maxDeliveryTryTimes 事務送達的最大嘗試次數
*/
boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes);
// RdbTransactionLogStorage.java
@Override
public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
// 重試執行失敗 SQL
try (
Connection conn = connection;
PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
} catch (final SQLException ex) {
// 重試失敗,更新事務日誌,增加已非同步重試次數
increaseAsyncDeliveryTryTimes(transactionLog.getId());
throw new TransactionCompensationException(ex);
}
// 移除重試執行成功 SQL 對應的事務日誌
remove(transactionLog.getId());
return true;
}
- 不同於前四個增刪改查介面方法的實現,
#processData()
是帶有一些邏輯的。根據事務日誌( TransactionLog )重試執行失敗的 SQL,若成功,移除事務日誌;若失敗,更新事務日誌,增加已非同步重試次數 - 該方法會被最大努力送達型非同步作業呼叫到
5. 最大努力送達型事務監聽器
最大努力送達型事務監聽器,BestEffortsDeliveryListener,負責記錄事務日誌、同步重試執行失敗 SQL。
// BestEffortsDeliveryListener.java
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
if (!isProcessContinuously()) {
return;
}
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE: // 執行前,插入事務日誌
//TODO 對於批量執行的SQL需要解析成兩層列表
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS: // 執行成功,移除事務日誌
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE: // 執行失敗,同步重試
boolean deliverySuccess = false;
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { // 同步【多次】重試
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
// 獲得資料庫連線
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
if (!isValidConnection(conn)) { // 因為可能執行失敗是資料庫連線異常,所以判斷一次,如果無效,重新獲取資料庫連線
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
// 同步重試
//TODO 對於批量事件需要解析成兩層列表
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
deliverySuccess = true;
// 同步重試成功,移除事務日誌
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}
- BestEffortsDeliveryListener 通過 EventBus 實現監聽 SQL 的執行。Sharding-JDBC 如何實現 EventBus 的,請看《Sharding-JDBC 原始碼分析 —— SQL 執行》
-
呼叫
#isProcessContinuously()
方法判斷是否處於最大努力送達型事務中,當且僅當處於該狀態才進行監聽事件處理 - SQL 執行前,插入事務日誌
- SQL 執行成功,移除事務日誌
-
SQL 執行失敗,根據柔性事務配置( SoftTransactionConfiguration )同步的事務送達的最大嘗試次數(
syncMaxDeliveryTryTimes
)進行多次重試直到成功。總體邏輯和RdbTransactionLogStorage#processData()
方法邏輯類似,區別在於獲取分片資料庫連線的特殊處理:此處呼叫失敗,資料庫連線可能是異常無效的,因此呼叫了#isValidConnection()
判斷連線的有效性。若無效,則重新獲取分片資料庫連線。另外,若是重新獲取分片資料庫連線,需要進行關閉釋放 (Connection#close()
):
// BestEffortsDeliveryListener.java
/**
* 通過 SELECT 1 校驗資料庫連線是否有效
*
* @param conn 資料庫連線
* @return 是否有效
*/
private boolean isValidConnection(final Connection conn) {
try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) {
try (ResultSet rs = preparedStatement.executeQuery()) {
return rs.next() && 1 == rs.getInt("1");
}
} catch (final SQLException ex) {
return false;
}
}
/**
* 關閉釋放預編譯SQL物件和資料庫連線
*
* @param isNewConnection 是否新建立的資料庫連線,是的情況下才釋放
* @param conn 資料庫連線
* @param preparedStatement 預編譯SQL
*/
private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (final SQLException ex) {
log.error("PreparedStatement closed error:", ex);
}
}
if (isNewConnection && null != conn) {
try {
conn.close();
} catch (final SQLException ex) {
log.error("Connection closed error:", ex);
}
}
}
6. 最大努力送達型非同步作業
當最大努力送達型事務監聽器( BestEffortsDeliveryListener )多次同步重試失敗後,交給最大努力送達型非同步作業進行多次非同步重試,並且多次執行有固定間隔。
Sharding-JDBC 提供了兩個最大努力送達型非同步作業實現:
- NestedBestEffortsDeliveryJob :內嵌的最大努力送達型非同步作業
- BestEffortsDeliveryJob :最大努力送達型非同步作業
兩者實現程式碼邏輯基本一致。前者相比後者,用於開發測試,去除對 Zookeeper 依賴,無法實現高可用,因此生產環境下不適合使用。
6.1 BestEffortsDeliveryJob
BestEffortsDeliveryJob 所在 Maven 專案為 sharding-jdbc-transaction-async-job
,基於噹噹開源的 Elastic-Job 實現。如下是官方對該 Maven 專案的簡要說明:
由於柔性事務採用非同步嘗試,需要部署獨立的作業和Zookeeper。sharding-jdbc-transaction採用elastic-job實現的sharding-jdbc-transaction-async-job,通過簡單配置即可啟動高可用作業非同步送達柔性事務,啟動指令碼為start.sh。
BestEffortsDeliveryJob
public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {
/**
* 最大努力送達型非同步作業配置物件
*/
@Setter
private BestEffortsDeliveryConfiguration bedConfig;
/**
* 事務日誌儲存器物件
*/
@Setter
private TransactionLogStorage transactionLogStorage;
@Override
public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),
bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
}
@Override
public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
try (
Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
} catch (final SQLException | TransactionCompensationException ex) {
log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1,
bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
return false;
}
return true;
}
@Override
public boolean isStreamingProcess() {
return false;
}
}
- 呼叫
#fetchData()
方法獲取需要處理的事務日誌 (TransactionLog),內部呼叫了TransactionLogStorage#findEligibleTransactionLogs()
方法 - 呼叫
#processData()
方法處理事務日誌,重試執行失敗的 SQL,內部呼叫了TransactionLogStorage#processData()
-
#fetchData()
和#processData()
呼叫是 Elastic-Job 控制的。每一輪定時排程,每條事務日誌只執行一次。當超過最大非同步呼叫次數後,該條事務日誌不再處理,所以生產使用時,最好增加下相應監控超過最大非同步重試次數的事務日誌。
6.2 AsyncSoftTransactionJobConfiguration
AsyncSoftTransactionJobConfiguration,非同步柔性事務作業配置物件。
public class AsyncSoftTransactionJobConfiguration {
/**
* 作業名稱.
*/
private String name = "bestEffortsDeliveryJob";
/**
* 觸發作業的cron表示式.
*/
private String cron = "0/5 * * * * ?";
/**
* 每次作業獲取的事務日誌最大數量.
*/
private int transactionLogFetchDataCount = 100;
/**
* 事務送達的最大嘗試次數.
*/
private int maxDeliveryTryTimes = 3;
/**
* 執行事務的延遲毫秒數.
*
* <p>早於此間隔時間的入庫事務才會被作業執行.</p>
*/
private long maxDeliveryTryDelayMillis = 60 * 1000L;
}
6.3 Elastic-Job 是否必須?
Sharding-JDBC 提供的最大努力送達型非同步作業實現( BestEffortsDeliveryJob ),通過與 Elastic-Job 整合,可以很便捷並且有質量保證的高可用、高效能使用。一部分團隊,可能已經引入或自研了類似 Elastic-Job 的分散式作業中介軟體解決方案,每多一箇中間件,就是多一個學習與運維成本。那麼是否可以使用自己的分散式作業解決方案?答案是,可以的。參考 BestEffortsDeliveryJob 的實現,通過呼叫 TransactionLogStorage 來實現:
// 虛擬碼(不考慮效能、異常)
List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....);
for (TransactionLog transactionLog : transactionLogs) {
transactionLogStorage.processData(conn, log, maxDeliveryTryTimes);
}
當然,個人還是很推薦 Elastic-Job。
? 筆者要開始寫《Elastic-Job 原始碼分析》。
另外,如果有支援事務訊息的分散式佇列系統,可以通過 TransactionLogStorage 實現儲存事務訊息儲存成訊息。為什麼要支援事務訊息?如果 SQL 執行是成功的,需要回滾(刪除)事務訊息。
7. 適用場景
見《官方文件 - 事務支援》。
8. 開發指南 & 開發示例
見《官方文件 - 事務支援》。