1. 程式人生 > >redis系列之數據庫與緩存數據一致性解決方案

redis系列之數據庫與緩存數據一致性解決方案

查詢緩存 src dev 信息 一次 ram ren red getc

redis系列之數據庫與緩存數據一致性解決方案

數據庫與緩存讀寫模式策略

寫完數據庫後是否需要馬上更新緩存還是直接刪除緩存? (1)、如果寫數據庫的值與更新到緩存值是一樣的,不需要經過任何的計算,可以馬上更新緩存,但是如果對於那種寫數據頻繁而讀數據少的場景並不合適這種解決方案,因為也許還沒有查詢就被刪除或修改了,這樣會浪費時間和資源 (2)、如果寫數據庫的值與更新緩存的值不一致,寫入緩存中的數據需要經過幾個表的關聯計算後得到的結果插入緩存中,那就沒有必要馬上更新緩存,只有刪除緩存即可,等到查詢的時候在去把計算後得到的結果插入到緩存中即可。 所以一般的策略是當更新數據時,先刪除緩存數據,然後更新數據庫,而不是更新緩存,等要查詢的時候才把最新的數據更新到緩存

數據庫與緩存雙寫情況下導致數據不一致問題

場景一

當更新數據時,如更新某商品的庫存,當前商品的庫存是100,現在要更新為99,先更新數據庫更改成99,然後刪除緩存,發現刪除緩存失敗了,這意味著數據庫存的是99,而緩存是100,這導致數據庫和緩存不一致。

場景一解決方案

這種情況應該是先刪除緩存,然後在更新數據庫,如果刪除緩存失敗,那就不要更新數據庫,如果說刪除緩存成功,而更新數據庫失敗,那查詢的時候只是從數據庫裏查了舊的數據而已,這樣就能保持數據庫與緩存的一致性。

場景二

在高並發的情況下,如果當刪除完緩存的時候,這時去更新數據庫,但還沒有更新完,另外一個請求來查詢數據,發現緩存裏沒有,就去數據庫裏查,還是以上面商品庫存為例,如果數據庫中產品的庫存是100,那麽查詢到的庫存是100,然後插入緩存,插入完緩存後,原來那個更新數據庫的線程把數據庫更新為了99,導致數據庫與緩存不一致的情況

場景二解決方案

遇到這種情況,可以用隊列的去解決這個問,創建幾個隊列,如20個,根據商品的ID去做hash值,然後對隊列個數取摸,當有數據更新請求時,先把它丟到隊列裏去,當更新完後在從隊列裏去除,如果在更新的過程中,遇到以上場景,先去緩存裏看下有沒有數據,如果沒有,可以先去隊列裏看是否有相同商品ID在做更新,如果有也把查詢的請求發送到隊列裏去,然後同步等待緩存更新完成。
這裏有一個優化點,如果發現隊列裏有一個查詢請求了,那麽就不要放新的查詢操作進去了,用一個while(true)循環去查詢緩存,循環個200MS左右,如果緩存裏還沒有則直接取數據庫的舊數據,一般情況下是可以取到的。 在高並發下解決場景二要註意的問題
(1)讀請求時長阻塞
由於讀請求進行了非常輕度的異步化,所以一定要註意讀超時的問題,每個讀請求必須在超時間內返回,該解決方案最大的風險在於可能數據更新很頻繁,導致隊列中擠壓了大量的更新操作在裏面,然後讀請求會發生大量的超時,最後導致大量的請求直接走數據庫,像遇到這種情況,一般要做好足夠的壓力測試,如果壓力過大,需要根據實際情況添加機器。
(2)請求並發量過高
這裏還是要做好壓力測試,多模擬真實場景,並發量在最高的時候QPS多少,扛不住就要多加機器,還有就是做好讀寫比例是多少
(3)多服務實例部署的請求路由
可能這個服務部署了多個實例,那麽必須保證說,執行數據更新操作,以及執行緩存更新操作的請求,都通過nginx服務器路由到相同的服務實例上
(4)熱點商品的路由問題,導致請求的傾斜
某些商品的讀請求特別高,全部打到了相同的機器的相同丟列裏了,可能造成某臺服務器壓力過大,因為只有在商品數據更新的時候才會清空緩存,然後才會導致讀寫並發,所以更新頻率不是太高的話,這個問題的影響並不是很大,但是確實有可能某些服務器的負載會高一些。

數據庫與緩存數據一致性解決方案流程圖

技術分享圖片

數據庫與緩存數據一致性解決方案對應代碼

商品庫存實體

[java] view plain copy
  1. package com.shux.inventory.entity;
  2. /**
  3. **********************************************
  4. * 描述:
  5. * Simba.Hua
  6. * 2017年8月30日
  7. **********************************************
  8. **/
  9. public class InventoryProduct {
  10. private Integer productId;
  11. private Long InventoryCnt;
  12. public Integer getProductId() {
  13. return productId;
  14. }
  15. public void setProductId(Integer productId) {
  16. this.productId = productId;
  17. }
  18. public Long getInventoryCnt() {
  19. return InventoryCnt;
  20. }
  21. public void setInventoryCnt(Long inventoryCnt) {
  22. InventoryCnt = inventoryCnt;
  23. }
  24. }

請求接口

[java] view plain copy
  1. /**
  2. **********************************************
  3. * 描述:
  4. * Simba.Hua
  5. * 2017年8月27日
  6. **********************************************
  7. **/
  8. public interface Request {
  9. public void process();
  10. public Integer getProductId();
  11. public boolean isForceFefresh();
  12. }

數據更新請求

[java] view plain copy
  1. package com.shux.inventory.request;
  2. import org.springframework.transaction.annotation.Transactional;
  3. import com.shux.inventory.biz.InventoryProductBiz;
  4. import com.shux.inventory.entity.InventoryProduct;
  5. /**
  6. **********************************************
  7. * 描述:更新庫存信息
  8. * 1、先刪除緩存中的數據
  9. * 2、更新數據庫中的數據
  10. * Simba.Hua
  11. * 2017年8月30日
  12. **********************************************
  13. **/
  14. public class InventoryUpdateDBRequest implements Request{
  15. private InventoryProductBiz inventoryProductBiz;
  16. private InventoryProduct inventoryProduct;
  17. public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){
  18. this.inventoryProduct = inventoryProduct;
  19. this.inventoryProductBiz = inventoryProductBiz;
  20. }
  21. @Override
  22. @Transactional
  23. public void process() {
  24. inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId());
  25. inventoryProductBiz.updateInventoryProduct(inventoryProduct);
  26. }
  27. @Override
  28. public Integer getProductId() {
  29. // TODO Auto-generated method stub
  30. return inventoryProduct.getProductId();
  31. }
  32. @Override
  33. public boolean isForceFefresh() {
  34. // TODO Auto-generated method stub
  35. return false;
  36. }
  37. }

查詢請求

[java] view plain copy
  1. package com.shux.inventory.request;
  2. import com.shux.inventory.biz.InventoryProductBiz;
  3. import com.shux.inventory.entity.InventoryProduct;
  4. /**
  5. **********************************************
  6. * 描述:查詢緩存數據
  7. * 1、從數據庫中查詢
  8. * 2、從數據庫中查詢後插入到緩存中
  9. * Simba.Hua
  10. * 2017年8月30日
  11. **********************************************
  12. **/
  13. public class InventoryQueryCacheRequest implements Request {
  14. private InventoryProductBiz inventoryProductBiz;
  15. private Integer productId;
  16. private boolean isForceFefresh;
  17. public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) {
  18. this.productId = productId;
  19. this.inventoryProductBiz = inventoryProductBiz;
  20. this.isForceFefresh = isForceFefresh;
  21. }
  22. @Override
  23. public void process() {
  24. InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
  25. inventoryProductBiz.setInventoryProductCache(inventoryProduct);
  26. }
  27. @Override
  28. public Integer getProductId() {
  29. // TODO Auto-generated method stub
  30. return productId;
  31. }
  32. public boolean isForceFefresh() {
  33. return isForceFefresh;
  34. }
  35. public void setForceFefresh(boolean isForceFefresh) {
  36. this.isForceFefresh = isForceFefresh;
  37. }
  38. }

spring啟動時初始化隊列線程池

[java] view plain copy
  1. package com.shux.inventory.thread;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import com.shux.inventory.request.Request;
  6. import com.shux.inventory.request.RequestQueue;
  7. import com.shux.utils.other.SysConfigUtil;
  8. /**
  9. **********************************************
  10. * 描述:請求處理線程池,初始化隊列數及每個隊列最多能處理的數量
  11. * Simba.Hua
  12. * 2017年8月27日
  13. **********************************************
  14. **/
  15. public class RequestProcessorThreadPool {
  16. private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString());
  17. private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString());
  18. private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum);
  19. private RequestProcessorThreadPool(){
  20. for(int i=0;i<blockingQueueNum;i++){//初始化隊列
  21. ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(queueDataNum);//每個隊列中放100條數據
  22. RequestQueue.getInstance().addQueue(queue);
  23. threadPool.submit(new RequestProcessorThread(queue));//把每個queue交個線程去處理,線程會處理每個queue中的數據
  24. }
  25. }
  26. public static class Singleton{
  27. private static RequestProcessorThreadPool instance;
  28. static{
  29. instance = new RequestProcessorThreadPool();
  30. }
  31. public static RequestProcessorThreadPool getInstance(){
  32. return instance;
  33. }
  34. }
  35. public static RequestProcessorThreadPool getInstance(){
  36. return Singleton.getInstance();
  37. }
  38. /**
  39. * 初始化線程池
  40. */
  41. public static void init(){
  42. getInstance();
  43. }
  44. }

請求處理線程

[java] view plain copy
  1. package com.shux.inventory.thread;
  2. import java.util.Map;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.Callable;
  5. import com.shux.inventory.request.InventoryUpdateDBRequest;
  6. import com.shux.inventory.request.Request;
  7. import com.shux.inventory.request.RequestQueue;
  8. /**
  9. **********************************************
  10. * 描述:請求處理線程
  11. * Simba.Hua
  12. * 2017年8月27日
  13. **********************************************
  14. **/
  15. public class RequestProcessorThread implements Callable<Boolean>{
  16. private ArrayBlockingQueue<Request> queue;
  17. public RequestProcessorThread(ArrayBlockingQueue<Request> queue){
  18. this.queue = queue;
  19. }
  20. @Override
  21. public Boolean call() throws Exception {
  22. Request request = queue.take();
  23. Map<Integer,Boolean> flagMap = RequestQueue.getInstance().getFlagMap();
  24. //不需要強制刷新的時候,查詢請求去重處理
  25. if (!request.isForceFefresh()){
  26. if (request instanceof InventoryUpdateDBRequest) {//如果是更新請求,那就置為false
  27. flagMap.put(request.getProductId(), true);
  28. } else {
  29. Boolean flag = flagMap.get(request.getProductId());
  30. /**
  31. * 標誌位為空,有三種情況
  32. * 1、沒有過更新請求
  33. * 2、沒有查詢請求
  34. * 3、數據庫中根本沒有數據
  35. * 在最初情況,一旦庫存了插入了數據,那就好會在緩存中也會放一份數據,
  36. * 但這種情況下有可能由於redis中內存滿了,redis通過LRU算法把這個商品給清除了,導致緩存中沒有數據
  37. * 所以當標誌位為空的時候,需要從數據庫重查詢一次,並且把標誌位置為false,以便後面的請求能夠從緩存中取
  38. */
  39. if ( flag == null) {
  40. flagMap.put(request.getProductId(), false);
  41. }
  42. /**
  43. * 如果不為空,並且flag為true,說明之前有一次更新請求,說明緩存中沒有數據了(更新緩存會先刪除緩存),
  44. * 這個時候就要去刷新緩存,即從數據庫中查詢一次,並把標誌位設置為false
  45. */
  46. if ( flag != null && flag) {
  47. flagMap.put(request.getProductId(), false);
  48. }
  49. /**
  50. * 這種情況說明之前有一個查詢請求,並且把數據刷新到了緩存中,所以這時候就不用去刷新緩存了,直接返回就可以了
  51. */
  52. if (flag != null && !flag) {
  53. flagMap.put(request.getProductId(), false);
  54. return true;
  55. }
  56. }
  57. }
  58. request.process();
  59. return true;
  60. }
  61. }

請求隊列

[java] view plain copy
  1. package com.shux.inventory.request;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.concurrent.ArrayBlockingQueue;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. /**
  8. **********************************************
  9. * 描述:請求隊列
  10. * Simba.Hua
  11. * 2017年8月27日
  12. **********************************************
  13. **/
  14. public class RequestQueue {
  15. private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>();
  16. private Map<Integer,Boolean> flagMap = new ConcurrentHashMap<>();
  17. private RequestQueue(){
  18. }
  19. private static class Singleton{
  20. private static RequestQueue queue;
  21. static{
  22. queue = new RequestQueue();
  23. }
  24. public static RequestQueue getInstance() {
  25. return queue;
  26. }
  27. }
  28. public static RequestQueue getInstance(){
  29. return Singleton.getInstance();
  30. }
  31. public void addQueue(ArrayBlockingQueue<Request> queue) {
  32. queues.add(queue);
  33. }
  34. public int getQueueSize(){
  35. return queues.size();
  36. }
  37. public ArrayBlockingQueue<Request> getQueueByIndex(int index) {
  38. return queues.get(index);
  39. }
  40. public Map<Integer,Boolean> getFlagMap() {
  41. return this.flagMap;
  42. }
  43. }


spring 啟動初始化線程池類

[java] view plain copy
  1. package com.shux.inventory.listener;
  2. import org.springframework.context.ApplicationListener;
  3. import org.springframework.context.event.ContextRefreshedEvent;
  4. import com.shux.inventory.thread.RequestProcessorThreadPool;
  5. /**
  6. **********************************************
  7. * 描述:spring 啟動初始化線程池類
  8. * Simba.Hua
  9. * 2017年8月27日
  10. **********************************************
  11. **/
  12. public class InitListener implements ApplicationListener<ContextRefreshedEvent>{
  13. @Override
  14. public void onApplicationEvent(ContextRefreshedEvent event) {
  15. // TODO Auto-generated method stub
  16. if(event.getApplicationContext().getParent() != null){
  17. return;
  18. }
  19. RequestProcessorThreadPool.init();
  20. }
  21. }

異步處理請求接口

[java] view plain copy
  1. package com.shux.inventory.biz;
  2. import com.shux.inventory.request.Request;
  3. /**
  4. **********************************************
  5. * 描述:請求異步處理接口,用於路由隊列並把請求加入到隊列中
  6. * Simba.Hua
  7. * 2017年8月30日
  8. **********************************************
  9. **/
  10. public interface IRequestAsyncProcessBiz {
  11. void process(Request request);
  12. }

異步處理請求接口實現

[java] view plain copy
  1. package com.shux.inventory.biz.impl;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.stereotype.Service;
  6. import com.shux.inventory.biz.IRequestAsyncProcessBiz;
  7. import com.shux.inventory.request.Request;
  8. import com.shux.inventory.request.RequestQueue;
  9. /**
  10. **********************************************
  11. * 描述:異步處理請求,用於路由隊列並把請求加入到隊列中
  12. * Simba.Hua
  13. * 2017年8月30日
  14. **********************************************
  15. **/
  16. @Service("requestAsyncProcessService")
  17. public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz {
  18. private Logger logger = LoggerFactory.getLogger(getClass());
  19. @Override
  20. public void process(Request request) {
  21. // 做請求的路由,根據productId路由到對應的隊列
  22. ArrayBlockingQueue<Request> queue = getQueueByProductId(request.getProductId());
  23. try {
  24. queue.put(request);
  25. } catch (InterruptedException e) {
  26. logger.error("產品ID{}加入隊列失敗",request.getProductId(),e);
  27. }
  28. }
  29. private ArrayBlockingQueue<Request> getQueueByProductId(Integer productId) {
  30. RequestQueue requestQueue = RequestQueue.getInstance();
  31. String key = String.valueOf(productId);
  32. int hashcode;
  33. int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16);
  34. //對hashcode取摸
  35. int index = (requestQueue.getQueueSize()-1) & hash;
  36. return requestQueue.getQueueByIndex(index);
  37. }
  38. }


數據更新請求controller

[java] view plain copy
  1. package com.shux.inventory.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Controller;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.ResponseBody;
  6. import com.shux.inventory.biz.IRequestAsyncProcessBiz;
  7. import com.shux.inventory.biz.InventoryProductBiz;
  8. import com.shux.inventory.entity.InventoryProduct;
  9. import com.shux.inventory.request.InventoryUpdateDBRequest;
  10. import com.shux.inventory.request.Request;
  11. import com.shux.utils.other.Response;
  12. /**
  13. **********************************************
  14. * 描述:提交更新請求
  15. * Simba.Hua
  16. * 2017年9月1日
  17. **********************************************
  18. **/
  19. @Controller("/inventory")
  20. public class InventoryUpdateDBController {
  21. private @Autowired InventoryProductBiz inventoryProductBiz;
  22. private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
  23. @RequestMapping("/updateDBInventoryProduct")
  24. @ResponseBody
  25. public Response updateDBInventoryProduct(InventoryProduct inventoryProduct){
  26. Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz);
  27. requestAsyncProcessBiz.process(request);
  28. return new Response(Response.SUCCESS,"更新成功");
  29. }
  30. }

數據查詢請求controller

[java] view plain copy
  1. package com.shux.inventory.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Controller;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import com.shux.inventory.biz.IRequestAsyncProcessBiz;
  6. import com.shux.inventory.biz.InventoryProductBiz;
  7. import com.shux.inventory.entity.InventoryProduct;
  8. import com.shux.inventory.request.InventoryQueryCacheRequest;
  9. import com.shux.inventory.request.Request;
  10. /**
  11. **********************************************
  12. * 描述:提交查詢請求
  13. * 1、先從緩存中取數據
  14. * 2、如果能從緩存中取到數據,則返回
  15. * 3、如果不能從緩存取到數據,則等待20毫秒,然後再次去數據,直到200毫秒,如果超過200毫秒還不能取到數據,則從數據庫中取,並強制刷新緩存數據
  16. * Simba.Hua
  17. * 2017年9月1日
  18. **********************************************
  19. **/
  20. @Controller("/inventory")
  21. public class InventoryQueryCacheController {
  22. private @Autowired InventoryProductBiz inventoryProductBiz;
  23. private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
  24. @RequestMapping("/queryInventoryProduct")
  25. public InventoryProduct queryInventoryProduct(Integer productId) {
  26. Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false);
  27. requestAsyncProcessBiz.process(request);//加入到隊列中
  28. long startTime = System.currentTimeMillis();
  29. long allTime = 0L;
  30. long endTime = 0L;
  31. InventoryProduct inventoryProduct = null;
  32. while (true) {
  33. if (allTime > 200){//如果超過了200ms,那就直接退出,然後從數據庫中查詢
  34. break;
  35. }
  36. try {
  37. inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId);
  38. if (inventoryProduct != null) {
  39. return inventoryProduct;
  40. } else {
  41. Thread.sleep(20);//如果查詢不到就等20毫秒
  42. }
  43. endTime = System.currentTimeMillis();
  44. allTime = endTime - startTime;
  45. } catch (Exception e) {
  46. }
  47. }
  48. /**
  49. * 代碼執行到這來,只有以下三種情況
  50. * 1、緩存中本來有數據,由於redis內存滿了,redis通過LRU算法清除了緩存,導致數據沒有了
  51. * 2、由於之前數據庫查詢比較慢或者內存太小處理不過來隊列中的數據,導致隊列裏擠壓了很多的數據,所以一直沒有從數據庫中獲取數據然後插入到緩存中
  52. * 3、數據庫中根本沒有這樣的數據,這種情況叫數據穿透,一旦別人知道這個商品沒有,如果一直執行查詢,就會一直查詢數據庫,如果過多,那麽有可能會導致數據庫癱瘓
  53. */
  54. inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
  55. if (inventoryProduct != null) {
  56. Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true);
  57. requestAsyncProcessBiz.process(forcRrequest);//這個時候需要強制刷新數據庫,使緩存中有數據
  58. return inventoryProduct;
  59. }
  60. return null;
  61. }
  62. }

redis系列之數據庫與緩存數據一致性解決方案