1. 程式人生 > 其它 >輸入兩個年月日輸出兩個時間天數差

輸入兩個年月日輸出兩個時間天數差

Redis

redis-benchmark效能測試

Redis入門

概述

redis是什麼:Redis(Remote Dictionary Server)遠端服務字典,開源的使用ANSI C語言編寫,支援網路,基於記憶體,持久化的日誌型,Key-Value資料庫,並提供多種語言的API。免費開源,當下最熱門的NoSql技術之一,也被稱為結構化資料庫

redis能幹嘛(讀的速度11w/1s,寫的速度8w/s)

1.記憶體儲存、持久化,記憶體中是斷電即失的,所以說持久化很重要(rdb、aof)

2.效率高,可以用於告訴快取

3.釋出訂閱系統(簡單訊息佇列)

4.地圖資訊分析

5.計時器、計數器(瀏覽量)

6…

redis特性

開源,支援多種資料型別,支援持久化,提供多種語言API

1.多樣的資料型別

2.持久化

3.叢集

4.事務

學習中需要用到的東西

1.官網:https://redis.io/

2.中文網:http://redis.cn/

3.下載地址:通過官網下載即可,(windows在github上下載),推薦linux伺服器上搭建

預設埠號6379

4.啟動服務redis-server.exe

5.啟動客戶端連線redis-cli.exe

Redis是單執行緒

Redis是基於記憶體操作,CPU不是Redis效能瓶頸,Redis的瓶頸是根據機器的記憶體和網路頻寬

Redis是C語言寫的,官方提供的資料為10000+的QPS,完全不比同樣是使用key-value的Memecache差

Redis為什麼單執行緒還這麼快

誤區1:高效能的伺服器一定是多執行緒的

誤區2:多執行緒(CPU上下文切換)一定比單執行緒效率高

核心:redis是將所有的資料全部放在記憶體中的,所以說使用單執行緒去操作效率是最高的,多執行緒(CPU上下文切換:耗時的操作),對於記憶體系統來說,如果沒有上下文切換效率就是最高的!多次讀寫都是在一個CPU的,在記憶體情況下,這個是最佳方案。

NoSQL的四大分類

KV鍵值對:

新浪:Redis

美團:Redis+Tair

阿里,百度:Redis+memecache

文件型資料庫(bson格式和json一樣):

MongoDB(一般必須要掌握)是一個基於分散式檔案儲存的資料庫,C++編寫,主要用來處理大量文件

MongoDB是一個介於關係型資料庫和非關係型資料庫中中間的產品,MongoDB是非關係型資料庫中功能最豐富,最像關係型資料庫的

ConthDB

列儲存資料庫

HBase

分散式檔案系統

圖關係型資料庫

它不是存圖形,放的是關係,比如:朋友圈社交網路,廣告推薦

Neo4j,InfoGrid

五大資料型別

String List Set Hash Zset

String(字串)

常用命令

FLUSHdb 清空資料庫

keys * 檢視當前所有key的名字

EXISTS key 是否存在key

APPEND key “hello” 往key後面追加hello

STRLEN key 獲取key字串的長度

字串範圍 range(GETRANGE獲取,SETRANGE替換)

setex:設定過期時間

setnx:不存在即建立

clear:清空

127.0.0.1:6379>set key1 v1				# 設定值
OK
127.0.0.1:6379>get key1					# 獲取值
"V1"
127.0.0.1:6379>set key *				# 獲取所有key
1)"key1"
127.0.0.1:6379>EXISTS key1				# 判斷某一key是否存在
(integer)1
127.0.0.1:6379>APPEND key1 "hello"		# 追加字串
(integer)7
127.0.0.1:6379>get key1
"v1hello"
127.0.0.1:6379>STRLEN key1				# 獲取字串的長度
(integer)7
127.0.0.1:6379>APPEND name "Arum"		# 如果當前key不存在就詳單與setkey
(integer)4
127.0.0.1:6379>key *
1)"name"
2)"key1"
127.0.0.1:6379>get key

127.0.0.1:6379>set views 0				# 初始值0
OK
127.0.0.1:6379>get views
"0"
127.0.0.1:6379>incr views				# incr:自增1
(integer)1
127.0.0.1:6379>incr views
(integer)2
127.0.0.1:6379>get views
"2"
127.0.0.1:6379>decr views				# decr:自減1
(integer)1
127.0.0.1:6379>get views
"1"
127.0.0.1:6379>INCRBY views 10			# INCRBY:可以設定步長,設定增長量
"11"
127.0.0.1:6379>DECRBY views 5			# DECRBY:設定減少量
"6"
# 字串範圍 range
# GETRANGE獲取
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>set key1 "hello,Arum"	# 設定key1的值
OK
127.0.0.1:6379>get key1
"hello,Arum"
127.0.0.1:6379>GETRANGE key1 0 3		# GETRANGE擷取字串,擷取區間為[0,3]的字串
"hell"
127.0.0.1:6379>GETRANGE key1 0 -1		# -1擷取全部的字串和get key一樣的
"hello,Arum"

# SETRANGE替換
127.0.0.1:6379>set key2 Aurora
OK
127.0.0.1:6379>get key2
"Aurora"
127.0.0.1:6379>SETRANGE key2 1 xx		# 替換指定位置開始的字串
(integer)6
127.0.0.1:6379>get key2
"Axxora"

# setex(set with expire) : 設定過期時間
127.0.0.1:6379>setex key3 30 "arum"		# 設定key3的值為arum設定30秒後過期
OK
127.0.0.1:6379>ttl key3					# ttl檢視過期時間
(integer)27
127.0.0.1:6379>get key3
"arum"

# setnx(set if not exist) : 不存在再設定(在分散式鎖中會常使用)
127.0.0.1:6379>setnx mykey "Redis"		# 如果mykey不存在,建立mykey,如果mykey存在則建立失敗
(integer)1
127.0.0.1:6379>keys *
1)"key2"
2)"mykey"
3)"key1"
127.0.0.1:6379>ttl key3
(integer)-2
127.0.0.1:6379>setnx mykey "MongoDB"
(integer)0
127.0.0.1:6379>get mykey
"Redis"
127.0.0.1:6379>mset k1 v1 k2 v2 k3 v3	# 同時設定多個值
OK
127.0.0.1:6379>keys *
1)"k1"
2)"k2"
3)"k3"
127.0.0.1:6379>mget k1 k2 k3			# 同時獲取多個值
1)"v1"
2)"v2"
3)"v3"
127.0.0.1:6379>msetnx k1 v1 k4 v4		# 如果不存在建立多個值(存在原子性,要麼同成功要麼同失敗)
(integer)0
127.0.0.1:6379>get k4
(nil)

# 物件
set user:1{name:arum,age:3}				# 設定一個user:1物件 值為Json字串來儲存一個物件
# 這裡的key是一個巧妙的設計:user:{id}:{filed},如此設計在redis是完全ok的
127.0.0.1:6379>mset user:1:name arum user:1:age 3	
OK
127.0.0.1:6379>mget user:1:name user:1:age
1)"arum"
2)"3"

# getset 先get再set		
127.0.0.1:6379>getset db redis			# 如果不存在值,則返回nil
(nil)
127.0.0.1:6379>get db
"redis"
127.0.0.1:6379>getset db mongodb		# 如果存在值,獲取原來的值並設定新的值
"redis"
127.0.0.1:6379>get db
"mongodb"

資料結構是相通的

String類似的使用場景:value除了是我們普通的字串外 還可以是數字

​ 計數器

​ 統計多單位的數量

List(列表)

基本的資料型別,列表。在redis裡面我們可以把list玩成,棧,佇列,阻塞佇列

所有list命令都是用l開頭

# PUSH 插入 --- LPUSH左邊插入   RPUSH 右邊插入
127.0.0.1:6379>LPUSH list one			# 將一個值或者多個值,插入到佇列頭部(左)
(integer)1
127.0.0.1:6379>LPUSH list two
(integer)2
127.0.0.1:6379>LPUSH list three
(integer)3
127.0.0.1:6379>LRANGE list 0 -1			# 獲取list中的值
1)"three"
2)"two"
3)"one"
127.0.0.1:6379>LRANGE list 0 1
1)"three"
2)"two"
127.0.0.1:6379>RPUSH list five			# 將一個值或者多個值,插入到佇列尾部(右)
(integer)4
127.0.0.1:6379>LRANGE list 0 -1
1)"three"
2)"two"
3)"one"
4)"five"
127.0.0.1:6379>LRANGE list 0 1
1)"three"
2)"two"

# POP 移除 --- LPOP左邊插入   RPOP 右邊插入
127.0.0.1:6379>LRANGE list 0 -1
1)"three"
2)"two"
3)"one"
4)"five"
127.0.0.1:6379>Lpop list				# 移除list的第一個元素
"three"
127.0.0.1:6379>Rpop list				# 移除list的最後一個元素
"five"
127.0.0.1:6379>LRANGE list 0 -1
1)"two"
2)"one"

# Lindex
127.0.0.1:6379>LRANGE list 0 -1
1)"two"
2)"one"
127.0.0.1:6379>Lindex list 1			# 通過下表獲得list中某一個值
"one"
127.0.0.1:6379>Lindex list 0
"two"

# Llen
127.0.0.1:6379>Lpush list one
(integer)1
127.0.0.1:6379>Lpush list two
(integer)2
127.0.0.1:6379>Lpush list three
(integer)3
127.0.0.1:6379>Llen list 				# 返回列表長度
(integer)3

#移除指定的值 Lrem
127.0.0.1:6379>Lpush list three
1)"three"
2)"three"
3)"two"
4)"one"
127.0.0.1:6379>Lrem list 1 one			# 移除list中的one從上到下取一個,精確匹配
(integer)1
127.0.0.1:6379>Lrange list 0 -1
1)"three"
2)"three"
3)"two"
127.0.0.1:6379>Lrem list 1 three
(integer)1
127.0.0.1:6379>Lrange list 0 -1
1)"three"
2)"two"
127.0.0.1:6379>Lpush list three
(integer)3
127.0.0.1:6379>Lrem list 2 three
(integer)2
127.0.0.1:6379>Lrange list 0 -1
1)"two"

# trim 修剪:list截斷
127.0.0.1:6379>flushdb					# 清空
OK
127.0.0.1:6379>keys *					# 檢視
(empty list or set)
127.0.0.1:6379>Rpush mylist "hello1"
(integer)1
127.0.0.1:6379>Rpush mylist "hello2"
(integer)2
127.0.0.1:6379>Rpush mylist "hello3"
(integer)3
127.0.0.1:6379>Rpush mylist "hello4"
(integer)4
127.0.0.1:6379>ltrim mylist 1 2			# 通過下表擷取指定的長度,這個list已經被改變,只剩擷取元素
OK
127.0.0.1:6379>Lrange mylist 0 -1
1)"hello2"
2)"hello3"

# rpoplpush 移除列表的最後一個元素並移動到新的列表中
127.0.0.1:6379>flushdb						# 清空
OK
127.0.0.1:6379>keys *						# 檢視
(empty list or set)
127.0.0.1:6379>Rpush mylist "hello1"
(integer)1
127.0.0.1:6379>Rpush mylist "hello2"
(integer)2
127.0.0.1:6379>Rpush mylist "hello3"
(integer)3
127.0.0.1:6379>rpoplpush mylist myohtherlist#移除mylist中最後一個元素 新增到myohtherlist中
"hello3"
127.0.0.1:6379>lrange mylist 0 -1			# 檢視原來列表
1)"hello1"
2)"hello2"
127.0.0.1:6379>lrange myotherlist 0 -1		# 檢視目標列表中,確實存在改值
1)"hello3"

# lset 將列表中指定下標的值替換為另外一個值(詳單與一個更新操作)
127.0.0.1:6379>flushdb						# 清空
OK
127.0.0.1:6379>keys *						# 檢視
(empty list or set)
127.0.0.1:6379>EXISTS list					# 判斷list列表是否存在
(integer)0
127.0.0.1:6379>lset list 0 item				# 如果不存在列表我們給指定下標賦值
(error)ERR no such key						# 則報錯
127.0.0.1:6379>lpush list value1
(integer)1
127.0.0.1:6379>lrange lsit 0 0
1)"value1"
127.0.0.1:6379>lset list 0 item				# 如果存在,則更新當前下表的值
OK
127.0.0.1:6379>lrange lsit 0 0
1)"item"
127.0.0.1:6379>lset list 1 other			# 如果不存在該下表,則報錯
(error)ERR no such key			

# LINSERT 將某個具體的value插入到列表中某個元素的前面(before)或者後面(after)
127.0.0.1:6379>flushdb						# 清空
OK
127.0.0.1:6379>keys *						# 檢視
(empty list or set)
127.0.0.1:6379>Rpush mylist "hello"
(integer)1
127.0.0.1:6379>Rpush mylist "world"
(integer)2
127.0.0.1:6379>Linsert before "world" "other"		# 將"other"插入到"world"前
(integer)3
127.0.0.1:6379>lrange mylist 0 -1
1)"hello"
2)"other"
3)"world"
127.0.0.1:6379>linsert mylist after "world" "new"	# 將"new"插入到"world"後
(integer)4
127.0.0.1:6379>lrange mylist 0 -1
1)"hello"
2)"other"
3)"world"
4)"new"

小結:1.它實際上是一個連結串列,before Node after ,left,right都可以插入值

​ 2.如果key不存在,建立新的連結串列

​ 3.如果key存在,新增內容

​ 4.如果移除了所有的值,空連結串列,也代表不存在

​ 5.在兩邊插入或改動值,效率最高。如果操作中間元素,相對來說效率會低一點

訊息排隊 訊息佇列(LPUSH RPOP,左邊過去右邊出來) 棧(LPUSH LPOP,左邊過去左邊出來)

Set(集合)

set:無序不重複集合

所有set命令都是用s開頭

# sadd:新增元素
# SMEMBERS:檢視指定set的所有值
# SISMEMBER:判斷某一個值是不是在set中
127.0.0.1:6379>sadd myset "hello"				# sadd,set集合中新增元素
(integer)1
127.0.0.1:6379>sadd myset "arum"
(integer)1
127.0.0.1:6379>sadd myset "loveaurora"
(integer)1
127.0.0.1:6379>SMEMBERS myset					# SMEMBERS,檢視指定set的所有值
1)"hello"
2)"arum"
3)"loveaurora"
127.0.0.1:6379>SISMEMBER myset hello			# SISMEMBER,判斷某一個值是不是在set中
(integer)1										# 1表示存在
127.0.0.1:6379>SMENBERS myset world
(integer)0										# 0表示不存在

#scard:獲取集合中元素個數
#srem:移除set中指定元素
127.0.0.1:6379>scard myset						# 獲取myset集合中元素個數
(integer)3
127.0.0.1:6379>srem myset hello					# srem:移除set中指定元素
(integer)1
127.0.0.1:6379>scard myset	
(integer)2
127.0.0.1:6379>SMEMBERS myset
1)"arum"
2)"loveaurora"

#Srandmember:隨機抽選出一個元素
127.0.0.1:6379>SRANDMEMBER myset				# 隨機抽取1個元素
127.0.0.1:6379>SRANDMEMBER myset 2				# 隨機抽取2個元素

# spop隨機刪除指定集合元素
127.0.0.1:6379> spop myset
"A"
127.0.0.1:6379> smembers myset
1) "C"
2) "D"
3) "B"

# smove:將一個指定的值移動到另外一個set集合中
127.0.0.1:6379> sadd myset A
(integer) 1
127.0.0.1:6379> sadd myset b
(integer) 1
127.0.0.1:6379> sadd myset c
(integer) 1
127.0.0.1:6379> sadd myset2 A2
(integer) 1
127.0.0.1:6379> sadd myset2 B2
(integer) 1
127.0.0.1:6379> smove myset myset2 A			 # smove:將myset中的A移動到myset2中
(integer) 1
127.0.0.1:6379> smembers myset
1) "c"
2) "b"
127.0.0.1:6379> smembers myset2
1) "B2"
2) "A"
3) "A2"

微博,B站,抖音,QQ等(共同關注,共同好友)

# 差集:sdiff 得到右邊集合沒有的元素
127.0.0.1:6379> sadd key1 a
(integer) 1
127.0.0.1:6379> sadd key1 b
(integer) 1
127.0.0.1:6379> sadd key1 c
(integer) 1
127.0.0.1:6379> sadd key2 b
(integer) 1
127.0.0.1:6379> sadd key2 c
(integer) 1
127.0.0.1:6379> sadd key2 d
(integer) 1
127.0.0.1:6379> SDIFF key1 key2
1) "a"
127.0.0.1:6379> SDIFF key2 key1
1) "d"

# 交集:sinter
127.0.0.1:6379> sinter key1 key2
1) "c"
2) "b"

# 並集:SUNION
127.0.0.1:6379> sunion key1 key2
1) "c"
2) "d"
3) "a"
4) "b"

例子:微博,A使用者將所有關注的人放在一個set集合中!將粉絲也放在一個集合中

共同關注,共同好友,二度好友(六度分隔理論,你和任何一個陌生人之間所間隔的人不會超五個,也就是說,最多通過六個人你就能夠認識任何一個陌生人)

Hash(雜湊)

Map集合,key-map,存的值也是鍵值對,只是這時候的value也是個map集合!本質和String型別沒太大區別,還是一個簡單的集合

同理,H開頭的命令都是hash

127.0.0.1:6379> hset myhash arum1 a					# set一個具體 key-value
(integer) 1
127.0.0.1:6379> hget myhash arum1					# get一個欄位值
"a"
127.0.0.1:6379> hmset myhash arum1 b arum2 c		# set多個key-value
OK
127.0.0.1:6379> hmget myhash arum1 arum2			# get多個key-value
1) "b"
2) "c"
127.0.0.1:6379> hgetall myhash						# 獲取全部的資料
1) "arum1"
2) "b"
3) "arum2"
4) "c"
127.0.0.1:6379> hdel myhash arum1			# 刪除hash對應的key欄位,對應的value值也消失
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "arum2"
2) "c"

# hlen:獲取hash長度
127.0.0.1:6379> hset myhash vkey1 vvalue1
(integer) 1
127.0.0.1:6379> hset myhash vkey2 vvalue2
(integer) 1
127.0.0.1:6379> hset myhash vkey3 vvalue3
(integer) 1
127.0.0.1:6379> hset myhash vkey4 vvalue4
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "vkey1"
2) "vvalue1"
3) "vkey2"
4) "vvalue2"
5) "vkey3"
6) "vvalue3"
7) "vkey4"
8) "vvalue4"
127.0.0.1:6379> hlen myhash						# 獲取hash表的欄位數量
(integer) 4

# hexists:判斷指定hash是否存在
127.0.0.1:6379> hexists myhash vkey1
(integer) 1
127.0.0.1:6379> hexists myhash vkey5			# hexists:判斷指定hash是否存在
(integer) 0

# 只獲得所有hash中的key:hkeys myhash
# 只獲得所有hash中的value:hvals myhash
127.0.0.1:6379> hkeys myhash
1) "vkey1"
2) "vkey2"
3) "vkey3"
4) "vkey4"
127.0.0.1:6379> hvals myhash
1) "vvalue1"
2) "vvalue2"
3) "vvalue3"
4) "vvalue4"

# hincrby:增量
# hsetnx:如果不存在則可設定,如果存在則不可設定
127.0.0.1:6379> hset myhash vkey5 5
(integer) 1
127.0.0.1:6379> hincrby myhash vkey5 1			# hincrby:增量	
(integer) 6
127.0.0.1:6379> hincrby myhash vkey5 -1
(integer) 5
127.0.0.1:6379> hsetnx myhash vkey6 vvalue6		# hsetnx:如果不存在則可設定
(integer) 1
127.0.0.1:6379> hsetnx myhash vkey6 vvalue7		# hsetnx:如果存在則不可設定
(integer) 0

hash存變更的資料user age name,尤其是使用者資訊之類的,經常變更的資訊!hash更適合物件的儲存,String更加適合字串儲存

127.0.0.1:6379> hset user:1 name arum
(integer) 1
127.0.0.1:6379> hget user:1 name
"arum"

Zset(有序集合)

在set的基礎上增加了一個值,set k1 v1 zset k1 score1 v1

# 新增一個/多個值
127.0.0.1:6379> zadd myset 1 one
(integer) 1
127.0.0.1:6379> zadd myset 2 two 3 three
(integer) 2
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
3) "three"

# 排序如何實現,按照salary從小到大排序:zrangebyscore salary min max(範圍[min,max])
127.0.0.1:6379> zadd salary 2500 A
(integer) 1
127.0.0.1:6379> zadd salary 5000 B
(integer) 1
127.0.0.1:6379> zadd salary 1000 C
(integer) 1
127.0.0.1:6379> zrevrange salary 0 -1				# 從大到小排序
1) "B"
2) "A"
127.0.0.1:6379> zrangebyscore salary -inf +inf		# -inf負無窮,+inf正無窮
1) "C"
2) "A"
3) "B"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores	# 獲取排行以及具體資訊
1) "C"
2) "1000"
3) "A"
4) "2500"
5) "B"
6) "5000"

# zrem:移除有序集合中指定元素
127.0.0.1:6379> zrange salary 0 -1
1) "C"
2) "A"
3) "B"
127.0.0.1:6379> zrem salary C
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "A"
2) "B"

# zcard:獲取有序集合中的個數
127.0.0.1:6379> zcard salary
(integer) 2

# 獲取指定區間成員數量:zcount
127.0.0.1:6379> zadd myset 1 hello
(integer) 1
127.0.0.1:6379> zadd myset 2 world 3 arum
(integer) 2
127.0.0.1:6379> zcount myset 1 3
(integer) 3
127.0.0.1:6379> zcount myset 1 2
(integer) 2

小結:加了個標誌位,沒啥特殊

案例思路:set排序 儲存班級成績表,工資表排序

普通訊息:1 ;重要訊息:2; (加權重進行判斷)

排行榜應用實現,

三種特殊資料型別

Geospatial地理位置

# geotadd:新增地理位置
# 規測:兩級無法新增,我們一般會下載城市資料,直接通過java程式一次性匯入
	# (error) ERR invalid longitude,latitude pair 39.900000,166.400000
# 引數 key 值(緯度、經度、名稱)
127.0.0.1:6379> geoadd china:city 166.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 29.53 chonqing 114.05 22.52 shengzheng
(integer) 2
127.0.0.1:6379> geoadd china:city 120.16 30.24 hangzhou 108.96 34.26 xian
(integer) 2

# geopos:獲取指定城市的經度和緯度(獲得當前定位)
127.0.0.1:6379> geopos china:city beijing
1) 1) "166.40000134706497192"
   2) "39.90000009167092543"
127.0.0.1:6379> geopos china:city xian
1) 1) "108.96000176668167114"
   2) "34.25999964418929977"
127.0.0.1:6379> geopos china:city chonqing shengzhen
1) 1) "106.49999767541885376"
   2) "29.52999957900659211"
2) 1) "114.04999762773513794"
   2) "22.5200000879503861"

位置共享,兩人位置之間的距離

# geodist
# m 米
# km 千米
# mi 英里
# ft 英尺
127.0.0.1:6379> geodist china:city beijing shanghai
"4132638.7228"
127.0.0.1:6379> geodist china:city beijing shanghai km
"4132.6387"

我附近的人(獲得所有附件的人的地址(手機定位))通過半徑來查詢

# 以(110,30)為中心獲取半徑為1000km內的座標
127.0.0.1:6379> georadius china:city 110 30 1000 km
1) "chonqing"
2) "xian"
3) "shengzhen"
4) "hangzhou"
# 以(110,30)為中心獲取半徑為500km內的座標
127.0.0.1:6379> georadius china:city 110 30 500 km
1) "chonqing"
2) "xian"
# 以(110,30)為中心獲取半徑為500km內的座標的經度
127.0.0.1:6379> georadius china:city 110 30 500 km withdist
1) 1) "chonqing"
   2) "341.9374"
2) 1) "xian"
   2) "483.8340"
# 以(110,30)為中心獲取半徑為500km內的座標的經度,緯度
127.0.0.1:6379> georadius china:city 110 30 500 km withdist withcoord
1) 1) "chonqing"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"
2) 1) "xian"
   2) "483.8340"
   3) 1) "108.96000176668167114"
      2) "34.25999964418929977"
# 以(110,30)為中心獲取半徑為500km內的一個座標的經度,緯度
127.0.0.1:6379> georadius china:city 110 30 500 km withdist withcoord count 1
1) 1) "chonqing"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"
# 以(110,30)為中心獲取半徑為500km內的兩個座標的經度,緯度
127.0.0.1:6379> georadius china:city 110 30 500 km withdist withcoord count 2
1) 1) "chonqing"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"
2) 1) "xian"
   2) "483.8340"
   3) 1) "108.96000176668167114"
      2) "34.25999964418929977"

GEORADIUSBYMEMBER

# georadiusbymember:找出位於指定元素周圍的其它元素
127.0.0.1:6379> georadiusbymember china:city shanghai 2000 km
1) "chonqing"
2) "xian"
3) "shengzhen"
4) "hangzhou"
5) "shanghai"
127.0.0.1:6379> georadiusbymember china:city shanghai 500 km
1) "hangzhou"
2) "shanghai"

GEOHASH命令 返回一個或多個元素的Geohash表示,該命令將返回11個字元的Geohash字串

# 將二維的經緯度轉換為一維的字串,如果兩個字串越接近,那麼則距離越近(很少使用)
127.0.0.1:6379> geohash china:city shanghai chonqing
1) "wtw3sj5zbj0"
2) "wm5xzrybty0"

GEO 底層實現原理其實就是ZSET!我們可以使用ZSET命令操作GEO

127.0.0.1:6379> ZRANGE china:city 0 -1			# 檢視地圖中全部元素
1) "chonqing"
2) "xian"
3) "shengzhen"
4) "hangzhou"
5) "shanghai"
6) "beijing"
127.0.0.1:6379> ZREM china:city beijing			# 移除指定元素
(integer) 1
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "chonqing"
2) "xian"
3) "shengzhen"
4) "hangzhou"
5) "shanghai"

Hyperloglog基數統計

什麼是基數,不重複的元素,可以接受誤差

簡介

Redis2.8.9版本更新了Hyperloglog資料結構

Redis Hyperloglog 基數統計的演算法

優點:佔用記憶體是固定的,2^64不同元素的基數,只需要廢12kb記憶體!如果要從記憶體角度來比較的話,Hyperloglog首選!注:0.81%錯誤率

網頁的UV(一個人訪問一個網站多次,但是還是算一個人)

傳統的方式,set儲存使用者的id,然後就可以統計set中元素數量作為標準判斷;

這個方式如果儲存大量使用者id就會比較麻煩。我們的目的是為了計數而不是儲存使用者id;

# pfadd:插入基數
# pfcount:統計基數數量
# pfmerge:合併兩組基數(並集)
127.0.0.1:6379> PFadd mykey a b c e f g h i j					# 建立第一組元素mykey
(integer) 1
127.0.0.1:6379> pfcount mykey									# 統計基數數量
(integer) 9
127.0.0.1:6379> pfadd mykey2 i j k l m n o p q					# 建立第二組元素mykey2
(integer) 1
127.0.0.1:6379> pfcount mykey2									# 統計基數數量
(integer) 9	
127.0.0.1:6379> pfmerge mykey3 mykey mykey2						# 合併mykey,mykey2
OK
127.0.0.1:6379> pfcount mykey3									# 統計基數數量(並集)
(integer) 16

如果允許容錯,那麼一可以使用Hyperloglog

如果不允許容錯,那麼使用set或者自己的資料型別即可

Bitmap位儲存

統計疫情感染人數,統計使用者資訊(活躍,不活躍,登入,未登入,365天打卡)兩個狀態的都可以使用Bitmap

Bitmap點陣圖,資料結構!都是操作二進位制位來進行記錄,就只有0和1兩個狀態

365天=365bit 1位元組=8bit 46個位元組左右

# 使用bitmap來記錄 週一到週日的打卡:setbit
# 週一:1 週二:0 週三:1 週四:0 週五:0 週六:1 週日:1
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 1
(integer) 0
127.0.0.1:6379> setbit sign 3 0
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> setbit sign 6 1
(integer) 0

# 檢視某一天是否有打卡:getbit
127.0.0.1:6379> getbit sign 3
(integer) 0
127.0.0.1:6379> getbit sign 6
(integer) 1

# 統計這周打卡記錄,就可以看到是否全勤
# 統計操作,統計打卡的天數:bitcount
127.0.0.1:6379> bitcount sign
(integer) 4

事務

事務的本質:一組命令的集合,一個事務中的所有命令都會被序列化,在事務的執行過程中會按照順序執行

一次性、順序性、排他性!執行一系列命令

-----佇列 set set set 執行 -----

MYSQL:ACID

原子性:一個事務要麼同時成功要麼同時失敗

Redis事務沒有隔離級別的概念

所有的命令在事務中並沒有直接被執行!只有發起執行命令的時候才會執行!Exec

Redis單條命令是保證原子性的,但是事務不保證原子性

Redis的事務:

  • 開啟事務(multi)
  • 命令入隊(…)
  • 執行事務(exec)
  • 放棄事務(discard)
# 正常執行事務(每個事務執行完就沒了,需要重新開啟)
127.0.0.1:6379> multi				# 開啟事務
OK
127.0.0.1:6379> set k1 v1			# 命令入隊
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec				# 執行事務
1) OK
2) OK
3) "v2"
4) OK
# discard:放棄事務,整個事務清空
127.0.0.1:6379> multi				# 開啟事務
OK
127.0.0.1:6379> set k1 v1			# 命令入隊
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> discard				# 取消事務
OK
127.0.0.1:6379> get k4
(nil)

編譯型異常(程式碼有問題!命令有錯!),事務中所有命令都不會執行!

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> getset k3				# 錯誤的命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> exec					# 執行事務報錯
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k5					# 所有的命令都不會執行
(nil)

執行時異常(1/0),如果事務佇列中存在語法性錯誤,那麼執行命令的時候其它命令是可以正常執行的,錯誤命令丟擲異常

127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr k1						# 字串自增(語法錯誤)
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> get k3
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range		# 報錯,依舊執行成功
2) OK
3) OK
4) "v3"
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k3
"v3"

小結:命令入隊中報錯則事務執行失敗,語法錯誤則單條命令錯誤,事務依舊執行

監控 Watch

悲觀鎖:認為什麼時候都會出現問題,無論如何都會加鎖

樂觀鎖

  • 認為什麼時候都不會出現問題,所以不會上鎖!更新資料的時候去判斷一下,在此期間是否有人修改過這個資料
  • 獲取version
  • 更新時比較version

Redis測監視測試

# 正常執行成功
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> get money	
"100"
127.0.0.1:6379> watch money					# 監視money物件
OK
127.0.0.1:6379> multi				# 事務正常結束,資料期間沒有發生變動,這個時候就正常執行成功
OK
127.0.0.1:6379> decrby money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20

# 模擬多個執行緒,修改值,使用watch可以當作redis的樂觀鎖操作
# 執行緒1
127.0.0.1:6379> watch money				# 1.監視money
OK
127.0.0.1:6379> multi					# 2.開啟事務
OK
127.0.0.1:6379> decrby money 10			# 3.命令入隊
QUEUED
127.0.0.1:6379> incrby out 10
QUEUED
127.0.0.1:6379> exec					# 5.執行事務
(nil)									# 6.由於執行之前值被第二個執行緒修改,故事務執行失敗

# 執行緒2
127.0.0.1:6379> get money				# 4.第二個執行緒修改值
"80"
127.0.0.1:6379> set money 1000
OK
127.0.0.1:6379> get money
"1000"

(事務執行的時候,watch判斷值是否是原來值,是則繼續不是則失敗)
127.0.0.1:6379> unwatch					# 如果發現事務執行失敗,就先解鎖
OK
127.0.0.1:6379> watch money				# 獲取最新的值,再次監視
OK
127.0.0.1:6379> multi					# 開啟事務
OK
127.0.0.1:6379> decrby money 10			# 命令入隊
QUEUED
127.0.0.1:6379> incrby money 10
QUEUED
127.0.0.1:6379> exec					# 比對監視的值是否發生了變化,如果沒有變化 那麼可以執行成功
1) (integer) 990
2) (integer) 1000
# 如果修改失敗 獲取最新的值就好

redis實現樂觀鎖(使用watch監控)

Jedis

我們要使用Java來操作Redis

什麼是Jedis 是Redis官方推薦的Java連線開發工具 使用Java才做Redis中介軟體 如果你要使用java操作Redis,那麼一定要對Jedis十分的熟悉

知其然並知其所以然,慢慢來會好的

1.匯入依賴

  <dependencies>
        <!-- 匯入jedis的包 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

2.編碼測試:

  • 連線資料庫:Jedis jedis = new Jedis(“127.0.0.1”,6379);
import redis.clients.jedis.Jedis;

public class TestPing {
    public static void main(String[] args) {
        // 1、new Jedis物件即可
        Jedis jedis = new Jedis("127.0.0.1",6379);
        // Jedis 所有的命令就是我們之前學的所有的指令
        System.out.println(jedis.ping());

    }
}
// 連線成功 輸出PONG
  • 操作命令
  • 斷開連線:jedis.close();

事務

public class TestTX {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("hello","world");
        jsonObject.put("name","alum");
        // 開啟事務
        Transaction multi = jedis.multi();
        // 轉格式
        String result = jsonObject.toJSONString();
		// jedis.watch(result); 加樂觀鎖
        try {
            // 命令入隊
            multi.set("user1",result);
            multi.set("user2",result);
            // 如果成功,執行事務
            multi.exec();
        } catch (Exception e) {
            // 如果失敗,放棄事務
            multi.discard();
            e.printStackTrace();
        } finally {
            System.out.println(jedis.get("user1"));
            System.out.println(jedis.get("user2"));
            // 關閉連線
            jedis.close();
        }
    }
}

SpringBoot整合Redis

SpringBoot操作資料:spring-data jpa jdbc mongodb redis

SpringData也是和SpringBoot齊名的專案

說明:在SpringBoot2.x之後,原來使用的jedis被替換為lettuce?

Jedis:採用的直連,多個執行緒操作的話是不安全的,如果想要避免不安全,使用Jedis pool連線池,更像Bio模式

Lettuce:採用netty,例項可以在多個執行緒中共享,不存線上程不安全的情況,可以減少執行緒數量,更像Nio模式

1.匯入依賴

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.配置連線

# 配置redis
spring.redis.host=127.0.0.1
spring.redis.port=6379

3.測試

   @Autowired
    private RedisTemplate redisTemplate;
    @Test
    void contextLoads() {

        /**
         * 基本操作
         */
       /* redisTemplate.opsForValue()操作String,類似於String
        redisTemplate.opsForList() 操作List,類似於List
        redisTemplate.opsForCluster()
        redisTemplate.opsForGeo()
        redisTemplate.opsForHash()
        redisTemplate.opsForHyperLogLog()
        redisTemplate.opsForSet()
        redisTemplate.opsForZSet()*/

        /**
         * 除了基本的操作,我們常用的方法都可以直接通過redisTemplate操作,比如事務,和基本的CRUD
         */
        // redisTemplate.exec();

        /**
         * 獲取redis連線物件
         */
        /*RedisConnection connection = 			 redisTemplate.getConnectionFactory().getConnection();
        connection.flushDb();
        connection.flushAll();*/

        redisTemplate.opsForValue().set("mykey","arum");
        redisTemplate.opsForValue().set("mykey2","你看起來很好吃");
        Object key = redisTemplate.opsForValue().get("mykey2");
        System.out.println(key);
    }
 @Test
    void test() throws JsonProcessingException {
        // 真實開發一般都使用json來傳遞物件
        User alum = new User("Alum", 3);
        // 如果直接傳物件,需要物件序列化
        String jsonUser = new ObjectMapper().writeValueAsString(alum);
        redisTemplate.opsForValue().set("alum",jsonUser);
        System.out.println(redisTemplate.opsForValue().get("alum"));
    }

4.自定義RedisTemplate

@Configuration
public class RedisConfig {
    // 編寫我們自己的redisTemplate,自定義RedisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        
        // 為了自己開發方便,一般直接使用<String,Object>型別
        RedisTemplate<String,Object> template = new RedisTemplate<String,Object>();
        template.setConnectionFactory(factory);
        
        // Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key採用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也採用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式採用jackon
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式採用jackon
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;

    }
}

5.RedisUtils

企業開發中,80%的情況下,都不會使用原生的方法去編寫程式碼(通過RedisUtils)

package com.alum.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * 指定快取失效時間
     * @param key  鍵
     * @param time 時間(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根據key 獲取過期時間
     * @param key 鍵 不能為null
     * @return 時間(秒) 返回0代表為永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判斷key是否存在
     * @param key 鍵
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 刪除快取
     * @param key 可以傳一個值 或多個
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通快取獲取
     * @param key 鍵
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通快取放入
     * @param key   鍵
     * @param value 值
     * @return true成功 false失敗
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通快取放入並設定時間
     * @param key   鍵
     * @param value 值
     * @param time  時間(秒) time要大於0 如果time小於等於0 將設定無限期
     * @return true成功 false 失敗
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 遞增
     * @param key   鍵
     * @param delta 要增加幾(大於0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("遞增因子必須大於0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 遞減
     * @param key   鍵
     * @param delta 要減少幾(小於0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("遞減因子必須大於0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     * @param key  鍵 不能為null
     * @param item 項 不能為null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 獲取hashKey對應的所有鍵值
     * @param key 鍵
     * @return 對應的多個鍵值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 鍵
     * @param map 對應多個鍵值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 並設定時間
     * @param key  鍵
     * @param map  對應多個鍵值
     * @param time 時間(秒)
     * @return true成功 false失敗
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一張hash表中放入資料,如果不存在將建立
     *
     * @param key   鍵
     * @param item  項
     * @param value 值
     * @return true 成功 false失敗
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一張hash表中放入資料,如果不存在將建立
     *
     * @param key   鍵
     * @param item  項
     * @param value 值
     * @param time  時間(秒) 注意:如果已存在的hash表有時間,這裡將會替換原有的時間
     * @return true 成功 false失敗
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 刪除hash表中的值
     *
     * @param key  鍵 不能為null
     * @param item 項 可以使多個 不能為null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判斷hash表中是否有該項的值
     *
     * @param key  鍵 不能為null
     * @param item 項 不能為null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash遞增 如果不存在,就會建立一個 並把新增後的值返回
     *
     * @param key  鍵
     * @param item 項
     * @param by   要增加幾(大於0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash遞減
     *
     * @param key  鍵
     * @param item 項
     * @param by   要減少記(小於0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根據key獲取Set中的所有值
     * @param key 鍵
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根據value從一個set中查詢,是否存在
     *
     * @param key   鍵
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 將資料放入set快取
     *
     * @param key    鍵
     * @param values 值 可以是多個
     * @return 成功個數
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 將set資料放入快取
     *
     * @param key    鍵
     * @param time   時間(秒)
     * @param values 值 可以是多個
     * @return 成功個數
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 獲取set快取的長度
     *
     * @param key 鍵
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值為value的
     *
     * @param key    鍵
     * @param values 值 可以是多個
     * @return 移除的個數
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 獲取list快取的內容
     *
     * @param key   鍵
     * @param start 開始
     * @param end   結束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 獲取list快取的長度
     *
     * @param key 鍵
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通過索引 獲取list中的值
     *
     * @param key   鍵
     * @param index 索引 index>=0時, 0 表頭,1 第二個元素,依次類推;index<0時,-1,表尾,-2倒數第二個元素,依次類推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 將list放入快取
     *
     * @param key   鍵
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 將list放入快取
     * @param key   鍵
     * @param value 值
     * @param time  時間(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 將list放入快取
     *
     * @param key   鍵
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 將list放入快取
     *
     * @param key   鍵
     * @param value 值
     * @param time  時間(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 根據索引修改list中的某條資料
     *
     * @param key   鍵
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 移除N個值為value
     *
     * @param key   鍵
     * @param count 移除多少個
     * @param value 值
     * @return 移除的個數
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }

    }

}

Redis.conf詳解

啟動的時候就通過配置檔案來啟動

配置檔案unit單位對大小寫不敏感

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-y4hkXJp6-1608520475601)(C:\Users\deity\AppData\Roaming\Typora\typora-user-images\image-20201218154911758.png)]

包含,就好比我們學的Spring、Import、include,可以將其它配置檔案包含進來

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-O8EiF6vs-1608520475603)(C:\Users\deity\AppData\Roaming\Typora\typora-user-images\image-20201218155133475.png)]

網路

繫結的id(通過什麼id訪問)

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-l0okbGb6-1608520475605)(C:\Users\deity\AppData\Roaming\Typora\typora-user-images\image-20201218155310022.png)]

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-wYhLx3P7-1608520475608)(C:\Users\deity\AppData\Roaming\Typora\typora-user-images\image-20201218155446561.png)]

bind 127.0.0.1				# 繫結的ip
protected-mode yes			# 保護模式
port 6379					# 埠設定,(叢集需要多個埠)

General通用

daemonize yes						# 以守護程序的方式執行,預設是no,我們需要自己開啟為yes
supervised no						# 管理守護程序的,預設是no,一般不要動
pidfile /var/run/redis_6379.pid		# 如果以後臺的方式執行,我們就需要指定一個pid檔案	


# 日誌
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)				
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)		生產環境適用
# warning (only very important / critical messages are logged) 	
loglevel notice

# Specify the log file name. Also the empty string can be used to force
# Redis to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile ""				# 日誌的檔案位置名

databases 16				# 預設資料庫數量,預設是16個數據庫

# By default Redis shows an ASCII art logo only when started to log to the
# standard output and if the standard output is a TTY. Basically this means
# that normally a logo is displayed only in interactive sessions.
#
# However it is possible to force the pre-4.0 behavior and always show a
# ASCII art logo in startup logs by setting the following option to yes.
always-show-logo yes		# 是否總是顯示logo,預設開啟logo

快照

持久化,在規定的時間內,執行了多少次操作,則會持久化到檔案.rdb.aof

redis是記憶體資料,如果沒有持久化,資料斷電即失

save 900 1					# 如果900秒內如果至少有一個key進行了修改,我們就進行持久化操作
save 300 10					# 如果300秒內如果至少有十個key進行了修改,我們就進行持久化操作
save 60 10000				# 如果60秒內如果至少有萬個key進行了修改,我們就進行持久化操作
# 我們之後持久化,會自己定義這個測試

stop-writes-on-bgsave-error yes		# 持久化如果出錯了,是否還需要繼續工作
rdbcompression yes					# 是否壓縮rdb檔案(rdb:持久化檔案),需要消耗一些cpu資源

rdbchecksum yes						# 儲存rdb檔案的時候,進行錯誤校驗

# The filename where to dump the DB
dbfilename dump.rdb
dir ./								# rdb檔案儲存目錄
					

REPLICATION複製,主從複製範疇

SECURITY安全

# requirepass foobared			# 設定密碼 預設為空

127.0.0.1:6379> config get requirepass		# 獲取redis密碼
1) "requirepass"
2) ""

# 登入
127.0.0.1:6379> auth 123456				# 使用密碼進行登入
OK

CLIIENTS限制

maxclients 10000		# 設定能連線上redis的最大客戶端的數量
maxmemory <bytes>		# redis配置最大的記憶體容量
maxmemory-policy noeviction 	# 記憶體達到上限之後的處理策略
	
maxmemory-policy 六種方式
1、volatile-lru:只對設定了過期時間的key進行LRU(預設值) 
2、allkeys-lru : 刪除lru演算法的key   
3、volatile-random:隨機刪除即將過期key   
4、allkeys-random:隨機刪除   
5、volatile-ttl : 刪除即將過期的   
6、noeviction : 永不過期,返回錯誤

APPEND ONLY模式

aof配置(相當於吧你的記錄都記錄了一遍)
appendonly no 						# 預設是不開啟aof模式的,預設是使用rdb方式持久化的,在大部分的情況下,rdb完全夠用
appendfilename "appendonly.aof"		# 持久化檔名字 

# appendfsync always				# 每次修改都會執行sync,消耗效能
appendfsync everysec				# 每秒執行一次sync,可能會丟失這1s的資料
# appendfsync no					# 不執行sync(不執行同步),這個時候作業系統自己同步資料,速度最快

Redis持久化

RDB(Redis DataBase)

在主從複製中rdb是備用的,在從機中幾乎不佔記憶體

Redis是記憶體資料庫,如果不將記憶體中的資料庫狀態儲存到磁碟,那麼一旦伺服器程序退出,伺服器中的資料庫狀態也會小時,所以Redis提供了持久化功能

在指定的時間間隔內將記憶體中的資料集快照寫入磁碟,也就是行話講的Snapshot快照,它恢復時是將快照檔案直接讀到記憶體裡。Redis會單獨建立(fork)一個子程序來進行持久化,會先將資料寫入到一個臨時檔案中,待持久化過程都結束了,再用這個臨時檔案替換上次持久化好的檔案。整個過程中,主程序是不進行任何IO操作的。這就確保了極高的效能。如果需要進行大規模資料的恢復,且對於資料恢復的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺點是最後一次持久化後的資料可能丟失。我們預設的就是RDB,一般情況下不需要修改這個配置!

有時候在生產環境我們會將這個檔案進行備份!rdb儲存的檔案是dump.rdb 都是在我們的配置檔案中快照中進行配置的!

觸發機制

  • save的規則滿足的情況下,會自動觸發rdb規則
  • 執行 flflushall 命令,也會觸發我們的rdb規則!
  • 退出redis,也會產生 rdb 檔案!

備份就自動生成一個 dump.rdb

恢復RDB檔案

1.只需要將rdb檔案放在我們redis啟動目錄下就可以,redis啟動的時候會自動檢查dump.rdb恢復其中的資料

2.檢視需要存在的位置

127.0.0.1:6379>config get dir
1)"dir"
2)"/usr/local/bin"					# 如果在這個目錄下存在dump.rdb檔案,啟動就會自動恢復其中的資料

優點:

  1. 適合大規模資料恢復
  2. 對資料完整性不高,可使用

缺點:

  1. 需要一定的時間間隔進行操作,如果redis意外宕機,這個最後一次修改的資料就沒了
  2. fork程序的時候,會佔用一定的記憶體空間

AOF(Append Only File)

將我們的所有命令都記錄下來,history,恢復的時候就把這個檔案全部再執行一遍

以日誌的形式來記錄每個寫操作,將Redis執行過的所有指令記錄下來(讀操作不記錄),只許追加檔案但不可以改寫檔案,redis啟動之初會讀取該檔案重新構建資料,換言之,redis重啟的話就根據日誌檔案的內容將寫指令從前到後執行一次以完成資料的恢復工作

Aof儲存的是 appendonly.aof 檔案

appendonly yes			# 預設是不開啟的,改為yes即開啟aof

重啟redis即生效

如果aof檔案有錯誤,redis是啟動不起來的,我們需要修復aof檔案,redis提供了一個工具redis-check-aof --fix appendonly.aof

如果檔案正常,重啟就可以直接恢復了!

重寫規則說明:aof 預設就是檔案的無限追加,檔案會越來越大

如果 aof 檔案大於 64m,太大了! fork一個新的程序來將我們的檔案進行重寫!

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

優點:

1、每一次修改都同步,檔案的完整會更加好!

2、每秒同步一次,可能會丟失一秒的資料

3、從不同步,效率最高的!

缺點:

1、相對於資料檔案來說,aof遠遠大於 rdb,修復的速度也比 rdb慢!

2、Aof 執行效率也要比 rdb 慢,所以我們redis預設的配置就是rdb持久化

擴充套件

1、RDB 持久化方式能夠在指定的時間間隔內對你的資料進行快照儲存

2、AOF 持久化方式記錄每次對伺服器寫的操作,當伺服器重啟的時候會重新執行這些命令來恢復原始的資料,AOF命令以Redis 協議追加儲存每次寫的操作到檔案末尾,Redis還能對AOF檔案進行後臺重寫,使得AOF檔案的體積不至於過大。

3、只做快取,如果你只希望你的資料在伺服器執行的時候存在,你也可以不使用任何持久化

4、同時開啟兩種持久化方式

  • 在這種情況下,當redis重啟的時候會優先載入AOF檔案來恢復原始的資料,因為在通常情況下AOF
  • 檔案儲存的資料集要比RDB檔案儲存的資料集要完整。
  • RDB 的資料不實時,同時使用兩者時伺服器重啟也只會找AOF檔案,那要不要只使用AOF呢?作者建議不要,因為RDB更適合用於備份資料庫(AOF在不斷變化不好備份),快速重啟,而且不會有AOF可能潛在的Bug,留著作為一個萬一的手段。

5、效能建議

  • 因為RDB檔案只用作後備用途,建議只在Slave上持久化RDB檔案,而且只要15分鐘備份一次就夠了,只保留 save 900 1 這條規則。
  • 如果Enable AOF ,好處是在最惡劣情況下也只會丟失不超過兩秒資料,啟動指令碼較簡單隻load自己的AOF檔案就可以了,代價一是帶來了持續的IO,二是AOF rewrite 的最後將 rewrite 過程中產生的新資料寫到新檔案造成的阻塞幾乎是不可避免的。只要硬碟許可,應該儘量減少AOF rewrite的頻率,AOF重寫的基礎大小預設值64M太小了,可以設到5G以上,預設超過原大小100%大小重寫可以改到適當的數值。
  • 如果不Enable AOF ,僅靠 Master-Slave Repllcation 實現高可用性也可以,能省掉一大筆IO,也減少了rewrite時帶來的系統波動。代價是如果Master/Slave 同時倒掉,會丟失十幾分鐘的資料,啟動指令碼也要比較兩個 Master/Slave 中的 RDB檔案,載入較新的那個,微博就是這種架構。

Redis釋出訂閱

Redis 釋出訂閱 (pub/sub) 是一種訊息通訊模式:傳送者 (pub) 傳送訊息,訂閱者 (sub) 接收訊息。

Redis 客戶端可以訂閱任意數量的頻道。

傳送端:

127.0.0.1:6379>PUBLISH alum "hello,alum"			# 釋出者釋出訊息到頻道alum
(integer)1
127.0.0.1:6379>PUBLISH alum "hello,redis"			# 釋出者釋出訊息到頻道alum
(integer)1

訂閱端:

127.0.0.1:6379>SUBSCRIBE alum						# 訂閱一個頻道alun
Reading message...(press Ctrl-C to quit)
1)"subscribe"
2)"alum"
3)(integer)1
# 等待讀取推送端的訊息
1)"message"											# 訊息
2)"alum"											# 哪個頻道的訊息
3)"hello,alum"										# 訊息的具體內容

1)"message"											# 訊息
2)"alum"											# 哪個頻道的訊息
3)"hello,redis"										# 訊息的具體內容

Redis 釋出訂閱命令

下表列出了 redis 釋出訂閱常用命令:

序號命令及描述
1[PSUBSCRIBE pattern 訂閱一個或多個符合給定模式的頻道。
2[PUBSUB subcommand 檢視訂閱與釋出系統狀態。
3 將資訊傳送到指定的頻道。
4[PUNSUBSCRIBE 退訂所有給定模式的頻道。
5[SUBSCRIBE channel 訂閱給定的一個或多個頻道的資訊。
6[UNSUBSCRIBE 指退訂給定的頻道。

使用場景:

1、實時訊息系統!

2、事實聊天!(頻道當做聊天室,將資訊回顯給所有人即可!)

3、訂閱,關注系統都是可以的!

稍微複雜的場景我們就會使用 訊息中介軟體 MQ ()Redis主從複製

Redis主從複製

概念

主從複製,是指將一臺Redis伺服器的資料,複製到其他的Redis伺服器。前者稱為主節點(master/leader),後者稱為從節點(slave/follower);資料的複製是單向的,只能由主節點到從節點。Master以寫為主,Slave 以讀為主。

預設情況下,每臺Redis伺服器都是主節點;且一個主節點可以有多個從節點(或沒有從節點),但一個從節點只能有一個主節點。()

主要作用

1、資料冗餘:主從複製實現了資料的熱備份,是持久化之外的一種資料冗餘方式。

2、故障恢復:當主節點出現問題時,可以由從節點提供服務,實現快速的故障恢復;實際上是一種服務的冗餘。

3、負載均衡:在主從複製的基礎上,配合讀寫分離,可以由主節點提供寫服務,由從節點提供讀服務(即寫Redis資料時應用連線主節點,讀Redis資料時應用連線從節點),分擔伺服器負載;尤其是在寫少讀多的場景下,通過多個從節點分擔讀負載,可以大大提高Redis伺服器的併發量。

4、高可用(叢集)基石:除了上述作用以外,主從複製還是哨兵和叢集能夠實施的基礎,因此說主從複製是Redis高可用的基礎。一般來說,要將Redis運用於工程專案中,只使用一臺Redis是萬萬不能的(宕機),原因如下:

1、從結構上,單個Redis伺服器會發生單點故障,並且一臺伺服器需要處理所有的請求負載,壓力較大;

2、從容量上,單個Redis伺服器記憶體容量有限,就算一臺Redis伺服器記憶體容量為256G,也不能將所有記憶體用作Redis儲存記憶體,一般來說,單臺Redis最大使用記憶體不應該超過20G。電商網站上的商品,一般都是一次上傳,無數次瀏覽的,說專業點也就是"多讀少寫"。

配置

只配從庫,不配置主庫

127.0.0.1:6379> info replication 							# 檢視當前庫的資訊 
# Replication 
role:master													# 角色 master
connected_slaves:0 											# 沒有從機 
master_replid:b63c90e6c501143759cb0e7f450bd1eb0c70882a master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1 
repl_backlog_active:0 
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

複製3個配置檔案,然後修改對應的資訊

1、埠 # port 6379

2、pid 名字 # pidfile /var/run/redis_6379.pid

3、log檔名字 # logfile “6379.log”

4、dump.rdb 名字 # dbfilename dump6379.rdb

修改完畢之後,啟動我們的3個redis伺服器,可以通過程序資訊檢視!(ps -ef|grep redis)

一主二從

127.0.0.1:6380> SLAVEOF 127.0.0.1 6379 								# SLAVEOF host 6379 找誰當自己的老大! 
OK
127.0.0.1:6380> info replication 
# Replication 
role:slave 															# 當前角色是從機 
master_host:127.0.0.1												# 可以的看到主機的資訊 
master_port:6379
master_link_status:up master_last_io_seconds_ago:3 
master_sync_in_progress:0
slave_repl_offset:14 
slave_priority:100
slave_read_only:1
connected_slaves:0 
master_replid:a81be8dd257636b2d3e7a9f595e69d73ff03774e master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14 
second_repl_offset:-1
repl_backlog_active:1 
repl_backlog_size:1048576 
repl_backlog_first_byte_offset:1 
repl_backlog_histlen:14


# 在主機中檢視!
127.0.0.1:6379> info replication 
# Replication 
role:master connected_slaves:1 										# 多了從機的配置
slave0:ip=127.0.0.1,port=6380,state=online,offset=42,lag=1			# 多了從機的配置 
master_replid:a81be8dd257636b2d3e7a9f595e69d73ff03774e master_replid2:0000000000000000000000000000000000000000 
master_repl_offset:42
second_repl_offset:-1 
repl_backlog_active:1
repl_backlog_size:1048576 
repl_backlog_first_byte_offset:1
repl_backlog_histlen:42

真實的從主配置應該在配置檔案中配置,這樣的話是永久的,我們這裡使用的是命令,暫時的。

主機可以寫,從機不能寫只能讀!主機中的所有資訊和資料,都會自動被從機儲存!

測試:

主機斷開連線,從機依舊連線到主機的,但是沒有寫操作,這個時候,主機如果回來了,從機依舊可以直接獲取到主機寫的資訊!

如果是使用命令列,來配置的主從,這個時候如果重啟了,就會變回主機!只要變為從機,立馬就會從主機中獲取值!

複製原理:

Slave 啟動成功連線到 master 後會傳送一個sync(同步命令)

Master 接到命令,啟動後臺的存檔程序,同時收集所有接收到的用於修改資料集命令,在後臺程序執行完畢之後,master將傳送整個資料檔案到slave,並完成一次完全同步。

全量複製:而slave服務在接收到資料庫檔案資料後,將其存檔並載入到記憶體中。

增量複製:Master 繼續將新的所有收集到的修改命令依次傳給slave,完成同步

但是隻要是重新連線master,一次完全同步(全量複製)將被自動執行! 我們的資料一定可以在從機中看到!

哨兵模式

自動選舉老大模式

主從切換技術的方法是:當主伺服器宕機後,需要手動把一臺從伺服器切換為主伺服器,這就需要人工干預,費事費力,還會造成一段時間內服務不可用。這不是一種推薦的方式,更多時候,我們優先考慮哨兵模式。Redis從2.8開始正式提供了Sentinel(哨兵) 架構來解決這個問題。

謀朝篡位的自動版,能夠後臺監控主機是否故障,如果故障了根據投票數自動將從庫轉換為主庫

哨兵模式是一種特殊的模式,首先Redis提供了哨兵的命令,哨兵是一個獨立的程序,作為程序,它會獨立執行。其原理是哨兵通過傳送命令,等待Redis伺服器響應,從而監控執行的多個Redis例項。

這裡的哨兵有兩個作用:

  • 通過傳送命令,讓Redis伺服器返回監控其執行狀態,包括主伺服器和從伺服器。
  • 當哨兵監測到master宕機,會自動將slave切換成master,然後通過釋出訂閱模式通知其他的從伺服器,修改配置檔案,讓它們切換主機。

然而一個哨兵程序對Redis伺服器進行監控,可能會出現問題,為此,我們可以使用多個哨兵進行監控。各個哨兵之間還會進行監控,這樣就形成了多哨兵模式。

假設主伺服器宕機,哨兵1先檢測到這個結果,系統並不會馬上進行failover過程,僅僅是哨兵1主觀的認為主伺服器不可用,這個現象成為主觀下線。當後面的哨兵也檢測到主伺服器不可用,並且數量達到一定值時,那麼哨兵之間就會進行一次投票,投票的結果由一個哨兵發起,進行failover[故障轉移]操作。切換成功後,就會通過釋出訂閱模式,讓各個哨兵把自己監控的從伺服器實現切換主機,這個過程稱為客觀下線

測試:

我們目前的狀態是 一主二從!

1、配置哨兵配置檔案 sentinel.conf

# sentinel monitor 被監控的名稱 host port 1 
sentinel monitor myredis 127.0.0.1 6379 1
#後面的這個數字1,代表主機掛了,slave投票看讓誰接替成為主機,票數最多的,就會成為主機!

2、啟動哨兵

redis-sentinel ArumConfig/sentinel.conf

如果Master 節點斷開了,這個時候就會從從機中隨機選擇一個伺服器! (這裡面有一個投票演算法!)

如果主機此時回來了,只能歸併到新的主機下,當做從機,這就是哨兵模式的規則!

優點:

1、哨兵叢集,基於主從複製模式,所有的主從配置優點,它全有

2、 主從可以切換,故障可以轉移,系統的可用性就會更好

3、哨兵模式就是主從模式的升級,手動到自動,更加健壯!

缺點:

1、Redis 不好啊線上擴容的,叢集容量一旦到達上限,線上擴容就十分麻煩!

2、實現哨兵模式的配置其實是很麻煩的,裡面有很多選擇!

哨兵模式的配置:

# Example sentinel.conf 

# 哨兵sentinel例項執行的埠 預設26379 
port 26379 

# 哨兵sentinel的工作目錄 
dir /tmp 

# 哨兵sentinel監控的redis主節點的 ip port 
# master-name 可以自己命名的主節點名字 只能由字母A-z、數字0-9 、這三個字元".-_"組成。
# quorum 配置多少個sentinel哨兵統一認為master主節點失聯 那麼這時客觀上認為主節點失聯了
# sentinel monitor <master-name> <ip> <redis-port> <quorum> 
sentinel monitor mymaster 127.0.0.1 6379 2 

# 當在Redis例項中開啟了requirepass foobared 授權密碼 這樣所有連線Redis例項的客戶端都要提供 密碼
# 設定哨兵sentinel 連線主從的密碼 注意必須為主從設定一樣的驗證密碼 
# sentinel auth-pass <master-name> <password> 
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd 

# 指定多少毫秒之後 主節點沒有應答哨兵sentinel 此時 哨兵主觀上認為主節點下線 預設30秒 
# sentinel down-after-milliseconds <master-name> <milliseconds> 
sentinel down-after-milliseconds mymaster 30000 

# 這個配置項指定了在發生failover主備切換時最多可以有多少個slave同時對新的master進行 同步,這個數字越小,完成failover所需的時間就越長,但是如果這個數字越大,就意味著越 多的slave因為replication而不可用。可以通過將這個值設為 1 來保證每次只有一個slave 處於不能處理命令請求的狀態。 
# sentinel parallel-syncs <master-name> <numslaves> 
sentinel parallel-syncs mymaster 1 

# 故障轉移的超時時間 failover-timeout 可以用在以下這些方面: 
#1. 同一個sentinel對同一個master兩次failover之間的間隔時間。 
#2. 當一個slave從一個錯誤的master那裡同步資料開始計算時間。直到slave被糾正為向正確的master那 裡同步資料時。 
#3.當想要取消一個正在進行的failover所需要的時間。 
#4.當進行failover時,配置所有slaves指向新的master所需的最大時間。不過,即使過了這個超時, slaves依然會被正確配置為指向master,但是就不按parallel-syncs所配置的規則來了 
# 預設三分鐘 # sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000

# SCRIPTS EXECUTION

#配置當某一事件發生時所需要執行的指令碼,可以通過指令碼來通知管理員,例如當系統執行不正常時發郵件通知 相關人員。 
#對於指令碼的執行結果有以下規則: 
#若指令碼執行後返回1,那麼該指令碼稍後將會被再次執行,重複次數目前預設為10 
#若指令碼執行後返回2,或者比2更高的一個返回值,指令碼將不會重複執行。 
#如果指令碼在執行過程中由於收到系統中斷訊號被終止了,則同返回值為1時的行為相同。 
#一個指令碼的最大執行時間為60s,如果超過這個時間,指令碼將會被一個SIGKILL訊號終止,之後重新執行。

#通知型指令碼:當sentinel有任何警告級別的事件發生時(比如說redis例項的主觀失效和客觀失效等等), 將會去呼叫這個指令碼,這時這個指令碼應該通過郵件,SMS等方式去通知系統管理員關於系統不正常執行的信 息。呼叫該指令碼時,將傳給指令碼兩個引數,一個是事件的型別,一個是事件的描述。如果sentinel.conf配 置檔案中配置了這個指令碼路徑,那麼必須保證這個指令碼存在於這個路徑,並且是可執行的,否則sentinel無 法正常啟動成功。
#通知指令碼 
# shell程式設計 
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh 

# 客戶端重新配置主節點引數指令碼
# 當一個master由於failover而發生改變時,這個指令碼將會被呼叫,通知相關的客戶端關於master地址已 經發生改變的資訊。 
# 以下引數將會在呼叫指令碼時傳給指令碼: 
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> 
# 目前<state>總是“failover”, 
# <role>是“leader”或者“observer”中的一個。
# 引數 from-ip, from-port, to-ip, to-port是用來和舊的master和新的master(即舊的slave)通訊的
# 這個指令碼應該是通用的,能被多次呼叫,不是針對性的。 
# sentinel client-reconfig-script <master-name> <script-path> 
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh 		# 一般都是由運維來配置

Redis快取穿透和雪崩

Redis快取的使用,極大的提升了應用程式的效能和效率,特別是資料查詢方面。但同時,它也帶來了一些問題。其中,最要害的問題,就是資料的一致性問題,從嚴格意義上講,這個問題無解。如果對資料的一致性要求很高,那麼就不能使用快取。

另外的一些典型問題就是,快取穿透、快取雪崩和快取擊穿。目前,業界也都有比較流行的解決方案。

快取穿透

概念

使用者想要查詢一個數據,發現redis記憶體資料庫沒有,也就是快取沒有命中,於是向持久層資料庫查詢。發現也沒有,於是本次查詢失敗。當用戶很多的時候,快取都沒有命中(秒殺!),於是都去請求了持久層資料庫。這會給持久層資料庫造成很大的壓力,這時候就相當於出現了快取穿透。

解決方案

1、布隆過濾器

布隆過濾器是一種資料結構,對所有可能查詢的引數以hash形式儲存,在控制層先進行校驗,不符合則丟棄,從而避免了對底層儲存系統的查詢壓力;

2、快取空物件

當儲存層不命中後,即使返回的空物件也將其快取起來,同時會設定一個過期時間,之後再訪問這個資料將會從快取中獲取,保護了後端資料來源;

但是這種方法會存在兩個問題:

  • 如果空值能夠被快取起來,這就意味著快取需要更多的空間儲存更多的鍵,因為這當中可能會有很多的空值的鍵;
  • 即使對空值設定了過期時間,還是會存在快取層和儲存層的資料會有一段時間視窗的不一致,這對於需要保持一致性的業務會有影響。

快取擊穿

概念

這裡需要注意和快取擊穿的區別,快取擊穿,是指一個key非常熱點,在不停的扛著大併發,大併發集中對這一個點進行訪問,當這個key在失效的瞬間,持續的大併發就穿破快取,直接請求資料庫,就像在一個屏障上鑿開了一個洞。

當某個key在過期的瞬間,有大量的請求併發訪問,這類資料一般是熱點資料,由於快取過期,會同時訪問資料庫來查詢最新資料,並且回寫快取,會導使資料庫瞬間壓力過大。

解決方案

1、設定熱點資料永不過期

從快取層面來看,沒有設定過期時間,所以不會出現熱點 key 過期後產生的問題。

2、加互斥鎖

分散式鎖:使用分散式鎖,保證對於每個key同時只有一個執行緒去查詢後端服務,其他執行緒沒有獲得分散式鎖的許可權,因此只需要等待即可。這種方式將高併發的壓力轉移到了分散式鎖,因此對分散式鎖的考驗很大。

快取雪崩

概念

快取雪崩,是指在某一個時間段,快取集中過期失效。Redis 宕機!

產生雪崩的原因之一,比如在寫本文的時候,馬上就要到雙十二零點,很快就會迎來一波搶購,這波商品時間比較集中的放入了快取,假設快取一個小時。那麼到了凌晨一點鐘的時候,這批商品的快取就都過期了。而對這批商品的訪問查詢,都落到了資料庫上,對於資料庫而言,就會產生週期性的壓力波峰。於是所有的請求都會達到儲存層,儲存層的呼叫量會暴增,造成儲存層也會掛掉的情況。

其實集中過期,倒不是非常致命,比較致命的快取雪崩,是快取伺服器某個節點宕機或斷網。因為自然形成的快取雪崩,一定是在某個時間段集中建立快取,這個時候,資料庫也是可以頂住壓力的。無非就是對資料庫產生週期性的壓力而已。而快取服務節點的宕機,對資料庫伺服器造成的壓力是不可預知的,很有可能瞬間就把資料庫壓垮。

解決方案

1、redis高可用

這個思想的含義是,既然redis有可能掛掉,那我多增設幾臺redis,這樣一臺掛掉之後其他的還可以繼續工作,其實就是搭建的叢集(異地多活!)

2、限流降級

這個解決方案的思想是,在快取失效後,通過加鎖或者佇列來控制讀資料庫寫快取的執行緒數量。比如對某個key只允許一個執行緒查詢資料和寫快取,其他執行緒等待。

3、資料預熱

資料加熱的含義就是在正式部署之前,我先把可能的資料先預先訪問一遍,這樣部分可能大量訪問的資料就會載入到快取中。在即將發生大併發訪問前手動觸發載入快取不同的key,設定不同的過期時間,讓快取失效的時間點儘量均勻。