1. 程式人生 > 其它 >oracle 獲取clob 放入map_python操作mysql和oracle資料庫

oracle 獲取clob 放入map_python操作mysql和oracle資料庫

技術標籤:oracle 獲取clob 放入map

1、環境配置

安裝所需的庫 cx_Oracle 和 pymysql(例如:pip install cx_Oracle pymysql)

ca18165723edae0dc7169c03901743fa.png

安裝 Oracle 資料庫客戶端(Oracle Instant Client)

instantclient-basic-windows.x64-19.8.0.0.0dbru.zip

2、資料庫工具類

import logging

import cx_Oracle
import pymysql


class _DbApiWrapper(object):
    """Shared plumbing for the connection wrappers in this module.

    Subclasses are expected to store an open DB-API connection in
    ``self.db`` (or ``None`` when no connection is available).
    """

    def _run_query(self, sql, fetch, params=()):
        """Execute *sql* on a fresh cursor and return ``(column_names, fetched)``.

        :param sql: statement to execute
        :param fetch: callable receiving the cursor and returning the rows
        :param params: extra positional arguments forwarded to ``cursor.execute``
        :return: tuple of (list of column names, whatever *fetch* returned)
        :raises Exception: any driver error is logged and re-raised
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql, *params)
            # description is None for statements that produce no result set
            column_names = [column[0] for column in cursor.description or ()]
            return column_names, fetch(cursor)
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    @staticmethod
    def _fetch_many(cursor, size):
        """Fetch up to *size* rows; with ``size=None`` use the cursor's default.

        ``None`` is never passed to ``fetchmany`` explicitly, so the call does
        not depend on the driver accepting it.
        """
        if size is None:
            return cursor.fetchmany()
        return cursor.fetchmany(size)

    def closedb(self):
        """Close the connection if one is open; calling it again is a no-op."""
        connection = getattr(self, 'db', None)
        if connection is None:
            return
        # Clear the attribute first so a later call (e.g. from __del__)
        # does not try to close the same connection twice.
        self.db = None
        connection.close()

    def __del__(self):
        self.closedb()
        print("資料庫關閉成功,__del__物件已經銷燬")


class Mysql(_DbApiWrapper):
    """Helper for running statements against a MySQL database via PyMySQL."""

    def __init__(self, host, user, password, database=None, port=3306, **kwargs):
        """Open the connection.

        :param host: server address
        :param user: user name
        :param password: password
        :param database: database name
        :param port: port number
        :param kwargs: forwarded unchanged to ``pymysql.connect`` (for example
            ``charset``, ``cursorclass``, ``autocommit``, ``connect_timeout``,
            ``read_timeout``, ``write_timeout``, ``ssl``)
        """
        # Set before connecting so closedb()/__del__ stay safe if connect() raises.
        self.db = None
        logging.info('連線到mysql伺服器...')
        # Keyword arguments: PyMySQL 1.0+ no longer accepts positional ones.
        self.db = pymysql.connect(host=host, user=user, password=password,
                                  database=database, port=port, **kwargs)
        logging.info('資料庫連線成功!')

    def _execute_write(self, sql, args, failure_message):
        """Run one modifying statement and commit; on failure log and roll back.

        Errors are deliberately not re-raised (best-effort behaviour of
        ``insertdb`` / ``updatedb``).
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql, args)
            self.db.commit()
        except Exception as e:
            logging.error(e)
            logging.error(failure_message)
            self.db.rollback()
        finally:
            cursor.close()

    def insertdb(self, sql, args=None):
        """Execute an INSERT statement and commit it.

        :param sql: statement text (may contain ``%s`` placeholders)
        :param args: optional values for the placeholders; prefer this over
            building the SQL string by hand
        :return: None; failures are logged and rolled back, not raised
        """
        self._execute_write(sql, args, '插入資料失敗!')

    def updatedb(self, sql, args=None):
        """Execute an UPDATE statement and commit it.

        :param sql: statement text (may contain ``%s`` placeholders)
        :param args: optional values for the placeholders
        :return: None; failures are logged and rolled back, not raised
        """
        self._execute_write(sql, args, '更新資料失敗!')

    def update_db_many(self, sql, data):
        """Execute one statement for every parameter set in *data* (batch insert).

        :param sql: statement text with ``%s`` placeholders
        :param data: sequence of parameter tuples
        :return: None
        :raises Exception: driver errors are logged, rolled back and re-raised

        Example::

            db = Mysql("localhost", "root", "admin123", 'ccyboa_service', port=3306)
            sql = ("INSERT INTO `ops_data_bank_copy`"
                   "(batch_num,data_comment_1,data_decimal_1,data_datetime_5,data_int_1,is_delete) "
                   "VALUES(%s,%s,%s,%s,%s,%s)")
            data = [
                ('test_20111111', 'test_2019', '33004', '2019/01/02', 234, 1),
                ('test_2011222', 'test_2018', '33001', '2019/01/03', 235, 1),
            ]
            db.update_db_many(sql, data)
        """
        cursor = self.db.cursor()
        try:
            cursor.executemany(sql, data)
            self.db.commit()
        except Exception as e:
            logging.error(e)
            self.db.rollback()
            raise
        finally:
            cursor.close()

    def select_db_fetchall(self, sql):
        """Run a query and return every row.

        :param sql: query text
        :return: tuple whose first element is the list of column names,
            followed by one element per row
        :raises Exception: driver errors are logged and re-raised
        """
        title_name, rows = self._run_query(sql, lambda cursor: cursor.fetchall())
        return tuple([title_name] + list(rows))

    def select_db_fetchone(self, sql):
        """Run a query and return only the first row.

        :param sql: query text
        :return: ``(column_names, row)``
        :raises Exception: driver errors are logged and re-raised
        """
        return self._run_query(sql, lambda cursor: cursor.fetchone())

    def select_db_fetchmany(self, sql, size=None, *args):
        """Run a query and return at most *size* rows.

        :param sql: query text
        :param size: maximum number of rows; ``None`` uses the cursor default
        :param args: extra positional arguments forwarded to ``cursor.execute``
        :return: tuple whose first element is the list of column names,
            followed by one element per row
        :raises Exception: driver errors are logged and re-raised
        """
        title_name, rows = self._run_query(
            sql, lambda cursor: self._fetch_many(cursor, size), args)
        return tuple([title_name] + list(rows))


class Oracle(_DbApiWrapper):
    """Helper for running queries against an Oracle database via cx_Oracle."""

    def __init__(self, userName, password, host='127.0.0.1', port='1521', serviceName='orcl'):
        """Open the connection using a ``host:port/serviceName`` connect string.

        :param userName: user name
        :param password: password
        :param host: server address
        :param port: listener port
        :param serviceName: service name
        """
        # Set before connecting so closedb()/__del__ stay safe if connect() raises.
        self.db = None
        self.db = cx_Oracle.connect(userName,
                                    password,
                                    '%s:%s/%s' % (host, port, serviceName))

    def get_db(self):
        """Return the underlying connection object (``None`` once closed)."""
        return self.db

    def select_db_fetchall(self, sql):
        """Run a query and return every row.

        :param sql: query text
        :return: ``(column_names, rows)``
        :raises Exception: driver errors are logged and re-raised
        """
        return self._run_query(sql, lambda cursor: cursor.fetchall())

    def select_db_fetchone(self, sql):
        """Run a query and return only the first row.

        :param sql: query text
        :return: ``(column_names, row)``
        :raises Exception: driver errors are logged and re-raised
        """
        return self._run_query(sql, lambda cursor: cursor.fetchone())

    def select_db_fetchmany(self, sql, size=None, *args):
        """Run a query and return at most *size* rows.

        :param sql: query text
        :param size: maximum number of rows; ``None`` uses the cursor default
        :param args: extra positional arguments forwarded to ``cursor.execute``
        :return: ``(column_names, rows)``
        :raises Exception: driver errors are logged and re-raised
        """
        return self._run_query(
            sql, lambda cursor: self._fetch_many(cursor, size), args)

3、測試執行

import os
import sys

import pandas as pd

# Make the project root importable when this file is run as a script.
sys.path.append("../../..")
from PythonCode.Common.CommonFunction import readJsonFile, get_appoint_dataTime, getFormatData, PATHROOT
from PythonCode.Other.SQLOprater.SqlClass import Oracle

txtSQL = r'D:\TianFuAuto\PythonCode\Other\SQLOprater\config\匯出資料sql.txt'
cfgSQL = r'D:\TianFuAuto\PythonCode\Other\SQLOprater\config\sqlConfig.json'
# NOTE(review): the exact values returned by these project helpers are not
# visible here; first_day_of_next_month must support .strftime().
first_day_of_next_month = get_appoint_dataTime('first_day_of_next_month', -1)
today = getFormatData(format=1, interval=0)
yesterday = getFormatData(format=3, interval=-1)


class SelectDatafromOracle():
    """Query Oracle for WX and ZFB data and export each channel to a CSV file."""

    def __init__(self):
        """Read the JSON config, connect to Oracle and prepare the eight queries.

        The config is expected to provide ``userName``, ``password``, ``host``,
        ``port``, ``serviceName``, a ``sql`` template with six positional
        ``{}`` fields, and a ``msg`` mapping with keys ``H``, ``G``,
        ``E`` (containing ``GD`` and ``ND``), ``WX`` and ``ZFB``.
        """
        dic_config = readJsonFile(cfgSQL, encoding='utf-8')
        userName = dic_config.get('userName', '')
        password = dic_config.get('password', '')
        host = dic_config.get('host', '')
        port = dic_config.get('port', '')
        serviceName = dic_config.get('serviceName', '')
        sql = dic_config.get('sql', '')
        msg_dict = dic_config.get('msg')
        self.conn = Oracle(userName, password, host, port, serviceName)

        startTime = yesterday
        endTime = yesterday
        next_moth = first_day_of_next_month.strftime('%Y%m')

        def build_sql(kind, company_msg, channel):
            # Fill the template; only the kind, company message and channel vary.
            return sql.format(kind, next_moth, company_msg, startTime, endTime, msg_dict[channel])

        self.sql_WX_H = build_sql('H', msg_dict['H'], 'WX')
        self.sql_WX_G = build_sql('G', msg_dict['G'], 'WX')
        self.sql_WX_GD = build_sql('E', msg_dict['E']['GD'], 'WX')
        self.sql_WX_ND = build_sql('E', msg_dict['E']['ND'], 'WX')
        self.sql_ZFB_H = build_sql('H', msg_dict['H'], 'ZFB')
        self.sql_ZFB_G = build_sql('G', msg_dict['G'], 'ZFB')
        self.sql_ZFB_GD = build_sql('E', msg_dict['E']['GD'], 'ZFB')
        self.sql_ZFB_ND = build_sql('E', msg_dict['E']['ND'], 'ZFB')

        self.fileDir = r'%s\csvFiles\%s\YXXT' % (PATHROOT, today)
        os.makedirs(self.fileDir, exist_ok=True)

    def _select_and_save(self, queries, file_prefix):
        """Run every query, merge the rows and write them to one CSV file.

        :param queries: sequence of ``(progress_message_template, sql)`` pairs;
            the template receives the row count of that query
        :param file_prefix: file name prefix; the file is written to
            ``<fileDir>/<file_prefix>_<today>.csv`` with ``|`` as separator
        """
        title = None
        merged_rows = []
        for message_template, query in queries:
            # Column names of the last query are used as the CSV header.
            title, rows = self.conn.select_db_fetchall(query)
            print(message_template % len(rows))
            merged_rows.extend(rows)

        # Replace None with an empty string and stringify everything else.
        result_final = [
            ['' if value is None else str(value) for value in row]
            for row in merged_rows
        ]
        fileName = os.path.join(self.fileDir, '%s_%s.csv' % (file_prefix, today))
        df = pd.DataFrame(data=result_final, columns=title)
        df.to_csv(fileName, sep='|', index=False)

    def selectAndSaveWX(self):
        """Export the four WX queries to ``WX_<today>.csv``."""
        self._select_and_save(
            [
                ('微信供熱公司資料查詢完成,共%s條資料', self.sql_WX_H),
                ('微信供氣公司資料查詢完成,共%s條資料', self.sql_WX_G),
                ('微信供電公司資料查詢完成,共%s條資料', self.sql_WX_GD),
                ('微信農電公司資料查詢完成,共%s條資料', self.sql_WX_ND),
            ],
            'WX',
        )

    def selectAndSaveZFB(self):
        """Export the four ZFB queries to ``ZFB_<today>.csv``."""
        self._select_and_save(
            [
                ('支付寶供熱公司資料查詢完成,共%s條資料', self.sql_ZFB_H),
                ('支付寶供氣公司資料查詢完成,共%s條資料', self.sql_ZFB_G),
                ('支付寶供電公司資料查詢完成,共%s條資料', self.sql_ZFB_GD),
                ('支付寶農電公司資料查詢完成,共%s條資料', self.sql_ZFB_ND),
            ],
            'ZFB',
        )


if __name__ == '__main__':
    sdfo = SelectDatafromOracle()
    sdfo.selectAndSaveWX()
    sdfo.selectAndSaveZFB()