1. 程式人生 > 資料庫 >Python定時從Mysql提取資料存入Redis的實現

Python定時從Mysql提取資料存入Redis的實現

設計思路:

1.程式一旦run起來,python會把mysql中最近一段時間的資料全部提取出來

2.然後例項化redis類,將資料簡單解析後逐條傳入redis佇列

3.定時器設計每天凌晨12點開始跑

ps:redis是個記憶體資料庫,做後臺訊息佇列的快取時有很大的用處,有興趣的小夥伴可以去檢視相關的文件。

 # -*- coding:utf-8 -*- 

import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis

# get the data from mysql
class FromSql(object):
  def __init__(self,conn):
    self.conn = conn

  def acquire(self):
    cursor = self.conn.cursor()
    try:
      sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"

      cursor.execute(sql)
      rs = cursor.fetchall()
      #print (rs)
      for eve in rs:

        print('%s,%s,%s' % eve)
      copy_rs = rs
      cursor.close()

      return copy_rs 

    except Exception as e:
      print("The error: %s" % e)


class RedisQueue(object):

  def __init__(self,name,namespace='queue',**redis_kwargs):
    """The default connection parameters are: host='localhost',port=6379,db=0"""
    self.__db= redis.Redis(**redis_kwargs)
    self.key = '%s:%s' %(namespace,name)

  def qsize(self):
    return self.__db.llen(self.key)

  def put(self,item):
    self.__db.rpush(self.key,item)

  def get(self,block=True,timeout=None):

    if block:
      item = self.__db.blpop(self.key,timeout=timeout)
    else:
      item = self.__db.lpop(self.key)

    if item:
      item = item[1]
    return item

  def get_nowait(self):
    return self.get(False)


if __name__ == "__main__":
  # connect mysqldb
  conn_sql = MySQLdb.connect(
            host = '127.0.0.1',port = 3306,user = 'root',passwd = '',db = 'test',charset = 'utf8'
            )


def job_for_redis():
    get_data = FromSql(conn_sql)
    data = get_data.acquire()

    q = RedisQueue('test',host='localhost',db=0)
    for single_data in data:
      for meta_data in single_data:
        q.put(meta_data)
        print(meta_data)
    print("All data had been inserted.") 

"""
  try:
    schedule.every().day.at("00:00").do(job_for_redis)
  except Exception as e:
    print('Error: %s'% e)
#  finally:
#    conn.close()

  while True:
    schedule.run_pending()
    time.sleep(1)
"""

補充知識:python定時獲取匯率存入資料庫

python定時任務:

我們可以使用 輕量級的第三方模組schedule。首先先安裝:pip install schedule

定時任務的的小測試:

import schedule
import time
 
def job():
  print("I'm working...")
 
schedule.every(10).minutes.do(job)       # 每隔10分鐘執行一次任務
schedule.every().hour.do(job)          # 每隔一小時執行一次任務
schedule.every().day.at("10:30").do(job)    # 每天10:30執行一次任務
schedule.every(5).to(10).days.do(job)      # 每5-10天執行一次任務
schedule.every().monday.do(job)         # 每週一的這個時候執行一次任務
schedule.every().wednesday.at("13:15").do(job) # 每週三13:15執行一次任務
 
while True:
  schedule.run_pending()

獲取資料存入資料庫:(格式可能不太對,還有一些符號。自己修改一下即可)

import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine

#獲取美元的所有外匯
def job():
  content = '美元'
  url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外匯資料地址
  html = requests.get(url).content.decode('utf-8')

  index = html.index('<td>' + content + '</td>')
  str = html[index:index+300]
  result = re.findall('<td>(.*?)</td>',str)

  print("幣種:" + result[0])
  print("現匯買入價:" + result[1])
  print("現鈔買入價:" + result[2])
  print("現匯賣出價:" + result[3])
  print("現鈔賣出價:" + result[4])
  print("中行結算價:" + result[5])
  print("釋出時間:" + result[6] + ' ' + result[7])
  
 #本地地址 資料庫賬號 密碼  資料庫名
  db = pymysql.connect('localhost','root','pinyougoudb')
  cursor = db.cursor()
  
 #sql語句
  sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1],result[2],result[3],result[4],result[5],result[6] + ' ' + result[7],result[0])

  cursor.execute(sql)
  db.commit()
  print('success')

 # 查詢語句,將存入的資料查出來
  # sqlalchemy 進行資料庫初始化
  engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
  sql = '''select * from tb_money'''

  # pandas 進行資料庫讀寫
  df = pandas.read_sql_query(sql,engine)
  print(df)

  db.commit()


# 每隔幾分中重新整理一次
#schedule.every(0.1).minutes.do(job)

#每天什麼時候重新整理
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)

#一直迴圈 知道滿足條件執行
while True:
  schedule.run_pending()

以上這篇Python定時從Mysql提取資料存入Redis的實現就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。