1. 程式人生 > >實現基於redis的分散式鎖並整合spring-boot-starter

實現基於redis的分散式鎖並整合spring-boot-starter

文章目錄


如果你還不瞭解什麼是分散式鎖請參考小灰的部落格 什麼是分散式鎖?
書接上文 如何設計redis分散式鎖,這篇文章對redis分散式鎖進行了具體實現,並且將功能融入spring-boot,實現使用註解化,配置極簡化。github地址在
這裡

概述

此篇文章主要結合測試用例介紹redis分散式鎖使用,以及深入介紹實現原理。如果想在專案中使用請先閱讀README

使用

1.導包

clone該專案,進入專案目錄執行mvn install,把包安裝到maven倉庫

    <dependency>
        <groupId>com.redislock</groupId>
        <artifactId>redislock-spring-boot-starter</artifactId>
        <version>1.0.0</version>
    </dependency>

版本要求:spring-boot 2.0.0以上 ,jdk 1.8以上,redis 2.6.12以上

2.寫一個實現鎖功能的service

如下:

@Service
public class TestService {
    @RedisSynchronized(value = "talk",fallbackMethod = "shutup")
    public String myTurn(String speak){
        //已經獲得了鎖,可以對搶佔到的資源做操作了
        //為了便於觀察我們在獲得鎖後,讓執行緒sleep10s(獨佔這個鎖10s)
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return speak;
    }
    @Fallback(value = "shutup",replaceReturn = true)
    private String notYourTurn(RedisLockJoinPoint redisLockJoinPoint){
        //當鎖失敗會走入降級方法
        return "silence";
    }
}

3.檢查redis的key

安裝好redis客戶端之後,啟動redis服務,登陸到redis客戶端

127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>

目前redis中沒有任何的key

4.呼叫(鎖成功)

呼叫第一步中@RedisSynchronized標註的方法

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedislockApplication.class)
public class RedislockTest {
    @Autowired
    private TestService testService;
    @Test
    public void testLock() {
        System.out.println("執行結果:" + testService.myTurn("bulaha", 0));
    }
}

在方法執行結束之前,再次檢視redis,可以看到,redis裡多出了一個key,與@RedisSynchronized的value值相同,代表當前方法獲得了鎖。

127.0.0.1:6379> keys *
1) "talk"
127.0.0.1:6379>

10s後,執行結果如下:

執行結果:bulaha

方法執行結束後,再次觀察redis,可以發現"talk"已經被自動移除了,即鎖被釋放了。

127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> keys *
1) "talk"
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>

5.呼叫(鎖失敗)

現在模擬一下當其他的執行緒或者程序已經搶佔到鎖時,當前執行緒鎖失敗時的情況。
先在redis中設定一個key “talk”

127.0.0.1:6379> set talk 1
OK
127.0.0.1:6379> keys *
1) "talk"
127.0.0.1:6379> 

再執行一次在第三步中的測試方法,執行結果如下:

執行結果:silence

可以看到,最終並沒有像成功獲得鎖時,輸出我們期望的結果"bulaha",而是輸出了第一步中@Fallback方法的返回值,這就是所謂的方法降級。在鎖失敗時,執行一些替代的邏輯並返回替代的值。(也可以不指定降級方法,鎖失敗會丟擲異常。)

目前只要有
三個註解@RedisSynchronized,@Fallback,@FallbackHandler
三個配置項
redislock.prefix
redislock.timeout
redislock.heart-beat
詳見README

實現

1.redislock如何實現

實現分散式鎖的策略為:

1.使用setnx在redis中設定一個鍵,並且設定過期時間(redis在2.6.12版本之後,支援了加強的set命令,即SET key value [expiration EX seconds|PX milliseconds] [NX|XX],一條set命令就可以實現setnx+expire,我們不必再為上篇文章中所說的setnx和expire分別執行時產生的死鎖問題擔心了,是不是很贊呢)
具體在java裡怎麼向redis傳送這條命令呢,這裡我們使用spring-data-redis,程式碼如下

public class MyRedisTemplate{
    private RedisTemplate<String,String> redisTemplate;

    public void setRedisTemplate(RedisTemplate<String,String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public Boolean setifAbsent(String key, String value, long timeoutMilis){
        Boolean execute = redisTemplate.execute(new MyRedisCallback(key,value,timeoutMilis));
        return execute;
    }
class MyRedisCallback implements RedisCallback<Boolean>{
    private String key;
    private String value;
    private long timeoutMilis;

    public MyRedisCallback(String key, String value, long timeoutMilis) {
        this.key = key;
        this.value = value;
        this.timeoutMilis = timeoutMilis;
    }

    @Override
    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
        RedisStringCommands redisStringCommands = connection.stringCommands();
        //執行加強了的set命令,返回true代表設定成功
        Boolean set = redisStringCommands.set(key.getBytes(), value.getBytes(), Expiration.milliseconds(timeoutMilis), RedisStringCommands.SetOption.SET_IF_ABSENT);
        return set;
    }
}
    public Boolean expire(String key, long time) {
        return redisTemplate.expire(key,time,TimeUnit.MILLISECONDS);
    }

    public Boolean delete(String key) {
        return redisTemplate.delete(key);
    }
}

2.將成功獲得的鎖加入到一個集合中,開啟一個Timer,給集合中的鎖續命。由於這個集合是要線上程之間共享的,所以要使用執行緒安全的集合,這裡使用ConcurrentHashMap。(本來想使用Set,但是java的concurrent包中沒有提供,就用ConcurrentHashMap來代替,其實Java中的Set也就是用Map來實現的)

3.當解鎖時先將key從“續命集合”中移除,再從redis中移除key。這裡不能將順序倒過來,因為如果A執行緒先從redis中移除key的話,可能出現馬上又有一個B執行緒得到了鎖並將key加入“續命集合”之後,A執行緒才將這個key從“續命集合”中移除,這樣執行緒B得到的鎖就沒有了續命功能。如果這個執行緒偏偏到鎖過期還沒有執行完,就出現了併發操作被鎖住的資源的情況。

public class RedisLock {
    private long timeout = LOCK_TIMEOUT;
    private long heartBeat = LOCK_HEART_BEAT;
    
    private MyRedisTemplate myRedisTemplate;

    public void setMyRedisTemplate(MyRedisTemplate myRedisTemplate) {
        this.myRedisTemplate = myRedisTemplate;
    }

    private Map<String,String> aliveLocks = new ConcurrentHashMap<String,String>();

    private final Timer timer = new Timer();

	//在bean初始化完成之後啟動timer,在timer任務中給鎖續命
    @PostConstruct
    public void init(){
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                keeplockAlive();
            }
        },0,heartBeat);
    }
    //在銷燬之前將timer取消
    @PreDestroy
    private void destroy(){
        timer.cancel();
    }
    private void keeplockAlive(){
        if(aliveLocks.size() > 0){
            for (String key:
                    aliveLocks.keySet()) {
                myRedisTemplate.expire(key,timeout);
            }
        }
    }

    public void unlock(String key) {
        aliveLocks.remove(key);
        myRedisTemplate.delete(key);
    }

    public boolean lock(String key){
        return lock(key,true);
    }

    public boolean lock(String key,boolean keepAlive){
        Boolean redisLock = myRedisTemplate.setifAbsent(key, "redisLock", timeout);
        if(redisLock && keepAlive){
            aliveLocks.put(key,"");
        }
        return redisLock;
    }
}

2.如何實現註解化

現在我們已經可以使用之前講到的RedisLock類實現加鎖解鎖,其實分散式鎖的基本功能已經實現了,你可以如下這樣使用來實現分散式鎖

redislock.lock("mykey");
//需要加鎖的程式碼段
redislock.unlock("mykey");

但是這樣使用起來不是很方便,也不是很安全,你需要把RedisLock例項注入進來,自己進行加解操作。
我們可以仿照Spring的做法,宣告一個註解用於標記想要加鎖的方法

@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisSynchronized {
    String value() default "";
    String fallbackMethod() default "";
}

現在我們有了自己的註解,你可以隨便加到哪個方法上標記他們。但是你會發現不論執行多少遍你所標記的方法都不會如你所願加上鎖,這是當然的事情。因為我們雖然有了自己的註解但是我們並沒有真正把加鎖的功能賦予給我們標記的方法。那我們該怎麼做才能像spring和其他框架一樣在使用時只要標記註解就能賦予相應的功能呢?那就是動態代理。可以這麼說實現Spring生態的兩大利器,其一是註解,其二就是動態代理。註解流於光鮮的外表,給我們提供友善的體驗;動態代理潛藏在背後支撐起整個機制的執行。
動態代理簡介:spirng中的動態代理使用主要有兩種,一種是jdk動態代理,一種是cglib動態代理。其中jdk動態代理要求被代理類一定要繼承介面,而cglib卻沒有這個限制,這也是cglib能夠盛行的原因之一。而cglib也分很多種版本,普通的cglib需要額外依賴,如asm;還有cglib-nodep顧名思義,no dependence,避免了在實際使用時依賴的包不相容的問題。而我們使用的不在以上兩個之中,我們使用的是spring-core包中自帶的cglib,Spring 添加了一些很讚的功能,接下來我會介紹到。

3.生成代理

要達到我們最終的目的,有以下三件事要做
1.代理,給方法上添加了@RedisSynchronized註解的任意業務bean生成帶有加解鎖功能的代理。
2.替換,在使用@Autoware注入bean時,注入的應該是我們代理後的bean。
3.時機,選取代理時機,即在我們業務程式碼執行之前也就是Spring初始化完成前要完成代理並替換。
結合以上三個要點我們結合cglib並引入Spring的一個元件就可以全部滿足,他就是BeanPostProcessor
BeanPostProcessor簡介:參考BeanPostProcessor的註釋可知,容器在啟動時可以自動檢測BeanPostProcessor的實現類提前例項化,並在其後例項話的bean例項化時回撥介面方法。
介面方法postProcessBeforeInitialization會在屬性設定完成後,初始化方法(init-method)執行之前被回撥。
介面方法postProcessAfterInitialization初始化方法(init-method)執行之後被回撥。
回撥方法的返回值可以是原來的bean,也可以是被代理的bean(either the original or a wrapped one)。

從簡介中我們理解到,只要繼承BeanPostProcessor,時機就有了,我們可以在回撥方法中進行代理,並且在代理後可以直接把生成的代理當作返回值返回,替換就完成了。
實現如下

public class RedisLockAutoProxyCreator implements BeanPostProcessor{
 
    private RedisLock redisLock;

    public void setRedisLock(RedisLock redisLock) {
        this.redisLock = redisLock;
    }
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> aClass =  AopUtils.getTargetClass(bean);
        for (Method method : aClass.getDeclaredMethods()) {
         //如果有方法標註了RedisSynchronized就生成cglib代理,並返回代理bean
            if(method.isAnnotationPresent(RedisSynchronized.class)){
                Enhancer enhancer = new Enhancer();
                enhancer.setSuperclass(aClass);
                enhancer.setCallback(new MyHandler(bean));
                return enhancer.create();
            }
        }
        //如果沒有方法標註RedisSynchronized就返回原bean
        return bean;
    }
    class MyHandler implements InvocationHandler {
        private Object o;
        MyHandler(Object o){
            this.o=o;
        }
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //如果沒有被RedisSynchronized註解標註就不對方法做代理
            if(!method.isAnnotationPresent(RedisSynchronized.class)){
                return method.invoke(o, args);
            }
            //如果被RedisSynchronized註解標註,新增鎖功能
            String key = getKey(o,method,args);
            boolean locked = redisLock.lock(key);
            if(locked){
                try{
                    return method.invoke(o, args);
                }finally {
                    redisLock.unlock(key);
                }
            }
            //鎖失敗就異常
            throw new LockFailedException("lock failed");
        }

        private String getKey(Object o, Method method, Object[] args) {
            RedisSynchronized annotation = method.getAnnotation(RedisSynchronized.class);
            String key = annotation.value();
            if("".equals(key)){
                key = method.toGenericString();
            }
            return key;
        }
    }
}

為了更清晰的表達代理過程,此程式碼捨去了有點複雜的鎖降級部分,如果以上都理解了,可以clone程式碼看一下鎖降級的實現。

到此我們完成了註解化的redis鎖。

整合spring-boot-starter

由於篇幅問題先講到這裡,如果對整合spring-boot-starter感興趣,可以給我留言,我另寫一篇進行講解。