1. 程式人生 > >第6課:datetime模塊、操作數據庫、__name__、redis、mock接口

第6課:datetime模塊、操作數據庫、__name__、redis、mock接口

connect items deb abs man serve ask rac 操作數

1. datetime模塊
import datetime

print(datetime.datetime.today())  # 當前時間 2018-01-23 17:22:35.739667
print(datetime.datetime.now())  # 和today一樣 2018-01-23 17:22:35.739667

print(datetime.datetime.today().strftime(‘%H:%M:%S‘))  # 按指定格式格式化好的時間字符串
print(datetime.datetime.today() + datetime.timedelta(-3))  # 取三天前的 三天後的(3) 2018-01-20 17:25:37.470085
print(datetime.date.today())  # 取當天的日期 2018-01-23
2. 操作mysql數據庫
import pymysql
import config
# 1. 連接mysql,ip 端口號 用戶名 密碼 數據庫
# 2. 建立遊標
# 3. 執行sql
# 4. 獲取結果
# 5. 關閉遊標
# 6. 關閉連接

# charset必須寫utf8,寫utf-8會報錯
conn = pymysql.connect(host=config.host, port=config.port, user=config.user, passwd=config.passwd,
                       db=config.dbname, charset=‘utf8‘)
cur = conn.cursor()  # 建立遊標,這種方式獲取到的返回結果是元組
# cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 這種方式獲取到的結果是字典類型的
sql = ‘select * from bt_stu limit 2;‘
# sql = "insert into bt_stu values(560, ‘qq‘, 1, 18101000001, ‘中國‘, 1);"
cur.execute(sql)  # 執行sql
conn.commit()
res = cur.fetchall()  # 獲取sql語句執行的結果,是一個二維的元組
# res1 = cur.fetchone()  # 只獲取一條結果,它的結果是一個一維元組
print(res)
# print(res1)
#
# cur.scroll(0, mode=‘absolute‘)  # 移動遊標,絕對路徑,移至最前面
# cur.scroll(0, mode=‘relative‘)  # 移動遊標,相對於當前位置
cur.close()
conn.close()
3. if __name__ == ‘__main__‘: # 用法
a.py

print("outside: ", __name__)  # 別的文件導入這個文件,運行的結果是當前文件名tools

if __name__ == ‘__main__‘:  # 別人導入這個文件時,這個函數不執行,
    print(‘main:‘, __name__)  # 運行當前文件時,結果是__main__

  

b.py
# import一個文件實質是把這個文件運行了一遍 # import的文件中如果有if __name__ == ‘__main__‘,導入這個文件時時不執行這個函數 import a # 運行此文件輸出結果是 a
4. redis操作
1.  關系型數據庫:如mysql Oracle sqlserver     
非關系型數據庫:

  key-value格式的
  memcahe # 存在內存中
  redis # 存在內存中
  mongodb # 數據存在磁盤中
2.   redis的安裝:pip install redis
3.   
import redis
# redis 只有密碼,沒有用戶名
# 字符串類型
r = redis.Redis(host=‘211.149.***.**‘, port=6379, password=‘******‘, db=1)  # 端口默認6379
# r.set(‘qxy_session‘, ‘201801211506‘) # set數據
# print(r.get(‘qxy_session‘))  # redis取出來的數據都是bytes類型的 b‘201801211506‘
# print(r.get(‘qxy_session‘).decode())  # 所以需要用decode方法轉成字符串 201801211506
# r.delete(‘qxy_session‘)  # 刪除一個
# r.setex(‘qxy‘, ‘hahaha‘, 20)  # 可以指定key的失效時間,單位是秒
# set get delete setex 都是針對string類型的 k - v
# 這種寫法是有層級的
r.set(‘qxy:test1‘, ‘沒交作業‘)
r.set(‘qxy:test2‘, ‘交了作業‘)
print(r.keys())  # 獲取所有的key
print(r.keys(‘qxy*‘))  # 以txz開頭的key
print(r.type(‘qxy:test1‘))  # 獲取key的類型


# hash類型
r.hset(‘qxy_sessions‘, ‘q1‘, ‘1‘)  # 插入數據
r.hset(‘qxy_sessions‘, ‘q2‘, ‘2‘)
r.hset(‘qxy_sessions‘, ‘q3‘, ‘3‘)
print(r.hget(‘qxy_sessions‘, ‘q1‘).decode())  # 獲取某條數據
print(r.hgetall(‘qxy_sessions‘))  # 獲取hash類型中所有的類型
all_data = {}
for k,v in r.hgetall(‘qxy_sessions‘).items():
    k = k.decode()
    v = v.decode()
    all_data[k] = v
print(all_data)
# hash類型沒有過期時間

練習題

import redis
# 將redis中db1的數據遷移至db8中
r = redis.Redis(host=‘211.149.***.**‘, port=6379, password=‘******‘, db=1)
r_new = redis.Redis(host=‘211.149.***.**‘, port=6379, password=‘******‘, db=8)
for k in r.keys(‘‘):
    if r.type(r.keys()) == b‘string‘:  # 或者用decode()
        v = r.get(k)
        r_new.set(k, v)
        print(v.decode())
    elif r.type(r.keys()) == b‘hash‘:
        keys = r.hgetall(k)
        for kk, vv in keys.items():
            r_new.hset(k, kk, vv)
5. 開發接口
  1. mock(模擬)接口的用處

   1) 暫時代替第三方接口

   2) 輔助測試:用來代替沒有開發好的接口

      3) 查看數據

2. 需先安裝flask模塊:pip install flask

import flask
from conf import config
import json
from lib.tools import op_mysql
# import tools #  tools.op_mysql()

# 接口,後臺服務

server = flask.Flask(__name__)


@server.route(‘/get_user‘, methods=[‘get‘, ‘post‘])   # 這句話表示這個函數變身為接口
def get_all_user():
    sql = ‘select * from users;‘
    response = op_mysql(host=config.HOST, user=config.USER, password=config.PASSWORD, db=config.DBNAME, port=config.PORT,
                        charset=‘utf8‘, sql=sql)
    res = json.dumps(response, ensure_ascii=False, indent=4)
    return res


@server.route(‘/add_user‘, methods=[‘post‘])
def add_users():
    user = flask.request.values.get(‘user‘)
    passwd = flask.request.values.get(‘passwd‘)
    print(user, passwd)
    if user and passwd:
        sql = "insert into users values(‘%s‘,‘%s‘);" % (user, passwd)
        op_mysql(host=config.HOST, user=config.USER, password=config.PASSWORD, db=config.DBNAME, port=config.PORT,
                 charset=‘utf8‘, sql=sql)
        response = {‘code‘: 200, ‘msg‘: ‘操作成功‘}
    else:
        response = {‘code‘: 503, ‘msq‘: ‘必填參數未填‘}
    return json.dumps(response, ensure_ascii=False)


# host=‘0.0.0.0‘ 代表一個局域網內的所有人都可以訪問;加上debug:不需要重啟服務
server.run(port=8888, host=‘0.0.0.0‘, debug=True) 

在postman中訪問這兩個接口

技術分享圖片

技術分享圖片

第6課:datetime模塊、操作數據庫、__name__、redis、mock接口