Python呼叫Redis的示例程式碼
阿新 • 發佈:2020-11-27
#!/usr/bin/env python
# -*- coding:utf-8 -*-

# *************************************
# @Time    : 2019/8/12
# @Author  : Zhang Fan
# @Desc    : Library
# @File    : MyRedis.py
# @Update  : 2019/8/23
# *************************************
import logging

import redis

# Module-level logger; the original code called ``logger.error`` without ever
# defining ``logger``, so every error path died with a NameError.
logger = logging.getLogger(__name__)


class MyRedis(object):
    """Thin convenience wrapper around a ``redis.StrictRedis`` client.

    Call :meth:`connect_to_redis` first; every other method forwards to the
    client stored in ``self.redis_conn`` and will fail if it is still ``None``.
    """

    def __init__(self):
        # Client object created by connect_to_redis(); None until then.
        self.redis_conn = None
        # Database index passed to the last connect_to_redis() call.
        self.redis_db = None

    def connect_to_redis(self, redis_host, redis_port=6379, db=0, password=None):
        """Create the Redis client used by all other methods.

        :param redis_host: host name or IP of the Redis server.
        :param redis_port: TCP port of the Redis server.
        :param db: database index to select.
        :param password: optional password.
        :raises Exception: if the client cannot be constructed; the original
            error is logged and attached as ``__cause__``.
        """
        self.redis_db = db
        # NOTE(review): this prints the password in clear text -- consider
        # masking it if the output ends up in shared logs.
        print('Executing : Connect To Redis | host={0},port={1},db={2},password={3}'
              .format(redis_host, redis_port, self.redis_db, password))
        try:
            self.redis_conn = redis.StrictRedis(
                host=redis_host, port=redis_port, db=self.redis_db, password=password)
        except Exception as ex:
            logger.error(str(ex))
            # Same exception type as before, but keep the original traceback.
            raise Exception(str(ex)) from ex

    def redis_key_should_be_exist(self, name):
        """Assert that key ``name`` exists.

        :raises AssertionError: if the key does not exist.
        """
        if not self.redis_conn.exists(name):
            message = ("Redis of db%s doesn't exist in key [ %s ]."
                       % (self.redis_db, name))
            logger.error(message)
            raise AssertionError(message)

    def redis_key_should_not_be_exist(self, name):
        """Assert that key ``name`` does not exist.

        :raises AssertionError: if the key exists.
        """
        if self.redis_conn.exists(name):
            message = ("Redis of db%s exist in key [ %s ]."
                       % (self.redis_db, name))
            logger.error(message)
            raise AssertionError(message)

    def getkeys_from_redis_bypattern(self, pattern, field=None):
        """Return the keys matching ``pattern``.

        :param pattern: pattern passed to the client's ``keys`` call.
        :param field: when given, only keys for which ``hget(key, field)``
            returns a non-None value are kept.
        :return: the client's ``keys`` result when ``field`` is None,
            otherwise a list of the keys that carry ``field``.
        """
        print('Executing : Getall Key | %s' % pattern)
        keys = self.redis_conn.keys(pattern)
        if field is None:
            return keys
        return [key for key in keys
                if self.redis_conn.hget(key, field) is not None]

    # ========================== String Type =============================
    def get_from_redis(self, name):
        """Return the value stored at key ``name``."""
        print('Executing : Get Key | %s' % name)
        return self.redis_conn.get(name)

    def del_from_redis(self, name):
        """Delete key ``name`` regardless of its data type."""
        return self.redis_conn.delete(name)

    def set_to_redis(self, name, data, expire_time=0):
        """Set key ``name`` to ``data``.

        :param expire_time: time-to-live in seconds; ``0`` (the default)
            means the key does not expire.
        :return: the result of the client's ``set`` call.
        """
        # The original passed ``expire_time`` as the value and dropped ``data``.
        # A falsy expiry is mapped to None so no EX option is sent.
        return self.redis_conn.set(name, data, ex=expire_time or None)

    def append_to_redis(self, name, value):
        """Append ``value`` to the string stored at key ``name``."""
        return self.redis_conn.append(name, value)

    # ========================== Hash Type ==========================
    def hgetall_from_redis(self, name):
        """Return all fields and values of the hash ``name``."""
        print('Executing : Hgetall Key | %s' % name)
        return self.redis_conn.hgetall(name)

    def hget_from_redis(self, name, key):
        """Return the value of field ``key`` in the hash ``name``."""
        print('Executing : Hget Key | %s' % name)
        return self.redis_conn.hget(name, key)

    def hset_to_redis(self, name, key, data):
        """Set field ``key`` of the hash ``name`` to ``data``."""
        # The original supplied two arguments for three placeholders
        # (IndexError) and omitted the field in the hset call.
        print('Executing : Hset Redis | name={0},key={1},data={2}'
              .format(name, key, data))
        return self.redis_conn.hset(name, key, data)

    def hdel_to_redis(self, name, *keys):
        """Delete the given fields ``keys`` from the hash ``name``.

        :return: the result of the client's ``hdel`` call.
        """
        print('Executing : Hdel Key | ', *keys)
        return self.redis_conn.hdel(name, *keys)

    # ========================= ZSet Type ================================
    def get_from_redis_zscore(self, name, values):
        """Return the score of member ``values`` in the sorted set ``name``.

        :return: the score truncated to ``int`` when it can be converted,
            otherwise whatever the client's ``zscore`` call returned.
        """
        score = self.redis_conn.zscore(name, values)
        try:
            return int(score)
        except (TypeError, ValueError, OverflowError):
            # Not convertible to int: hand the raw result back unchanged.
            return score

    def get_from_redis_zrange(self, name, start=0, end=10):
        """Return members of the sorted set ``name`` by index range.

        Results are requested in ascending order, with scores cast to ``int``.
        """
        return self.redis_conn.zrange(
            name, start, end, desc=False, withscores=True, score_cast_func=int)

    def del_from_redis_zrem(self, name, values):
        """Remove member ``values`` from the sorted set ``name``."""
        return self.redis_conn.zrem(name, values)

    def add_from_redis_zadd(self, name, value, score):
        """Add ``value`` with ``score`` to the sorted set ``name``.

        The member is passed to ``zadd`` as a one-entry ``{value: score}``
        mapping.
        """
        return self.redis_conn.zadd(name, {value: score})

    def count_from_redis_zcard(self, name):
        """Return the number of members in the sorted set ``name``."""
        return self.redis_conn.zcard(name)


if __name__ == '__main__':
    print('This is test.')
    mr = MyRedis()
以上就是Python呼叫Redis的示例程式碼的詳細內容,更多關於Python呼叫Redis的資料請關注我們其它相關文章!