Python Flask框架 資料庫連線池
Python Flask 框架
..............
資料庫連結池
pip3 install pymysql dbutils
簡單實現
''' @Date : 2020-11-12 20:02:49 @LastEditors : Pineapple @LastEditTime : 2020-11-13 21:01:53 @FilePath : /database_pool/連線池.py @Blog : https://blog.csdn.net/pineapple_C @Github : https://github.com/Pineapple666 ''' from threading import Thread import pymysql from dbutils.pooled_db import PooledDB POOL = PooledDB( creator=pymysql, # 指定建立連線的包 maxconnections=6, # 最大連線數 mincached=2, # 初始連結數 blocking=True, # 阻塞時是否等待 ping=0, # 測試連線是否正常, 不同情況的值不同 # 連線mysql的必備引數 host='127.0.0.1', port=3306, user='root', password='mysql', database='job51', charset='utf8' ) def task(num): # 去連線池獲取連線 conn = POOL.connection() cursor = conn.cursor() # cursor.execute('select * from job51') cursor.execute('select sleep(3)') result = cursor.fetchall() cursor.close() # 將連線放回到連線池 conn.close() print(num, '----------->', result) for i in range(40): t = Thread(target=task, args=(i,)) t.start()
建立POOL物件時, 連線數為零, 只是建立好了一個容量為6的空池子.
按照不同的需求, 引數ping可以指定為0, 1, 2, 4, 7
原始碼註釋是這樣的:
ping: determines when the connection should be checked with ping() (0 = None = never, 1 = default = whenever fetched from the pool, 2 = when a cursor is created, 4 = when a query is executed, 7 = always, and all other bit combinations of these values)
實現相關查詢功能
基於函式實現sqlhelper
''' @Date : 2020-11-12 20:59:10 @LastEditors : Pineapple @LastEditTime : 2020-11-12 21:09:27 @FilePath : /flask_test/database_pool/sqlhelper.py @Blog : https://blog.csdn.net/pineapple_C @Github : https://github.com/Pineapple666 ''' import pymysql from dbutils.pooled_db import PooledDB POOL = PooledDB( creator=pymysql, maxconnections=6, mincached=2, blocking=True, ping=0, host='127.0.0.1', port=3306, user='root', password='mysql', database='job51', charset='utf8' ) def fetchall(sql, *args): """獲取所有資料""" conn = POOL.connection() cursor = conn.cursor() cursor.execute(sql, args) result = cursor.fetchall() cursor.close() conn.close() return result def fetchone(sql, *args): """獲取一條資料""" conn = POOL.connection() cursor = conn.cursor() cursor.execute(sql, args) result = cursor.fetchone() cursor.close() conn.close() return result
pymysql的execute方法會將args新增到sql語句中, 所以在編寫函式的時候可以使用*args打包引數的功能, 打包成一個元組傳入execute方法
這樣就簡單的實現了常用的 獲取全部資料fetchall
方法, 和獲取一條資料fetchone
方法
編寫pool_test.py 來測試一下
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-13 21:33:32
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
print(sqlhelper.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(sqlhelper.fetchone('select * from job51 where name=%s', '前端開發工程師'))
return 'index'
@app.route('/order')
def order():
return 'order'
if __name__ == "__main__":
app.run(debug=True)
基於類實現sqlhelper
既然設計了資料庫連線池, 所以我們希望全域性只有一個連線池, 那麼這個類必須是一個單例模式, 好在Python的類很輕鬆就能實現這種功能, 在導包的時候會生成.pyc Python位元組碼檔案, 之後再次使用就會執行這個.pyc 位元組碼檔案. 所以如果在同一個問價中, 我們可以通過匯入模組的方式輕鬆的實現一個單例類.
# s1.py 檔案中
class Foo(object):
def test(self):
print("123")
v = Foo()
# v是Foo的例項
------
# s2.py 檔案中
from s1 import v as v1
print(v1,id(v1)) #<s1.Foo object at 0x0000000002221710> 35788560
from s1 import v as v2
print(v1,id(v2)) #<s1.Foo object at 0x0000000002221710> 35788560
# 兩個的記憶體地址是一樣的
# 檔案載入的時候,第一次匯入後,再次匯入時不會再重新載入。
Python幫了我們這麼多, 我們就可以專心的設計sqlhelper類了.
'''
@Date : 2020-11-13 16:46:20
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 09:10:00
@FilePath : /database_pool/sqlhelper2.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper:
def __init__(self) -> None:
self.pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True,
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn, cursor
def close(self, conn, cursor):
cursor.close()
conn.close()
def fetchall(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn, cursor)
return result
def fetchone(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
db = SqlHelper()
編寫類和編寫函式一樣簡單, 新增的open和close方法會實現資料庫的連線和關閉, 不僅去掉了重複的程式碼, 在使用sqlhelper的時候輕鬆呼叫open和close並在中間加上其他的功能.
編寫sqlhelper來測試一下:
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 09:47:20
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
from sqlhelper2 import db
app = Flask(__name__)
@app.route('/login')
def login():
print(db.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(db.fetchone('select * from quotes where author=%s', 'Tim Peters'))
return 'index'
@app.route('/order')
def order():
author = 'Tim Peters'
txt = 'Simple is better than complex.'
tags = 'The Zen of Python'
conn, cursor = db.open()
sql = 'insert into quotes (author, txt, tags) values(%s, %s, %s)'
cursor.execute(sql, (author, txt, tags))
conn.commit()
db.close(conn, cursor)
return 'oder'
if __name__ == "__main__":
app.run(debug=True)
在login和index函式裡還是呼叫了sqlhelper已經寫好的fetchall fetchone方法, 在order中通過呼叫sqlhelper的open和close方法,在其中間實現了插入的功能, 這樣的sqlhelper不僅用起來方便, 而且拓展性強
上下文管理
在Python中使用with關鍵字實現上下文管理器
class Foo:
def __enter__(self):
return 123
def __exit__(self, exc_type, exc_val, exc_tb):
pass
foo = Foo()
with foo as f:
print(f)
在使用with關鍵字時會呼叫類的 __enter__
方法, 此方法返回的內容就是as後的物件f, 在退出時會呼叫類的 __exit__
方法
我們最熟悉的檔案操作, open()方法就是這樣實現的, 若不使用上下文管理器, 每開啟一個檔案都要呼叫此檔案物件的close方法進行關閉
資料庫的連線和關閉操作也可以使用上下文管理器的方式
'''
@Date : 2020-11-14 10:14:38
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 10:36:33
@FilePath : /database_pool/sqlhelper3.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper:
def __init__(self) -> None:
self.pool = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
blocking=True, # 阻塞時是否等待
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='mysql',
database='job51',
charset='utf8'
)
def __enter__(self):
self.conn = self.pool.connection()
self.cursor = self.conn.cursor()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
def fetchall(self, sql, *args):
with self as db:
db.cursor.execute(sql, args)
result = db.cursor.fetchall()
return result
def fetchone(self, sql, *args):
with self as db:
db.cursor.execute(sql, args)
result = db.cursor.fetchone()
return result
sqlhelper = SqlHelper()
用__enter__
和__exit__
方法實現open和close的操作
編寫pool_test.py 測試一下
'''
@Date : 2020-11-13 21:22:50
@LastEditors : Pineapple
@LastEditTime : 2020-11-14 10:29:44
@FilePath : /database_pool/pool_test.py
@Blog : https://blog.csdn.net/pineapple_C
@Github : https://github.com/Pineapple666
'''
from flask import Flask
from sqlhelper3 import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
print(sqlhelper.fetchall('select * from book where rating_nums=%s', '9.0'))
return 'login'
@app.route('/index')
def index():
print(sqlhelper.fetchone('select * from quotes where author=%s', 'Tim Peters'))
return 'index'
@app.route('/order')
def order():
author = 'Tim Peters'
txt = 'Simple is better than complex.'
tags = 'The Zen of Python'
with sqlhelper as db:
sql = 'insert into quotes (author, txt, tags) values(%s, %s, %s)'
db.cursor.execute(sql, (author, txt, tags))
db.conn.commit()
return 'oder'
if __name__ == "__main__":
app.run(debug=True)
用上下文管理器實現了自定義資料插入的操作, 比用之前方便了很多