1. 程式人生 > 資料庫 > Python呼叫Redis的示例程式碼

Python呼叫Redis的示例程式碼

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time  : 2019/8/12
# @Author : Zhang Fan
# @Desc  : Library
# @File  : MyRedis.py
# @Update : 2019/8/23
# *************************************
import logging

import redis


class MyRedis(object):
  """
  ===================================================================
  =====================    MyRedis    ========================
  ===================================================================
  Thin convenience wrapper around a ``redis.StrictRedis`` client.

  Call ``connect_to_redis`` first; every other method forwards its
  arguments to the client stored in ``self.redis_conn``.

  Attributes:
    redis_conn: the client object created by ``connect_to_redis``
      (``None`` until then).
    redis_db: the database index passed to ``connect_to_redis``
      (``None`` until then); only used in log messages.
  """
  # Shared logger for error reporting (the methods below log before raising).
  _logger = logging.getLogger(__name__)

  def __init__(self):
    self.redis_conn = None
    self.redis_db = None

  def connect_to_redis(self, redis_host, redis_port=6379, db=0, password=None):
    """
    Create the Redis client and store it on ``self.redis_conn``.

    Args:
      redis_host: server host name or address.
      redis_port: server port.
      db: database index; also remembered in ``self.redis_db``.
      password: optional password handed to the client.

    Raises:
      Exception: if constructing the client fails; the original error is
        logged and chained as the cause.
    """
    self.redis_db = db
    # Never echo the real credential to stdout; only show whether one was set.
    shown_password = '***' if password else password
    print('Executing : Connect To Redis | host={0},port={1},db={2},password={3}'
          .format(redis_host, redis_port, self.redis_db, shown_password))
    try:
      self.redis_conn = redis.StrictRedis(
        host=redis_host, port=redis_port, db=self.redis_db, password=password)
    except Exception as ex:
      self._logger.error(str(ex))
      raise Exception(str(ex)) from ex

  def redis_key_should_be_exist(self, name):
    """
    Assert that key ``name`` exists.

    Raises:
      AssertionError: if the client reports the key as absent.
    """
    if not self.redis_conn.exists(name):
      message = "Redis of db%s doesn't exist in key [ %s ]." % (self.redis_db, name)
      self._logger.error(message)
      raise AssertionError(message)

  def redis_key_should_not_be_exist(self, name):
    """
    Assert that key ``name`` does not exist.

    Raises:
      AssertionError: if the client reports the key as present.
    """
    if self.redis_conn.exists(name):
      message = "Redis of db%s exist in key [ %s ]." % (self.redis_db, name)
      self._logger.error(message)
      raise AssertionError(message)

  def getkeys_from_redis_bypattern(self, pattern, field=None):
    """
    Return the keys matching ``pattern``.

    Args:
      pattern: pattern passed to the client's ``keys`` call.
      field: when given, only keys for which ``hget(key, field)`` returns
        a non-``None`` value are kept.

    Returns:
      The client's ``keys`` result when ``field`` is ``None``; otherwise a
      list of the matching keys that have ``field`` set.
    """
    print('Executing : Getall Key | %s' % pattern)
    keys = self.redis_conn.keys(pattern)
    if field is None:
      return keys
    return [key for key in keys
            if self.redis_conn.hget(key, field) is not None]

  # ========================== String Type =============================
  def get_from_redis(self, name):
    """
    Return the value stored at key ``name``.
    """
    print('Executing : Get Key | %s' % name)
    return self.redis_conn.get(name)

  def del_from_redis(self, name):
    """
    Delete key ``name`` (any data type) and return the client's result.
    """
    return self.redis_conn.delete(name)

  def set_to_redis(self, name, data, expire_time=0):
    """
    Store ``data`` at key ``name``.

    Args:
      name: key to write.
      data: value to store.
      expire_time: expiry passed to the client as ``ex``; ``0`` (the
        default) or ``None`` means no expiry is requested.

    Returns:
      The client's ``set`` result.
    """
    # A falsy expire_time is mapped to None so no expiry option is sent.
    return self.redis_conn.set(name, data, ex=expire_time or None)

  def append_to_redis(self, name, value):
    """
    Append ``value`` to the string stored at key ``name``.

    Returns:
      The client's ``append`` result.
    """
    return self.redis_conn.append(name, value)

  # ========================== Hash Type ==========================
  def hgetall_from_redis(self, name):
    """
    Return every field/value pair of the hash at ``name``.
    """
    print('Executing : Hgetall Key | %s' % name)
    return self.redis_conn.hgetall(name)

  def hget_from_redis(self, name, key):
    """
    Return the value of field ``key`` in the hash at ``name``.
    """
    print('Executing : Hget Key | %s' % name)
    return self.redis_conn.hget(name, key)

  def hset_to_redis(self, name, key, data):
    """
    Set field ``key`` of the hash at ``name`` to ``data``.

    Returns:
      The client's ``hset`` result.
    """
    print(('Executing : Hset Redis | name={0},key={1},data={2}'
           .format(name, key, data)))
    return self.redis_conn.hset(name, key, data)

  def hdel_to_redis(self, name, *keys):
    """
    Delete the given fields ``keys`` from the hash at ``name``.

    Returns nothing; the client's result is discarded.
    """
    print('Executing : Hdel Key | ',*keys)
    self.redis_conn.hdel(name, *keys)

  # ========================= ZSet Type ================================
  def get_from_redis_zscore(self, name, values):
    """
    Return the score of member ``values`` in the sorted set at ``name``.

    Returns:
      ``int(score)`` when the score converts to an integer, the raw score
      when it does not, and ``None`` when the client returns ``None``.
    """
    score = self.redis_conn.zscore(name, values)
    if score is None:
      return None
    try:
      return int(score)
    except (TypeError, ValueError, OverflowError):
      # Not representable as an int (e.g. nan/inf): hand back the raw score.
      return score

  def get_from_redis_zrange(self, name, start=0, end=10):
    """
    Return members of the sorted set at ``name`` for the index range
    ``start``..``end``, ascending, together with their scores cast to int.
    """
    return self.redis_conn.zrange(
      name, start, end, desc=False, withscores=True, score_cast_func=int)

  def del_from_redis_zrem(self, name, values):
    """
    Remove member ``values`` from the sorted set at ``name``.

    Returns:
      The client's ``zrem`` result.
    """
    return self.redis_conn.zrem(name, values)

  def add_from_redis_zadd(self, name, value, score):
    """
    Add ``value`` with ``score`` to the sorted set at ``name``.

    The pair is sent as the mapping ``{value: score}``.

    Returns:
      The client's ``zadd`` result.
    """
    return self.redis_conn.zadd(name, {value: score})

  def count_from_redis_zcard(self, name):
    """
    Return the number of members in the sorted set at ``name``.
    """
    return self.redis_conn.zcard(name)


if __name__ == "__main__":
  # Script entry point: announce the run, then build a wrapper instance.
  # No connection is opened here; the instance is only constructed.
  print("This is test.")
  mr = MyRedis()

以上就是Python呼叫Redis的示例程式碼的詳細內容，更多關於Python呼叫Redis的資料請關注我們其它相關文章！