1. 程式人生 > 其它 >MySQL事務提交流程詳解

MySQL事務提交流程詳解

MySQL事務的提交採用兩階段提交協議, 前些日子和同事聊的時候發現對提交的細節還是有些模糊,這裡對照MySQL原始碼詳細記錄一下,版本是MySQL5.7.36。

1. 事務的提交流程。

 

2. 流程程式碼。

主程式碼如下,刪除了部分輔助程式碼,從 trans_commit(THD *thd) 函式開始。

bool trans_commit(THD *thd)
{ // 提交事務。 res = ha_commit_trans(thd, TRUE); if (res == FALSE) if (thd->rpl_thd_ctx.session_gtids_ctx().notify_after_transaction_commit(thd)) sql_print_warning("Failed to collect GTID to send in the response packet!"); thd->server_status &= ~SERVER_STATUS_IN_TRANS; thd->variables.option_bits &= ~OPTION_BEGIN; thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION); thd->lex->start_transaction_opt = 0; /* The transaction should be marked as complete in P_S. */ assert(thd->m_transaction_psi == NULL); thd->tx_priority = 0; trans_track_end_trx(thd); DBUG_RETURN(MY_TEST(res)); }

  

/*    
    提交事務。
    server層最後呼叫函式 ha_commit_trans(), 該函式負責處理 binlog 層和儲存引擎層的提交。
*/
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock)
{
    // 讀寫事務 && 不能忽略全域性讀鎖
    if (rw_trans && !ignore_global_read_lock)
    {
      /*
        獲取一個 MDL_KEY::COMMIT 元資料鎖, 該元資料鎖將確保 commit 操作會被活躍的 FTWRL 鎖阻止。
        FTWRL鎖會阻塞 COMMIT 操作。
      */
      MDL_REQUEST_INIT(&mdl_request,
                       MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
                       MDL_EXPLICIT);
      DBUG_PRINT("debug", ("Acquire MDL commit lock"));
      // 申請 MDL_key::COMMIT 鎖, 申請失敗
      if (thd->mdl_context.acquire_lock
(&mdl_request, thd->variables.lock_wait_timeout)) { ha_rollback_trans(thd, all); DBUG_RETURN(1); } release_mdl = true; } // 判斷是否開啟 xa 事務; // 所有的 entries 都支援 2pc && 在事務 scope 中設定做讀寫更改的引擎數量 > 1 if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1)) // prepare; 在事務協調器中 prepare commit tx, 在引擎層生成一個 XA 事務。 // tc_log: mysqld啟動時生成的 MySQL_BIN_LOG 物件[XA控制物件]。 error = tc_log->prepare
(thd, all); } /* XA 事務的狀態變更為 prepared, 中間態。最終會變成常規的 NOTR 狀態。 */ if (!error && all && xid_state->has_state(XID_STATE::XA_IDLE)) { assert(thd->lex->sql_command == SQLCOM_XA_COMMIT && static_cast<Sql_cmd_xa_commit *>(thd->lex->m_sql_cmd)->get_xa_opt() == XA_ONE_PHASE); // 設定 XA 事務狀態為 XA_PREPARED 狀態。 xid_state->set_state(XID_STATE::XA_PREPARED); } /** * XA 事務提交 */ if (error || (error = tc_log->commit(thd, all))) { ha_rollback_trans(thd, all); error = 1; goto end; } end: // 釋放 mdl 鎖。 if (release_mdl && mdl_request.ticket) { thd->mdl_context.release_lock(mdl_request.ticket); }
/* * 釋放資源並執行其他清理。空事務也需要。 */ if (is_real_trans) { trn_ctx->cleanup(); thd->tx_priority = 0; } }

  

int MYSQL_BIN_LOG::prepare(THD *thd, bool all)
{
  /*
    設定 HA_IGNORE_DURABILITY 在 prepare 階段不將事務的 prepare record 刷到 innodb redo log。
    這樣在 binlog 組提交的 flush 階段 flushing binlog 之前 flush prepare record 到 innodb redo log。
    在 innodb prepare 時, 不刷 redo log.
  */
  thd->durability_property = HA_IGNORE_DURABILITY;
  //  在引擎中 prepare commit trx
  int error = ha_prepare_low(thd, all);
  DBUG_RETURN(error);
}

/**
 * prepare commit trx
 * 在引擎層 prepare commit trx
 * 包括 binlog引擎 和 innodb引擎
*/
int ha_prepare_low(THD *thd, bool all)
{
  // 遍歷引擎
  if (ha_info)
  {
    for (; ha_info && !error; ha_info = ha_info->next())
    {
      int err = 0;
      // 引擎
      handlerton *ht = ha_info->ht();
      /*
        如果這個特定事務是隻讀的, 不要呼叫兩階段提交。
      */
      if (!ha_info->is_trx_read_write())
        continue;
      /**
       * 呼叫引擎的 prepare 在儲存層生成 XA 事務。
       * 先 binlog prepare, 再 innodb prepare; 
       * binlog prepare: 將上一次 commit 佇列中最大的 seq num 寫入本次事務的 last_commit 中
       * innodb prepare: 在 innodb 中更改 undo 日誌段的狀態為 trx_undo_prepared, 並將 xid 寫入 undo log header。
       * */
      if ((err = ht->prepare(ht, thd, all)))
      {
        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
        error = 1;
      }
      // ha_prepare_count++
      thd->status_var.ha_prepare_count++;
    }
  }
}

  

/*
 * binlog prepare;  
*/
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
  if (!all)
  {
    // 將上一次 commit 佇列中最大的 seq number 寫入本次事務的 last_commit 中。
    thd->get_transaction()->store_commit_parent(mysql_bin_log.m_dependency_tracker.get_max_committed_timestamp());
  }
  DBUG_RETURN(all && is_loggable_xa_prepare(thd) ? mysql_bin_log.commit(thd, true) : 0);
}

/*******************************************************************/ /**
Innodb prepare 一個 X/Open XA 分散式事務。
static int
innobase_xa_prepare(
		/*================*/
		handlerton *hton, /*!< in: InnoDB handlerton ; innodb引擎 */
		THD *thd,					/*!< in: handle to the MySQL thread of
					the user whose XA transaction should
					be prepared ; mysql執行緒 */
		bool prepare_trx) /*!< in: true - prepare transaction
					false - the current SQL statement
					ended ; true: prepare 事務 
							false: 當前 SQL 語句結束, 語句級別的提交 */
{
	// trx
	trx_t *trx = check_trx_exists(thd);
	// 獲取thd的 xid, 同時設定到 trx -> xid 中
	thd_get_xid(thd, (MYSQL_XID *)trx->xid);

	/* 釋放可能的 FIFO ticket 和 search latch。
	因為我們要保留 trx_sys -> mutex, 我們必須首先釋放 search system latch 來遵守鎖存順序。
	*/
	trx_search_latch_release_if_reserved(trx);
	// prepare trx
	if (prepare_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
	{
		/* preapre 整個事務, 或者這是一個SQL語句結束, autocommit 是開啟狀態 */
		// 事務已經在 mysql 2pc 協調器中註冊。
		ut_ad(trx_is_registered_for_2pc(trx));
		// trx prepare
		dberr_t err = trx_prepare_for_mysql(trx);
	}
	else
	{
		/* 語句的提交動作, 而非真正的事務提交。 */
		// 需要釋放語句 hold 的 auto_increment 鎖
		lock_unlock_table_autoinc(trx);

		// 記錄本語句的 undo 資訊, 以便語句級的回滾
		// 標記最新SQL語句結束。
		trx_mark_sql_stat_end(trx);
	}
	return (0);
}

/**
 * trx prepare
*/
dberr_t
trx_prepare_for_mysql(trx_t *trx)
{
	trx->op_info = "preparing";
	// prepare trx.
	trx_prepare(trx);
}

/****************************************************************/ /**
prepare trx.*/
static void
trx_prepare(
		/*========*/
		trx_t *trx) /*!< in/out: transaction */
{
	// 回滾段 != NULL && redo 段被修改
	if (trx->rsegs.m_redo.rseg != NULL && trx_is_redo_rseg_updated(trx))
	{
		// 為指定的回滾段 preapre 一個事務。lsn 為當前已 commit 的 lsn
		lsn = trx_prepare_low(trx, &trx->rsegs.m_redo, false);
	}

	if (trx->rsegs.m_noredo.rseg != NULL && trx_is_noredo_rseg_updated(trx))
	{
		// 為指定的回滾段 preapre 一個事務。
		trx_prepare_low(trx, &trx->rsegs.m_noredo, true);
	}

	/*--------------------------------------*/
	// 事務狀態為 TRX_STATE_ACTIVE 狀態, 修改事務狀態
	trx->state = TRX_STATE_PREPARED;
	// 事務系統中處於 xa prepared 狀態的事務的數量
	trx_sys->n_prepared_trx++;
	/*--------------------------------------*/
	/* Release read locks after PREPARE for READ COMMITTED
	and lower isolation. 
	對 rc 隔離級別, 在 prepare 之後釋放 read locks, 降低隔離度
	*/
	if (trx->isolation_level <= TRX_ISO_READ_COMMITTED)
	{
		/* Stop inheriting GAP locks. 
		停止繼承 GAP lock。
		*/
		trx->skip_lock_inheritance = true;

		/* Release only GAP locks for now. 
		釋放 GAP lock。
		*/
		lock_trx_release_read_locks(trx, true);
	}
	switch (thd_requested_durability(trx->mysql_thd))
	{
	case HA_IGNORE_DURABILITY:
		/* 
		在 binlog group commit 的 prepare 階段, 我們設定 HA_IGNORE_DURABILITY , 這樣在這個階段不會 flush redo log。
		這樣我們就可以在 binlog group commit 的 flush 階段在將 binary log寫入二進位制日誌之前, 在一個組中 flush redo log。
		*/
		break;
	case ..
	}
}


/****************************************************************/ /**
為指定的回滾段 preapre 一個事務。 */
static lsn_t
trx_prepare_low(
		/*============*/
		trx_t *trx,								/*!< in/out: transaction */
		trx_undo_ptr_t *undo_ptr, /*!< in/out: pointer to rollback
					segment scheduled for prepare. 指向回滾段的指標 */
		bool noredo_logging)			/*!< in: turn-off redo logging. 不需要redo log */
{
	lsn_t lsn;
	// insert 或者 undo 回滾段不為 NULL
	if (undo_ptr->insert_undo != NULL || undo_ptr->update_undo != NULL)
	{
		// start a sync mtr
		mtr_start_sync(&mtr);
		// 設定 mtr mode
		if (noredo_logging)
		{
			mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
		}

		/* 
		將  undo 日誌段狀態從 trx_undo_active 修改為 trx_undo_prepared: 
		更改 undo 回滾段將其設定為 prepare 狀態。
		*/
		mutex_enter(&rseg->mutex);
		// insert undo log 不為 NULL
		if (undo_ptr->insert_undo != NULL)
		{
			/* 
			這裡不需要獲取 trx->undo_mutex, 因為只允許一個 OS 執行緒為該事務做事務準備。
			*/
			// 將 undo 日誌段狀態從 trx_undo_active 修改為 trx_undo_prepared 狀態
			trx_undo_set_state_at_prepare(
					trx, undo_ptr->insert_undo, false, &mtr);
		}
		// 將 undo 日誌段狀態從 trx_undo_active 修改為 trx_undo_prepared 狀態
		if (undo_ptr->update_undo != NULL)
		{
			trx_undo_set_state_at_prepare(
					trx, undo_ptr->update_undo, false, &mtr);
		}

		mutex_exit(&rseg->mutex);
		lsn = mtr.commit_lsn();
	}
	else
	{
		lsn = 0;
	}
	return (lsn);
}

/* 修改 undo 日誌段的狀態*/
page_t *
trx_undo_set_state_at_prepare(
		trx_t *trx,
		trx_undo_t *undo,
		bool rollback,
		mtr_t *mtr)
{
	// 獲取 undo page 頁, 並在其上加 x-latch
	undo_page = trx_undo_page_get(
			page_id_t(undo->space, undo->hdr_page_no),
			undo->page_size, mtr);
	// undo 段 header
	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
	// 如果是 XA rollback
	if (rollback)
	{
		ut_ad(undo->state == TRX_UNDO_PREPARED);
		// 將 undo 段的狀態從 TRX_UNDO_PREPARED 修改為 TRX_UNDO_ACTIVE 狀態
		mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, TRX_UNDO_ACTIVE,
										 MLOG_2BYTES, mtr);
		return (undo_page);
	}
	/*------------------------------*/
	// 是 XA prepare, 則將 undo 段的狀態從 TRX_UNDO_ACTIVE 修改為 TRX_UNDO_PREPARED, 並將 xid 寫入 undo。
	ut_ad(undo->state == TRX_UNDO_ACTIVE);
	undo->state = TRX_UNDO_PREPARED;
	undo->xid = *trx->xid;
	/*------------------------------*/
	// 在 undo 段中更新當前 undo 段的狀態
	mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, undo->state,
									 MLOG_2BYTES, mtr);
	// 在 undo 段 last undo log header 中寫入 xid
	offset = mach_read_from_2(seg_hdr + TRX_UNDO_LAST_LOG);
	undo_header = undo_page + offset;
	mlog_write_ulint(undo_header + TRX_UNDO_XID_EXISTS,
									 TRUE, MLOG_1BYTE, mtr);
	trx_undo_write_xid(undo_header, &undo->xid, mtr);
	return (undo_page);
}

  

/*
  在事務協調器中提交事務。
  該函式將在二進位制日誌和儲存引擎中提交事務。
*/
TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
{
     ..
    // 提交。
    // ordered_commit: 事務在 binlog 階段提交的核心函式。
    if (ordered_commit(thd, all, skip_commit))
      DBUG_RETURN(RESULT_INCONSISTENT);

    /*
      檢查完所有錯誤情況後, 將標記 m_is_binlogged 標記為 true.
    */
    if (is_loggable_xa_prepare(thd))
      thd->get_transaction()->xid_state()->set_binlogged();
     ..
  DBUG_RETURN(RESULT_SUCCESS);
}

/**
  Flush and commit the transaction.

  This will execute an ordered flush and commit of all outstanding
  transactions and is the main function for the binary log group
  commit logic. The function performs the ordered commit in two
  phases.

  The first phase flushes the caches to the binary log and under
  LOCK_log and marks all threads that were flushed as not pending.

  The second phase executes under LOCK_commit and commits all
  transactions in order.

  The procedure is:

  1. Queue ourselves for flushing.
  2. Grab the log lock, which might result is blocking if the mutex is
     already held by another thread.
  3. If we were not committed while waiting for the lock
     1. Fetch the queue
     2. For each thread in the queue:
        a. Attach to it
        b. Flush the caches, saving any error code
     3. Flush and sync (depending on the value of sync_binlog).
     4. Signal that the binary log was updated
  4. Release the log lock
  5. Grab the commit lock
     1. For each thread in the queue:
        a. If there were no error when flushing and the transaction shall be committed:
           - Commit the transaction, saving the result of executing the commit.
  6. Release the commit lock
  7. Call purge, if any of the committed thread requested a purge.
  8. Return with the saved error code
*/
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
  /*
    Stage #1: flushing transactions to binary log
    階段1: 將事務 flush 到二進位制日誌
    While flushing, we allow new threads to enter and will process
    them in due time. Once the queue was empty, we cannot reap
    anything more since it is possible that a thread entered and
    appointed itself leader for the flush phase.
    在 flush 時, 允許新的執行緒進入, 並在適當的時間處理他們。
    一旦佇列變空, 我們就不能再收穫任何東西了, 因為可能有一個執行緒進入了佇列並
    指定自己為flush階段的 leader。
  */

#ifdef HAVE_REPLICATION
  /**
   * 先形成 flush 佇列, 非 leader 執行緒將被阻塞, 直到 commit 階段被 leader 執行緒喚醒。
   * 然後leader執行緒獲取 Lock log鎖
  */
  if (has_commit_order_manager(thd))
  {
    Slave_worker *worker = dynamic_cast<Slave_worker *>(thd->rli_slave);
    Commit_order_manager *mngr = worker->get_commit_order_manager();

    if (mngr->wait_for_its_turn(worker, all))
    {
      thd->commit_error = THD::CE_COMMIT_ERROR;
      DBUG_RETURN(thd->commit_error);
    }
    // 獲取 Lock_log 鎖, 非 leader 執行緒將被阻塞, 直到commit之後被 leader 執行緒喚醒, 非 leader 執行緒這裡返回 true, 執行緒應該等待提交完成。
    if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
      DBUG_RETURN(finish_commit(thd));
  }
  else
#endif
      // 獲取 Lock_log 鎖, 非 leader 執行緒將被阻塞, 直到被 leader 執行緒喚醒, 非 leader 執行緒這裡返回 true, 執行緒應該等待提交完成。
      if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
  {
    DBUG_RETURN(finish_commit(thd));
  }

  THD *wait_queue = NULL, *final_queue = NULL;
  mysql_mutex_t *leave_mutex_before_commit_stage = NULL;
  my_off_t flush_end_pos = 0;
  bool update_binlog_end_pos_after_sync;
  DEBUG_SYNC(thd, "waiting_in_the_middle_of_flush_stage");
  // 執行 flush 階段操作。
  /*
  * 1. 對 flush 佇列進行 fetch, 本次處理的flush佇列就固定了
    2. 在 innodb 儲存引擎中 flush redo log, 做 innodb 層 redo 持久化。
    3. 為 flush 佇列中每個事務生成 gtid。
    4. 將 flush佇列中每個執行緒的 binlog cache flush 到 binlog 日誌檔案中。這裡包含兩步:
            1. 將事務的 GTID event直接寫入 binlog 磁碟檔案中
            2. 將事務生成的別的 event 寫入 binlog file cache 中
  */
  flush_error = process_flush_stage_queue(&total_bytes, &do_rotate,
                                          &wait_queue);
  // 將 binary log cache(IO cache) flush到檔案中
  if (flush_error == 0 && total_bytes > 0)
    flush_error = flush_cache_to_file(&flush_end_pos);
  // sync_binlog 是否等於 1
  update_binlog_end_pos_after_sync = (get_sync_period() == 1);

  /*
    如果 flush 操作成功, 則呼叫 after_flush hook。
  */
  if (flush_error == 0)
  {
    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
    assert(flush_end_pos != 0);
    if (RUN_HOOK(binlog_storage, after_flush,
                 (thd, file_name_ptr, flush_end_pos)))
    {
      sql_print_error("Failed to run 'after_flush' hooks");
      flush_error = ER_ERROR_ON_WRITE;
    }
    // 不等於 1, 通知 dump 執行緒
    if (!update_binlog_end_pos_after_sync)
      // 更新 binlog end pos, 通知 dump 執行緒向從庫傳送 event
      update_binlog_end_pos();
    DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
  }

  if (flush_error)
  {
    /*
      Handle flush error (if any) after leader finishes it's flush stage.
      如果存在 flush 錯誤, 則處理 flush錯誤
    */
    handle_binlog_flush_or_sync_error(thd, false /* need_lock_log */,
                                      (thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR)
                                          ? ER(ER_GNO_EXHAUSTED)
                                          : NULL);
  }
  /*
    Stage #2: Syncing binary log file to disk
    sync binary log file to disk.
  */
  /** 釋放 Lock_log mutex, 獲取 Lock_sync mutex
   *  第一個進入的 flush 佇列的 leader 為本階段的 leader, 其他 flush 佇列加入 sync 佇列, 其他 flush 佇列的
   * leader會被阻塞, 直到 commit 階段被 leader 執行緒喚醒。
   * */
  if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
  {
    DBUG_RETURN(finish_commit(thd));
  }

  /*
    根據 delay 的設定來決定是否延遲一段時間, 如果 delay 的時間越久, 那麼加入 sync 佇列的
    事務就越多【last commit 是在 binlog prepare 時生成的, 尚未更改, 因此加入 sync 佇列的
    事務是同一組事務】, 提高了從庫 mts 的效率。
  */
  if (!flush_error && (sync_counter + 1 >= get_sync_period()))
    stage_manager.wait_count_or_timeout(opt_binlog_group_commit_sync_no_delay_count,
                                        opt_binlog_group_commit_sync_delay,
                                        Stage_manager::SYNC_STAGE);
  // fetch sync 佇列, 對 sync 佇列進行固化。
  final_queue = stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
  // 這裡 sync_binlog file到磁碟中
  if (flush_error == 0 && total_bytes > 0)
  {
    // 根據 sync_binlog 的設定決定是否刷盤
    std::pair<bool, bool> result = sync_binlog_file(false);
    sync_error = result.first;
  }
  // 在這裡 sync_binlog = 1, 更新 binlog end_pos, 通知 dump 執行緒傳送 event
  if (update_binlog_end_pos_after_sync)
  {
    THD *tmp_thd = final_queue;
    const char *binlog_file = NULL;
    my_off_t pos = 0;
    while (tmp_thd->next_to_commit != NULL)
      tmp_thd = tmp_thd->next_to_commit;
    if (flush_error == 0 && sync_error == 0)
    {
      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
      // 更新 binlog end pos, 通知 dump 執行緒
      update_binlog_end_pos(binlog_file, pos);
    }
  }

  leave_mutex_before_commit_stage = &LOCK_sync;
  /*
    Stage #3: Commit all transactions in order.
    按順序在 Innodb 層提交所有事務。  
    如果我們不需要對提交順序進行排序, 並且每個執行緒必須執行 handlerton 提交, 那麼這個階段可以跳過。
    然而, 由於我們保留了前一階段的鎖, 如果我們跳過這個階段, 則必須進行解鎖。
  */
commit_stage:
  // 如果需要順序提交
  if (opt_binlog_order_commits &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER))
  {
    // SYNC佇列加入 COMMIT 佇列, 第一個進入的 SYNC 佇列的 leader 為本階段的 leader。其他 sync 佇列
    // 加入 commit 佇列的 leade 會被阻塞, 直到 COMMIT 階段後被 leader 執行緒喚醒。
    // 釋放 lock_sync mutex, 持有 lock_commit mutex.
    if (change_stage(thd, Stage_manager::COMMIT_STAGE,
                     final_queue, leave_mutex_before_commit_stage,
                     &LOCK_commit))
    {
      DBUG_RETURN(finish_commit(thd));
    }
    // 固化 commit 佇列
    THD *commit_queue = stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
    // 呼叫 after_sync hook
    if (flush_error == 0 && sync_error == 0)
      // 呼叫 after_sync hook.
      sync_error = call_after_sync_hook(commit_queue);

    /*
      process_commit_stage_queue 將為佇列中每個 thd 持有的 GTID 
      呼叫 update_on_commit 或 update_on_rollback。

      這樣做的目的是確保 gtid 按照順序新增到 GTIDs中, 避免出現不必要的間隙

      如果我們只允許每個執行緒在完成提交時呼叫 update_on_commit, 則無法保證 GTID
      順序, 並且 gtid_executed 之間可能出現空隙。發生這種情況, server必須從 
      Gtid_set 中新增和刪除間隔, 新增或刪除間隔需要一個互斥鎖, 這會降低效能。
    */
    process_commit_stage_queue(thd, commit_queue);
    // 退出 Lock_commit 鎖
    mysql_mutex_unlock(&LOCK_commit);
    /*
      Process after_commit after LOCK_commit is released for avoiding
      3-way deadlock among user thread, rotate thread and dump thread.
      在 LOCK_commit 釋放之後處理 after_commit 來避免 user thread, rotate thread 和 dump thread的
      3路死鎖。
    */
    process_after_commit_stage_queue(thd, commit_queue);
    final_queue = commit_queue;
  }
  else
  {
    // 釋放鎖, 呼叫 after_sync hook.
    if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }

  /*
    Handle sync error after we release all locks in order to avoid deadlocks
    為了避免死鎖, 在釋放所有的 locks 之後處理sync error
  */
  if (sync_error)
    handle_binlog_flush_or_sync_error(thd, true /* need_lock_log */, NULL);

  /* Commit done so signal all waiting threads 
      commit完成之後通知所有處於 wait 狀態的執行緒
  */
  stage_manager.signal_done(final_queue);

  /*
    Finish the commit before executing a rotate, or run the risk of a
    deadlock. We don't need the return value here since it is in
    thd->commit_error, which is returned below.
    在執行 rotate 之前完成commit, 否則可能出現死鎖。
  */
  (void)finish_commit(thd);

  /*
    If we need to rotate, we do it without commit error.
    Otherwise the thd->commit_error will be possibly reset.
    rotate
   */
  if (DBUG_EVALUATE_IF("force_rotate", 1, 0) ||
      (do_rotate && thd->commit_error == THD::CE_NONE &&
       !is_rotating_caused_by_incident))
  {
    /*
      Do not force the rotate as several consecutive groups may
      request unnecessary rotations.

      NOTE: Run purge_logs wo/ holding LOCK_log because it does not
      need the mutex. Otherwise causes various deadlocks.
    */

    DEBUG_SYNC(thd, "ready_to_do_rotation");
    bool check_purge = false;
    mysql_mutex_lock(&LOCK_log);
    /*
      If rotate fails then depends on binlog_error_action variable
      appropriate action will be taken inside rotate call.
    */
    int error = rotate(false, &check_purge);
    mysql_mutex_unlock(&LOCK_log);

    if (error)
      thd->commit_error = THD::CE_COMMIT_ERROR;
    else if (check_purge)
      purge();
  }
  /*
    flush or sync errors are handled above (using binlog_error_action).
    Hence treat only COMMIT_ERRORs as errors.
  */
  DBUG_RETURN(thd->commit_error == THD::CE_COMMIT_ERROR);
}