Python定時從Mysql提取資料存入Redis的實現
阿新 • • 發佈:2020-05-12
設計思路:
1.程式一旦run起來,python會把mysql中最近一段時間的資料全部提取出來
2.然後例項化redis類,將資料簡單解析後逐條傳入redis佇列
3.定時器設計每天凌晨12點開始跑
ps:redis是個記憶體資料庫,做後臺訊息佇列的快取時有很大的用處,有興趣的小夥伴可以去檢視相關的文件。
# -*- coding:utf-8 -*- import MySQLdb import schedule import time import datetime import random import string import redis # get the data from mysql class FromSql(object): def __init__(self,conn): self.conn = conn def acquire(self): cursor = self.conn.cursor() try: sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1" cursor.execute(sql) rs = cursor.fetchall() #print (rs) for eve in rs: print('%s,%s,%s' % eve) copy_rs = rs cursor.close() return copy_rs except Exception as e: print("The error: %s" % e) class RedisQueue(object): def __init__(self,name,namespace='queue',**redis_kwargs): """The default connection parameters are: host='localhost',port=6379,db=0""" self.__db= redis.Redis(**redis_kwargs) self.key = '%s:%s' %(namespace,name) def qsize(self): return self.__db.llen(self.key) def put(self,item): self.__db.rpush(self.key,item) def get(self,block=True,timeout=None): if block: item = self.__db.blpop(self.key,timeout=timeout) else: item = self.__db.lpop(self.key) if item: item = item[1] return item def get_nowait(self): return self.get(False) if __name__ == "__main__": # connect mysqldb conn_sql = MySQLdb.connect( host = '127.0.0.1',port = 3306,user = 'root',passwd = '',db = 'test',charset = 'utf8' ) def job_for_redis(): get_data = FromSql(conn_sql) data = get_data.acquire() q = RedisQueue('test',host='localhost',db=0) for single_data in data: for meta_data in single_data: q.put(meta_data) print(meta_data) print("All data had been inserted.") """ try: schedule.every().day.at("00:00").do(job_for_redis) except Exception as e: print('Error: %s'% e) # finally: # conn.close() while True: schedule.run_pending() time.sleep(1) """
補充知識:python定時獲取匯率存入資料庫
python定時任務:
我們可以使用 輕量級的第三方模組schedule。首先先安裝:pip install schedule
定時任務的的小測試:
import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) # 每隔10分鐘執行一次任務 schedule.every().hour.do(job) # 每隔一小時執行一次任務 schedule.every().day.at("10:30").do(job) # 每天10:30執行一次任務 schedule.every(5).to(10).days.do(job) # 每5-10天執行一次任務 schedule.every().monday.do(job) # 每週一的這個時候執行一次任務 schedule.every().wednesday.at("13:15").do(job) # 每週三13:15執行一次任務 while True: schedule.run_pending()
獲取資料存入資料庫:(格式可能不太對,還有一些符號。自己修改一下即可)
import pymysql import schedule import time import requests import pandas from sqlalchemy import create_engine #獲取美元的所有外匯 def job(): content = '美元' url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外匯資料地址 html = requests.get(url).content.decode('utf-8') index = html.index('<td>' + content + '</td>') str = html[index:index+300] result = re.findall('<td>(.*?)</td>',str) print("幣種:" + result[0]) print("現匯買入價:" + result[1]) print("現鈔買入價:" + result[2]) print("現匯賣出價:" + result[3]) print("現鈔賣出價:" + result[4]) print("中行結算價:" + result[5]) print("釋出時間:" + result[6] + ' ' + result[7]) #本地地址 資料庫賬號 密碼 資料庫名 db = pymysql.connect('localhost','root','pinyougoudb') cursor = db.cursor() #sql語句 sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1],result[2],result[3],result[4],result[5],result[6] + ' ' + result[7],result[0]) cursor.execute(sql) db.commit() print('success') # 查詢語句,將存入的資料查出來 # sqlalchemy 進行資料庫初始化 engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb') sql = '''select * from tb_money''' # pandas 進行資料庫讀寫 df = pandas.read_sql_query(sql,engine) print(df) db.commit() # 每隔幾分中重新整理一次 #schedule.every(0.1).minutes.do(job) #每天什麼時候重新整理 schedule.every().day.at("09:29").do(job) schedule.every().day.at("09:30").do(job) #一直迴圈 知道滿足條件執行 while True: schedule.run_pending()
以上這篇Python定時從Mysql提取資料存入Redis的實現就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。