1. 程式人生 > >python + requests實現的介面自動化框架

python + requests實現的介面自動化框架

1、首先,我們先來理一下思路。

正常的介面測試流程是什麼?

腦海裡的反應是不是這樣的:

確定測試介面的工具 —> 配置需要的介面引數 —> 進行測試 —> 檢查測試結果(有的需要資料庫輔助) —> 生成測試報告(html報告)

那麼,我們就根據這樣的過程來一步步搭建我們的框架。在這個過程中,我們需要做到業務和資料的分離,這樣才能靈活,達到我們寫框架的目的。只要好好做,一定可以成功。這也是我當初對自己說的。

接下來,我們來進行結構的劃分。

我的結構是這樣的,大家可以參考下:

common:存放一些共通的方法

result:執行過程中生成的資料夾,裡面存放每次測試的結果

testCase:用於存放具體的測試case

estFile:存放測試過程中用到的檔案,包括上傳的檔案,測試用例以及     資料庫的sql語句

caselist:txt檔案,配置每次執行的case名稱

config:配置一些常量,例如資料庫的相關資訊,介面的相關資訊等

readConfig: 用於讀取config配置檔案中的內容

runAll:用於執行case

既然整體結構有了劃分,接下來就該一步步的填充整個框架了,首先,我們先來看看config.ini和readConfig.py兩個檔案,從他們入手,個人覺得比較容易走下去噠。

我們來看下檔案的內容是什麼樣子的:

[DATABASE]
host = 50.23.190.57
username = xxxxxx
password = ******
port = 3306
database = databasename

[HTTP]
# 介面的url
baseurl = http://xx.xxxx.xx 
port = 8080
timeout = 1.0

[EMAIL]
mail_host = smtp.163.com
mail_user = 
[email protected]
mail_pass = ********* mail_port = 25 sender = [email protected] receiver = [email protected]/[email protected] subject = python content = "All interface test has been complited\nplease read the report file about the detile of result in the attachment." testuser = Someone on_off = 1

相信大家都知道這樣的配置檔案,沒錯,所有一成不變的東西,我們都可以放到這裡來。哈哈,怎麼樣,不錯吧。

現在,我們已經做好了固定的“倉庫”。來儲存我們平時不動的東西,那麼,我們要怎麼把它拿出來為我所用呢?這時候,readConfig.py檔案出世了,它成功的幫我們解決了這個問題,下面就讓我們來一睹它的廬山真面目吧。

import os
import codecs
import configparser

proDir = os.path.split(os.path.realpath(__file__))[0]
configPath = os.path.join(proDir, "config.ini")

class ReadConfig:
    def __init__(self):
        fd = open(configPath)
        data = fd.read()

        #  remove BOM
        if data[:3] == codecs.BOM_UTF8:
            data = data[3:]
            file = codecs.open(configPath, "w")
            file.write(data)
            file.close()
        fd.close()

        self.cf = configparser.ConfigParser()
        self.cf.read(configPath)

    def get_email(self, name):
        value = self.cf.get("EMAIL", name)
        return value

    def get_http(self, name):
        value = self.cf.get("HTTP", name)
        return value

    def get_db(self, name):
        value = self.cf.get("DATABASE", name)
        return value

話不多說,我們先來看下common到底有哪些東西。

既然配置檔案和讀取配置檔案我們都已經完成了,也看到了common裡的內容,接下來就可以寫common裡的共通方法了,從哪個下手呢?今天,我們就來翻“Log.py”的牌吧,因為它是比較獨立的,我們單獨跟他打交道,也為了以後它能為我們服務打下良好基礎。

這裡呢,我想跟大家多說兩句,對於這個log檔案呢,我給它單獨啟用了一個執行緒,這樣在整個執行過程中,我們在寫log的時候也會比較方便,看名字大家也知道了,這裡就是我們對輸出的日誌的所有操作了,主要是對輸出格式的規定,輸出等級的定義以及其他一些輸出的定義等等。總之,你想對log做的任何事情,都可以放到這裡來。我們來看下程式碼,沒有比這個更直接有效的了。

import logging
from datetime import datetime
import threading

首先,我們要像上面那樣,引入需要的模組,才能進行接下來的操作。

class Log:
    def __init__(self):
        global logPath, resultPath, proDir
        proDir = readConfig.proDir
        resultPath = os.path.join(proDir, "result")
        # create result file if it doesn't exist
        if not os.path.exists(resultPath):
            os.mkdir(resultPath)
        # defined test result file name by localtime
        logPath = os.path.join(resultPath, str(datetime.now().strftime("%Y%m%d%H%M%S")))
        # create test result file if it doesn't exist
        if not os.path.exists(logPath):
            os.mkdir(logPath)
        # defined logger
        self.logger = logging.getLogger()
        # defined log level
        self.logger.setLevel(logging.INFO)

        # defined handler
        handler = logging.FileHandler(os.path.join(logPath, "output.log"))
        # defined formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # defined formatter
        handler.setFormatter(formatter)
        # add handler
        self.logger.addHandler(handler)

現在,我們建立了上面的Log類,在__init__初始化方法中,我們進行了log的相關初始化操作。

class MyLog:
    log = None
    mutex = threading.Lock()

    def __init__(self):
        pass

    @staticmethod
    def get_log():

        if MyLog.log is None:
            MyLog.mutex.acquire()
            MyLog.log = Log()
            MyLog.mutex.release()

        return MyLog.log

我們繼續搭建,這次要做的,是configHttp.py的內容,下面是介面檔案中主要部分的內容,讓我們一起來看看吧。

import requests
import readConfig as readConfig
from common.Log import MyLog as Log

localReadConfig = readConfig.ReadConfig()

class ConfigHttp:
    def __init__(self):
        global host, port, timeout
        host = localReadConfig.get_http("baseurl")
        port = localReadConfig.get_http("port")
        timeout = localReadConfig.get_http("timeout")
        self.log = Log.get_log()
        self.logger = self.log.get_logger()
        self.headers = {}
        self.params = {}
        self.data = {}
        self.url = None
        self.files = {}

    def set_url(self, url):
        self.url = host + url

    def set_headers(self, header):
        self.headers = header

    def set_params(self, param):
        self.params = param

    def set_data(self, data):
        self.data = data

    def set_files(self, file):
        self.files = file

    # defined http get method
    def get(self):
        try:
            response = requests.get(self.url, params=self.params, headers=self.headers, timeout=float(timeout))
            # response.raise_for_status()
            return response
        except TimeoutError:
            self.logger.error("Time out!")
            return None

    # defined http post method
    def post(self):
        try:
            response = requests.post(self.url, headers=self.headers, data=self.data, files=self.files, timeout=float(timeout))
            # response.raise_for_status()
            return response
        except TimeoutError:
            self.logger.error("Time out!")
            return None
  • get方法

        介面測試中見到最多的就是get方法和post方法,其中,get方法用於獲取介面的測試,說白了,就是說,使用get的介面,都不會對後臺資料進行更改,而且get方法在傳遞引數後,url的格式是這樣的:http://介面地址?key1=value1&key2=value2

對於requests提供的get方法,有幾個常用的引數:

url:顯而易見,就是介面的地址url啦

headers:定製請求頭(headers),例如:content-type = application/x-www-form-urlencoded

params:用於傳遞測試介面所要用的引數,這裡我們用python中的字典形式(key:value)進行引數的傳遞。

timeout:設定介面連線的最大時間(超過該時間會丟擲超時錯誤)

url=‘http://api.shein.com/v2/member/logout’
header={‘content-type’: application/x-www-form-urlencoded}
param={‘user_id’: 123456,‘email’: [email protected]}
timeout=0.5
requests.get(url, headers=header, params=param, timeout=timeout)
  • post方法

        與get方法類似,只要設定好對應的引數,就可以了。下面就直接舉個栗子,直接上程式碼吧:

url=‘http://api.shein.com/v2/member/login’
header={‘content-type’: application/x-www-form-urlencoded}
data={‘email’: [email protected],‘password’: 123456}
timeout=0.5
requests.post(url, headers=header, data=data, timeout=timeout)

怎麼樣,是不是也很簡單啊。這裡我們需要說明一下,post方法中的引數,我們不在使用params進行傳遞,而是改用data進行傳遞了。

依然只說常用的返回值的操作。

text:獲取介面返回值的文字格式

json():獲取介面返回值的json()格式

status_code:返回狀態碼(成功為:200)

headers:返回完整的請求頭資訊(headers['name']:返回指定的headers內容)

encoding:返回字元編碼格式

url:返回介面的完整url地址

以上這些,就是常用的方法啦,大家可自行取之。

關於失敗請求丟擲異常,我們可以使用“raise_for_status()”來完成,那麼,當我們的請求發生錯誤時,就會丟擲異常。在這裡提醒下各位朋友,如果你的介面,在地址不正確的時候,會有相應的錯誤提示(有時也需要進行測試),這時,千萬不能使用這個方法來丟擲錯誤,因為python自己在連結介面時就已經把錯誤丟擲,那麼,後面你將無法測試期望的內容。而且程式會直接在這裡當掉,以錯誤來計。

common.py裡的內容。

import os
from xlrd import open_workbook
from xml.etree import ElementTree as ElementTree
from common.Log import MyLog as Log

localConfigHttp = configHttp.ConfigHttp()
log = Log.get_log()
logger = log.get_logger()

# 從excel檔案中讀取測試用例
def get_xls(xls_name, sheet_name):
    cls = []
    # get xls file's path
    xlsPath = os.path.join(proDir, "testFile", xls_name)
    # open xls file
    file = open_workbook(xlsPath)
    # get sheet by name
    sheet = file.sheet_by_name(sheet_name)
    # get one sheet's rows
    nrows = sheet.nrows
    for i in range(nrows):
        if sheet.row_values(i)[0] != u'case_name':
            cls.append(sheet.row_values(i))
    return cls

# 從xml檔案中讀取sql語句
database = {}
def set_xml():
    if len(database) == 0:
        sql_path = os.path.join(proDir, "testFile", "SQL.xml")
        tree = ElementTree.parse(sql_path)
        for db in tree.findall("database"):
            db_name = db.get("name")
            # print(db_name)
            table = {}
            for tb in db.getchildren():
                table_name = tb.get("name")
                # print(table_name)
                sql = {}
                for data in tb.getchildren():
                    sql_id = data.get("id")
                    # print(sql_id)
                    sql[sql_id] = data.text
                table[table_name] = sql
            database[db_name] = table

def get_xml_dict(database_name, table_name):
    set_xml()
    database_dict = database.get(database_name).get(table_name)
    return database_dict

def get_sql(database_name, table_name, sql_id):
    db = get_xml_dict(database_name, table_name)
    sql = db.get(sql_id)
    return sql

 common的兩大主要內容:

  1. 我們利用xml.etree.Element來對xml檔案進行操作,然後通過我們自定義的方法,根據傳遞不同的引數取得不(想)同(要)的值。
  2. 利用xlrd來操作excel檔案,注意啦,我們是用excel檔案來管理測試用例的 

excel檔案:

 xml檔案: 

資料庫和傳送郵件(也可根據需要,不寫該部分內容):

import pymysql
import readConfig as readConfig
from common.Log import MyLog as Log

localReadConfig = readConfig.ReadConfig()

class MyDB:
    global host, username, password, port, database, config
    host = localReadConfig.get_db("host")
    username = localReadConfig.get_db("username")
    password = localReadConfig.get_db("password")
    port = localReadConfig.get_db("port")
    database = localReadConfig.get_db("database")
    config = {
        'host': str(host),
        'user': username,
        'passwd': password,
        'port': int(port),
        'db': database
    }

    def __init__(self):
        self.log = Log.get_log()
        self.logger = self.log.get_logger()
        self.db = None
        self.cursor = None

    def connectDB(self):
        try:
            # connect to DB
            self.db = pymysql.connect(**config)
            # create cursor
            self.cursor = self.db.cursor()
            print("Connect DB successfully!")
        except ConnectionError as ex:
            self.logger.error(str(ex))

    def executeSQL(self, sql, params):
        self.connectDB()
        # executing sql
        self.cursor.execute(sql, params)
        # executing by committing to DB
        self.db.commit()
        return self.cursor

    def get_all(self, cursor):
        value = cursor.fetchall()
        return value

    def get_one(self, cursor):
        value = cursor.fetchone()
        return value

    def closeDB(self):
        self.db.close()
        print("Database closed!")

郵件程式碼:

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import threading
import readConfig as readConfig
from common.Log import MyLog
import zipfile
import glob

localReadConfig = readConfig.ReadConfig()

class Email:
    def __init__(self):
        global host, user, password, port, sender, title, content
        host = localReadConfig.get_email("mail_host")
        user = localReadConfig.get_email("mail_user")
        password = localReadConfig.get_email("mail_pass")
        port = localReadConfig.get_email("mail_port")
        sender = localReadConfig.get_email("sender")
        title = localReadConfig.get_email("subject")
        content = localReadConfig.get_email("content")
        self.value = localReadConfig.get_email("receiver")
        self.receiver = []
        # get receiver list
        for n in str(self.value).split("/"):
            self.receiver.append(n)
        # defined email subject
        date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.subject = title + " " + date
        self.log = MyLog.get_log()
        self.logger = self.log.get_logger()
        self.msg = MIMEMultipart('mixed')

    def config_header(self):
        self.msg['subject'] = self.subject
        self.msg['from'] = sender
        self.msg['to'] = ";".join(self.receiver)

    def config_content(self):
        content_plain = MIMEText(content, 'plain', 'utf-8')
        self.msg.attach(content_plain)

    def config_file(self):
        # if the file content is not null, then config the email file
        if self.check_file():

            reportpath = self.log.get_result_path()
            zippath = os.path.join(readConfig.proDir, "result", "test.zip")
            # zip file
            files = glob.glob(reportpath + '\*')
            f = zipfile.ZipFile(zippath, 'w', zipfile.ZIP_DEFLATED)
            for file in files:
                f.write(file)
            f.close()

            reportfile = open(zippath, 'rb').read()
            filehtml = MIMEText(reportfile, 'base64', 'utf-8')
            filehtml['Content-Type'] = 'application/octet-stream'
            filehtml['Content-Disposition'] = 'attachment; filename="test.zip"'
            self.msg.attach(filehtml)

    def check_file(self):
        reportpath = self.log.get_report_path()
        if os.path.isfile(reportpath) and not os.stat(reportpath) == 0:
            return True
        else:
            return False

    def send_email(self):
        self.config_header()
        self.config_content()
        self.config_file()
        try:
            smtp = smtplib.SMTP()
            smtp.connect(host)
            smtp.login(user, password)
            smtp.sendmail(sender, self.receiver, self.msg.as_string())
            smtp.quit()
            self.logger.info("The test report has send to developer by email.")
        except Exception as ex:
            self.logger.error(str(ex))

class MyEmail:
    email = None
    mutex = threading.Lock()

    def __init__(self):
        pass

    @staticmethod
    def get_email():

        if MyEmail.email is None:
            MyEmail.mutex.acquire()
            MyEmail.email = Email()
            MyEmail.mutex.release()
        return MyEmail.email


if __name__ == "__main__":
    email = MyEmail.get_email()

 入口程式碼:

import unittest
import HTMLTestRunner

    def set_case_list(self):
        fb = open(self.caseListFile)
        for value in fb.readlines():
            data = str(value)
            if data != '' and not data.startswith("#"):
                self.caseList.append(data.replace("\n", ""))
        fb.close()

    def set_case_suite(self):
        self.set_case_list()
        test_suite = unittest.TestSuite()
        suite_model = []

        for case in self.caseList:
            case_file = os.path.join(readConfig.proDir, "testCase")
            print(case_file)
            case_name = case.split("/")[-1]
            print(case_name+".py")
            discover = unittest.defaultTestLoader.discover(case_file, pattern=case_name + '.py', top_level_dir=None)
            suite_model.append(discover)

        if len(suite_model) > 0:
            for suite in suite_model:
                for test_name in suite:
                    test_suite.addTest(test_name)
        else:
            return None
        return test_suite

    def run(self):
        try:
            suit = self.set_case_suite()
            if suit is not None:
                logger.info("********TEST START********")
                fp = open(resultPath, 'wb')
                runner = HTMLTestRunner.HTMLTestRunner(stream=fp, title='Test Report', description='Test Description')
                runner.run(suit)
            else:
                logger.info("Have no case to test.")
        except Exception as ex:
            logger.error(str(ex))
        finally:
            logger.info("*********TEST END*********")
            # send test report by email
            if int(on_off) == 0:
                self.email.send_email()
            elif int(on_off) == 1:
                logger.info("Doesn't send report email to developer.")
            else:
                logger.info("Unknow state.")

   

result資料夾會在首次執行case時生成,並且以後的測試結果都會被儲存在該資料夾下,同時每次測試的資料夾都是用系統時間命名,裡面包含了兩個檔案,log檔案和測試報告。

testCase資料夾下,存放我們寫的具體的測試case。所有的case名稱都要以test開頭來命名,這是因為,unittest在進行測試時會自動匹配testCase資料夾下面所有test開頭的.py檔案

 testFile資料夾下,放置我們測試時用來管理測試用例的excel檔案和用於資料庫查詢的sql語句的xml檔案。

最後就是caselist.txt檔案了