1. 程式人生 > >Mysql高手系列 - 第26篇:聊聊如何使用mysql實現分散式鎖

Mysql高手系列 - 第26篇:聊聊如何使用mysql實現分散式鎖

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。

歡迎大家加我微信itsoku一起交流java、演算法、資料庫相關技術。

這是Mysql系列第26篇。

本篇我們使用mysql實現一個分散式鎖。

分散式鎖的功能

  1. 分散式鎖使用者位於不同的機器中,鎖獲取成功之後,才可以對共享資源進行操作
  2. 鎖具有重入的功能:即一個使用者可以多次獲取某個鎖
  3. 獲取鎖有超時的功能:即在指定的時間內去嘗試獲取鎖,超過了超時時間,如果還未獲取成功,則返回獲取失敗
  4. 能夠自動容錯,比如:A機器獲取鎖lock1之後,在釋放鎖lock1之前,A機器掛了,導致鎖lock1未釋放,結果會lock1一直被A機器佔有著,遇到這種情況時,分散式鎖要能夠自動解決,可以這麼做:持有鎖的時候可以加個持有超時時間,超過了這個時間還未釋放的,其他機器將有機會獲取鎖

預備技能:樂觀鎖

通常我們修改表中一條資料過程如下:

t1:select獲取記錄R1
t2:對R1進行編輯
t3:update R1

我們來看一下上面的過程存在的問題:

如果A、B兩個執行緒同時執行到t1,他們倆看到的R1的資料一樣,然後都對R1進行編輯,然後去執行t3,最終2個執行緒都會更新成功,後面一個執行緒會把前面一個執行緒update的結果給覆蓋掉,這就是併發修改資料存在的問題。

我們可以在表中新增一個版本號,每次更新資料時候將版本號作為條件,並且每次更新時候版本號+1,過程優化一下,如下:

t1:開啟事務start transaction
t2:select獲取記錄R1,宣告變數v=R1.version
t3:對R1進行編輯
t4:執行更新操作
    update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update會返回影響的行數,我們將其記錄在count中,然後根據count來判斷提交還是回滾
    if(count==1){
        //提交事務
        commit;
    }else{
        //回滾事務
        rollback;
    }

上面重點在於步驟t4,當多個執行緒同時執行到t1,他們看到的R1是一樣的,但是當他們執行到t4的時候,資料庫會對update的這行記錄加鎖,確保併發情況下排隊執行,所以只有第一個的update會返回1,其他的update結果會返回0,然後後面會判斷count是否為1,進而對事務進行提交或者回滾。可以通過count的值知道修改資料是否成功了。

上面這種方式就樂觀鎖。我們可以通過樂觀鎖的方式確保資料併發修改過程中的正確性。

使用mysql實現分散式鎖

建表

我們建立一個分散式鎖表,如下

DROP DATABASE IF EXISTS javacode2018;
CREATE DATABASE javacode2018;
USE javacode2018;
DROP TABLE IF EXISTS t_lock;
create table t_lock(
  lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖唯一標誌',
  request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標識請求物件的',
  lock_count INT NOT NULL DEFAULT 0 COMMENT '當前上鎖次數',
  timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間',
  version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1'
)COMMENT '鎖資訊表';

分散式鎖工具類:

package com.itsoku.sql;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.sql.*;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
 * 工作10年的前阿里P7分享Java、演算法、資料庫方面的技術乾貨!堅信用技術改變命運,讓家人過上更體面的生活!
 * 喜歡的請關注公眾號:路人甲Java
 */
@Slf4j
public class LockUtils {

    //將requestid儲存在該變數中
    static ThreadLocal<String> requestIdTL = new ThreadLocal<>();

    /**
     * 獲取當前執行緒requestid
     *
     * @return
     */
    public static String getRequestId() {
        String requestId = requestIdTL.get();
        if (requestId == null || "".equals(requestId)) {
            requestId = UUID.randomUUID().toString();
            requestIdTL.set(requestId);
        }
        log.info("requestId:{}", requestId);
        return requestId;
    }

    /**
     * 獲取鎖
     *
     * @param lock_key        鎖key
     * @param locktimeout(毫秒) 持有鎖的有效時間,防止死鎖
     * @param gettimeout(毫秒)  獲取鎖的超時時間,這個時間內獲取不到將重試
     * @return
     */
    public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {
        log.info("start");
        boolean lockResult = false;
        String request_id = getRequestId();
        long starttime = System.currentTimeMillis();
        while (true) {
            LockModel lockModel = LockUtils.get(lock_key);
            if (Objects.isNull(lockModel)) {
                //插入一條記錄,重新嘗試獲取鎖
                LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());
            } else {
                String reqid = lockModel.getRequest_id();
                //如果reqid為空字元,表示鎖未被佔用
                if ("".equals(reqid)) {
                    lockModel.setRequest_id(request_id);
                    lockModel.setLock_count(1);
                    lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
                    if (LockUtils.update(lockModel) == 1) {
                        lockResult = true;
                        break;
                    }
                } else if (request_id.equals(reqid)) {
                    //如果request_id和表中request_id一樣表示鎖被當前執行緒持有者,此時需要加重入鎖
                    lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
                    lockModel.setLock_count(lockModel.getLock_count() + 1);
                    if (LockUtils.update(lockModel) == 1) {
                        lockResult = true;
                        break;
                    }
                } else {
                    //鎖不是自己的,並且已經超時了,則重置鎖,繼續重試
                    if (lockModel.getTimeout() < System.currentTimeMillis()) {
                        LockUtils.resetLock(lockModel);
                    } else {
                        //如果未超時,休眠100毫秒,繼續重試
                        if (starttime + gettimeout > System.currentTimeMillis()) {
                            TimeUnit.MILLISECONDS.sleep(100);
                        } else {
                            break;
                        }
                    }
                }
            }
        }
        log.info("end");
        return lockResult;
    }

    /**
     * 釋放鎖
     *
     * @param lock_key
     * @throws Exception
     */
    public static void unlock(String lock_key) throws Exception {
        //獲取當前執行緒requestId
        String requestId = getRequestId();
        LockModel lockModel = LockUtils.get(lock_key);
        //當前執行緒requestId和庫中request_id一致 && lock_count>0,表示可以釋放鎖
        if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
            if (lockModel.getLock_count() == 1) {
                //重置鎖
                resetLock(lockModel);
            } else {
                lockModel.setLock_count(lockModel.getLock_count() - 1);
                LockUtils.update(lockModel);
            }
        }
    }

    /**
     * 重置鎖
     *
     * @param lockModel
     * @return
     * @throws Exception
     */
    public static int resetLock(LockModel lockModel) throws Exception {
        lockModel.setRequest_id("");
        lockModel.setLock_count(0);
        lockModel.setTimeout(0L);
        return LockUtils.update(lockModel);
    }

    /**
     * 更新lockModel資訊,內部採用樂觀鎖來更新
     *
     * @param lockModel
     * @return
     * @throws Exception
     */
    public static int update(LockModel lockModel) throws Exception {
        return exec(conn -> {
            String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lockModel.getRequest_id());
            ps.setInt(colIndex++, lockModel.getLock_count());
            ps.setLong(colIndex++, lockModel.getTimeout());
            ps.setString(colIndex++, lockModel.getLock_key());
            ps.setInt(colIndex++, lockModel.getVersion());
            return ps.executeUpdate();
        });
    }

    public static LockModel get(String lock_key) throws Exception {
        return exec(conn -> {
            String sql = "select * from t_lock t WHERE t.lock_key=?";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lock_key);
            ResultSet rs = ps.executeQuery();
            if (rs.next()) {
                return LockModel.builder().
                        lock_key(lock_key).
                        request_id(rs.getString("request_id")).
                        lock_count(rs.getInt("lock_count")).
                        timeout(rs.getLong("timeout")).
                        version(rs.getInt("version")).build();
            }
            return null;
        });
    }

    public static int insert(LockModel lockModel) throws Exception {
        return exec(conn -> {
            String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lockModel.getLock_key());
            ps.setString(colIndex++, lockModel.getRequest_id());
            ps.setInt(colIndex++, lockModel.getLock_count());
            ps.setLong(colIndex++, lockModel.getTimeout());
            ps.setInt(colIndex++, lockModel.getVersion());
            return ps.executeUpdate();
        });
    }

    public static <T> T exec(SqlExec<T> sqlExec) throws Exception {
        Connection conn = getConn();
        try {
            return sqlExec.exec(conn);
        } finally {
            closeConn(conn);
        }
    }

    @FunctionalInterface
    public interface SqlExec<T> {
        T exec(Connection conn) throws Exception;
    }

    @Getter
    @Setter
    @Builder
    public static class LockModel {
        private String lock_key;
        private String request_id;
        private Integer lock_count;
        private Long timeout;
        private Integer version;
    }

    private static final String url = "jdbc:mysql://localhost:3306/javacode2018?useSSL=false";        //資料庫地址
    private static final String username = "root";        //資料庫使用者名稱
    private static final String password = "root123";        //資料庫密碼
    private static final String driver = "com.mysql.jdbc.Driver";        //mysql驅動

    /**
     * 連線資料庫
     *
     * @return
     */
    public static Connection getConn() {
        Connection conn = null;
        try {
            Class.forName(driver);  //載入資料庫驅動
            try {
                conn = DriverManager.getConnection(url, username, password);  //連線資料庫
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * 關閉資料庫連結
     *
     * @return
     */
    public static void closeConn(Connection conn) {
        if (conn != null) {
            try {
                conn.close();  //關閉資料庫連結
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

上面程式碼中實現了文章開頭列的分散式鎖的所有功能,大家可以認真研究下獲取鎖的方法:lock,釋放鎖的方法:unlock

測試用例

package com.itsoku.sql;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import static com.itsoku.sql.LockUtils.lock;
import static com.itsoku.sql.LockUtils.unlock;

/**
 * 工作10年的前阿里P7分享Java、演算法、資料庫方面的技術乾貨!堅信用技術改變命運,讓家人過上更體面的生活!
 * 喜歡的請關注公眾號:路人甲Java
 */
@Slf4j
public class LockUtilsTest {

    //測試重複獲取和重複釋放
    @Test
    public void test1() throws Exception {
        String lock_key = "key1";
        for (int i = 0; i < 10; i++) {
            lock(lock_key, 10000L, 1000);
        }
        for (int i = 0; i < 9; i++) {
            unlock(lock_key);
        }
    }

    //獲取之後不釋放,超時之後被thread1獲取
    @Test
    public void test2() throws Exception {
        String lock_key = "key2";
        lock(lock_key, 5000L, 1000);
        Thread thread1 = new Thread(() -> {
            try {
                try {
                    lock(lock_key, 5000L, 7000);
                } finally {
                    unlock(lock_key);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread1.setName("thread1");
        thread1.start();
        thread1.join();
    }
}

test1方法測試了重入鎖的效果。

test2測試了主執行緒獲取鎖之後一直未釋放,持有鎖超時之後被thread1獲取到了。

留給大家一個問題

上面分散式鎖還需要考慮一個問題:比如A機會獲取了key1的鎖,並設定持有鎖的超時時間為10秒,但是獲取鎖之後,執行了一段業務操作,業務操作耗時超過10秒了,此時機器B去獲取鎖時可以獲取成功的,此時會導致A、B兩個機器都獲取鎖成功了,都在執行業務操作,這種情況應該怎麼處理?大家可以思考一下然後留言,我們一起討論一下。

更多優質文章

  1. java高併發系列全集(34篇)
  2. mysql高手系列(20多篇,高手必備)
  3. 聊聊db和快取一致性常見的實現方式

mysql系列大概有20多篇,喜歡的請關注一下,歡迎大家加我微信itsoku或者留言交流mysql相關技術!

相關推薦

Mysql高手系列 - 26聊聊如何使用mysql實現分散式

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 歡迎大家加我微信itsoku一起交流java、演算法、資料庫相關技術。 這是Mysql系列第26篇。 本篇我們使用mysql實現一個分散式鎖。 分散式鎖的功能 分散式鎖使用者位於不同的機器中,鎖獲取成功之後,才可以對共享資源

Mysql高手系列 - 7玩轉select條件查詢,避免踩坑

這是Mysql系列第7篇。 環境:mysql5.7.25,cmd命令中進行演示。 電商中:我們想檢視某個使用者所有的訂單,或者想檢視某個使用者在某個時間段內所有的訂單,此時我們需要對訂單表資料進行篩選,按照使用者、時間進行過濾,得到我們期望的結果。 此時我們需要使用條件查詢來對指定表進行操作,我們需要了解sq

Mysql高手系列 - 8詳解排序和分頁(order by & limit),及存在的坑

這是Mysql系列第8篇。 環境:mysql5.7.25,cmd命令中進行演示。 程式碼中被[]包含的表示可選,|符號分開的表示可選其一。 本章內容 詳解排序查詢 詳解limit limit存在的坑 分頁查詢中的坑 排序查詢(order by) 電商中:我們想檢視今天所有成交的訂單,按照交易額從高到低排序

Mysql高手系列 - 9詳解分組查詢,mysql分組有大坑!

這是Mysql系列第9篇。 環境:mysql5.7.25,cmd命令中進行演示。 本篇內容 分組查詢語法 聚合函式 單欄位分組 多欄位分組 分組前篩選資料 分組後篩選資料 where和having的區別 分組後排序 where & group by & having & order

Mysql高手系列 - 11深入瞭解連線查詢及原理

這是Mysql系列第11篇。 環境:mysql5.7.25,cmd命令中進行演示。 當我們查詢的資料來源於多張表的時候,我們需要用到連線查詢,連線查詢使用率非常高,希望大家都務必掌握。 本文內容 笛卡爾積 內連線 外連線 左連線 右連線 表連線的原理 使用java實現連線查詢,加深理解 準備資料 2張表

Mysql高手系列 - 10常用的幾十個函式詳解,收藏慢慢看

這是Mysql系列第10篇。 環境:mysql5.7.25,cmd命令中進行演示。 MySQL 數值型函式 函式名稱 作 用 abs 求絕對值 sqrt 求二次方根 mod 求餘數 ceil 和 ceiling 兩個函式功能相同,都是返回不小於引數的最小整數,即向上取整 floo

Mysql高手系列 - 12子查詢詳解

這是Mysql系列第12篇。 環境:mysql5.7.25,cmd命令中進行演示。 本章節非常重要。 子查詢 出現在select語句中的select語句,稱為子查詢或內查詢。 外部的select查詢語句,稱為主查詢或外查詢。 子查詢分類 按結果集的行列數不同分為4種 標量子查詢(結果集只有一行一列) 列子查

Mysql高手系列 - 13細說NULL導致的神坑,讓人防不勝防

這是Mysql系列第13篇。 環境:mysql5.7.25,cmd命令中進行演示。 當資料的值為NULL的時候,可能出現各種意想不到的效果,讓人防不勝防,我們來看看NULL導致的各種神坑,如何避免? 比較運算子中使用NULL 認真看下面的效果 mysql> select 1>NULL; +--

Mysql高手系列 - 14詳解事務

這是Mysql系列第14篇。 環境:mysql5.7.25,cmd命令中進行演示。 開發過程中,會經常用到資料庫事務,所以本章非常重要。 本篇內容 什麼是事務,它有什麼用? 事務的幾個特性 事務常見操作指令詳解 事務的隔離級別詳解 髒讀、不可重複讀、可重複讀、幻讀詳解 演示各種隔離級別產生的現象 關於隔離級

Mysql高手系列 - 21什麼是索引?

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 這是Mysql系列第21篇。 本文開始連續3篇詳解mysql索引: 第1篇來說說什麼是索引? 第2篇詳解Mysql中索引的原理 第3篇結合索引詳解關鍵字explain 本文為索引第一篇:我們來了解一下什麼是索引? 路人在搞

Mysql高手系列 - 18mysql流程控制語句詳解(高手進階)

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 這是Mysql系列第18篇。 環境:mysql5.7.25,cmd命令中進行演示。 程式碼中被[]包含的表示可選,|符號分開的表示可選其一。 上一篇儲存過程&自定義函式,對儲存過程和自定義函式做了一個簡單的介紹,但是如

Mysql高手系列 - 19mysql遊標詳解,此技能可用於救火

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 這是Mysql系列第19篇。 環境:mysql5.7.25,cmd命令中進行演示。 程式碼中被[]包含的表示可選,|符號分開的表示可選其一。 需求背景 當我們需要對一個select的查詢結果進行遍歷處理的時候,如何實現呢? 此

Mysql高手系列 - 20異常捕獲及處理詳解(實戰經驗)

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 這是Mysql系列第20篇。 環境:mysql5.7.25,cmd命令中進行演示。 程式碼中被[]包含的表示可選,|符號分開的表示可選其一。 需求背景 我們在寫儲存過程的時候,可能會出現下列一些情況: 插入的資料違反唯一約束

Mysql高手系列 - 22深入理解mysql索引原理,連載中

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 歡迎大家加我微信itsoku一起交流java、演算法、資料庫相關技術。 這是Mysql系列第22篇。 背景 使用mysql最多的就是查詢,我們迫切的希望mysql能查詢的更快一些,我們經常用到的查詢有: 按照id查詢唯一一條

Mysql高手系列 - 24如何正確的使用索引?【高手進階】

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 歡迎大家加我微信itsoku一起交流java、演算法、資料庫相關技術。 這是Mysql系列第24篇。 學習索引,主要是寫出更快的sql,當我們寫sql的時候,需要明確的知道sql為什麼會走索引?為什麼有些sql不走索引?sql

Mysql高手系列 - 27mysql如何確保資料不丟失的?我們借鑑這種設計思想實現熱點賬戶高併發設計及跨庫轉賬問題

Mysql系列的目標是:通過這個系列從入門到全面掌握一個高階開發所需要的全部技能。 歡迎大家加我微信itsoku一起交流java、演算法、資料庫相關技術。 這是Mysql系列第27篇。 本篇文章我們先來看一下mysql是如何確保資料不丟失的,通過本文我們可以瞭解mysql內部確保資料不丟失的原理,學習裡面優秀

Mysql高手系列 - 4天DDL常見操作彙總

這是Mysql系列第4篇。 環境:mysql5.7.25,cmd命令中進行演示。 DDL:Data Define Language資料定義語言,主要用來對資料庫、表進行一些管理操作。 如:建庫、刪庫、建表、修改表、刪除表、對列的增刪改等等。 文中涉及到的語法用[]包含的內容屬於可選項,下面做詳細說明。 庫的管

Mysql高手系列 - 5天DML操作彙總,確定你都會?

這是Mysql系列第5篇。 環境:mysql5.7.25,cmd命令中進行演示。 DML(Data Manipulation Language)資料操作語言,以INSERT、UPDATE、DELETE三種指令為核心,分別代表插入、更新與刪除,是必須要掌握的指令,DML和SQL中的select熟稱CRUD(增刪

Python金融系列現代投資組合理論

作者:chen_h 微訊號 & QQ:862251340 微信公眾號:coderpai 第一篇:計算股票回報率,均值和方差 第二篇:簡單線性迴歸 第三篇:隨機變數和分佈 第四篇:置信區間和假設檢驗 第五篇:多元線性迴歸和殘差分析 第六篇:現代投資組合

Python金融系列多元線性迴歸和殘差分析

作者:chen_h 微訊號 & QQ:862251340 微信公眾號:coderpai 第一篇:計算股票回報率,均值和方差 第二篇:簡單線性迴歸 第三篇:隨機變數和分佈 第四篇:置信區間和假設檢驗 第五篇:多元線性迴歸和殘差分析 第六篇:現代投資組合