oracle 獲取clob 放入map_python操作mysql和oracle資料庫
阿新 • • 發佈:2021-01-01
技術標籤:oracle 獲取clob 放入map
1、環境配置
安裝所需的庫 cx_Oracle 和 pymysql(pip install cx_Oracle pymysql)
安裝oracle資料庫client
instantclient-basic-windows.x64-19.8.0.0.0dbru.zip
2、資料庫工具類
import logging

import cx_Oracle
import pymysql


class Mysql(object):
    """Thin convenience wrapper around a single PyMySQL connection."""

    def __init__(self, host, user, password, database=None, port=3306, **kwargs):
        """Connect to a MySQL server.

        :param host: server address
        :param user: login user name
        :param password: login password
        :param database: name of the database to select (optional)
        :param port: TCP port of the server
        :param kwargs: any further keyword arguments are forwarded unchanged
            to ``pymysql.connect`` (e.g. ``charset``, ``cursorclass``,
            ``connect_timeout``, ``autocommit``, ``ssl``, ``read_timeout``,
            ``write_timeout``)
        """
        # Defined before connecting so __del__ stays safe if connect() raises.
        self.db = None
        logging.info('連線到mysql伺服器...')
        # Keyword arguments: recent PyMySQL releases no longer accept the
        # connection settings positionally.
        self.db = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            port=port,
            **kwargs
        )
        logging.info('資料庫連線成功!')

    def insertdb(self, sql):
        """Execute one INSERT statement and commit it.

        On failure the error is logged and the transaction is rolled back;
        the exception is not propagated to the caller.

        :param sql: complete SQL statement to execute
        :return: None
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql)
            self.db.commit()
        except Exception as e:
            logging.error(e)
            logging.error('插入資料失敗!')
            self.db.rollback()
        finally:
            cursor.close()

    def updatedb(self, sql):
        """Execute one UPDATE statement and commit it.

        On failure the error is logged and the transaction is rolled back;
        the exception is not propagated to the caller.

        :param sql: complete SQL statement to execute
        :return: None
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql)
            self.db.commit()
        except Exception as e:
            # `except Exception` instead of a bare `except:` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            logging.error(e)
            logging.error('更新資料失敗!')
            self.db.rollback()
        finally:
            cursor.close()

    def update_db_many(self, sql, data):
        """Execute one parameterised statement for every row in ``data``.

        The whole batch is committed together; on any error the transaction
        is rolled back and the exception is re-raised.

        :param sql: statement containing ``%s`` placeholders
        :param data: sequence of parameter tuples, one per execution
        :return: None

        example:
            db = Mysql("localhost", "root", "admin123", 'ccyboa_service', port=3306)
            sql = ("INSERT INTO `ops_data_bank_copy`(batch_num,data_comment_1,data_decimal_1,data_datetime_5,data_int_1,is_delete) "
                   "VALUES(%s,%s,%s,%s,%s,%s)")
            data = [
                ('test_20111111', 'test_2019', '33004', '2019/01/02', 234, 1),
                ('test_2011222', 'test_2018', '33001', '2019/01/03', 235, 1),
                ('test_2011333', 'test_2017', '33002', '2019/01/04', 234, 1),
                ('test_20114441', 'test_2019', '33004', '2019/01/05', 234, 1),
            ]
            db.update_db_many(sql, data)
        """
        cursor = self.db.cursor()
        try:
            cursor.executemany(sql, data)
            self.db.commit()
        except Exception as e:
            logging.error(e)
            self.db.rollback()
            raise  # bare raise keeps the original traceback
        finally:
            cursor.close()

    def select_db_fetchall(self, sql):
        """Run a query and return every row, preceded by the column names.

        :param sql: complete SELECT statement
        :return: tuple whose first element is the list of column names and
            whose remaining elements are the fetched rows
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql)
            title_name = [mem[0] for mem in cursor.description]
            results = cursor.fetchall()
            return tuple([title_name] + list(results))
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    def select_db_fetchone(self, sql):
        """Run a query and return the first row together with the column names.

        :param sql: complete SELECT statement
        :return: ``(column_names, row)``
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql)
            title_name = [mem[0] for mem in cursor.description]
            results = cursor.fetchone()
            return title_name, results
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    def select_db_fetchmany(self, sql, size=None, *args):
        """Run a query and return up to ``size`` rows, preceded by the column names.

        :param sql: SELECT statement
        :param size: maximum number of rows to fetch; ``None`` leaves the
            choice to the cursor's own default
        :param args: extra positional arguments forwarded to ``cursor.execute``
        :return: tuple whose first element is the list of column names and
            whose remaining elements are the fetched rows
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql, *args)
            title_name = [mem[0] for mem in cursor.description]
            # Call fetchmany() without an argument when no size was given so
            # the driver applies its own default batch size.
            rows = cursor.fetchmany() if size is None else cursor.fetchmany(size)
            results = tuple([title_name] + list(rows))
            print(results)
            # Previously the result was only printed and never handed back.
            return results
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    def closedb(self):
        """Close the underlying database connection."""
        self.db.close()

    def __del__(self):
        # Nothing to close when __init__ failed before a connection existed.
        if getattr(self, 'db', None) is None:
            return
        try:
            self.closedb()
        except Exception:
            # Closing can fail (e.g. the connection was closed explicitly
            # beforehand); a finalizer must not raise.
            return
        print("資料庫關閉成功,__del__物件已經銷燬")


class Oracle(object):
    """Thin convenience wrapper around a single cx_Oracle connection."""

    def __init__(self, userName, password, host='127.0.0.1', port='1521', serviceName='orcl'):
        """Connect to an Oracle database using a ``host:port/serviceName`` DSN.

        :param userName: login user name
        :param password: login password
        :param host: server address
        :param port: listener port
        :param serviceName: Oracle service name
        """
        # Defined before connecting so __del__ stays safe if connect() raises.
        self.db = None
        self.db = cx_Oracle.connect(userName, password, '%s:%s/%s' % (host, port, serviceName))

    def get_db(self):
        """Return the underlying connection object."""
        return self.db

    def select_db_fetchall(self, sql):
        """Run a query and return all rows together with the column names.

        :param sql: complete SELECT statement
        :return: ``(column_names, rows)``
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql)
            title_name = [mem[0] for mem in cursor.description]
            results = cursor.fetchall()
            return title_name, results
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    def select_db_fetchone(self, sql):
        """Run a query and return the first row together with the column names.

        :param sql: complete SELECT statement
        :return: ``(column_names, row)``
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql)
            title_name = [mem[0] for mem in cursor.description]
            results = cursor.fetchone()
            return title_name, results
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    def select_db_fetchmany(self, sql, size=None, *args):
        """Run a query and return up to ``size`` rows together with the column names.

        :param sql: SELECT statement
        :param size: maximum number of rows to fetch; ``None`` leaves the
            choice to the cursor's own default
        :param args: extra positional arguments forwarded to ``cursor.execute``
        :return: ``(column_names, rows)``
        """
        cursor = self.db.cursor()
        try:
            cursor.execute(sql, *args)
            title_name = [mem[0] for mem in cursor.description]
            # Do not forward None as the row count; call fetchmany() bare so
            # the driver falls back to its default.
            results = cursor.fetchmany() if size is None else cursor.fetchmany(size)
            return title_name, results
        except Exception as e:
            logging.error(e)
            raise
        finally:
            cursor.close()

    def closedb(self):
        """Close the underlying database connection."""
        self.db.close()

    def __del__(self):
        # Nothing to close when __init__ failed before a connection existed.
        if getattr(self, 'db', None) is None:
            return
        try:
            self.closedb()
        except Exception:
            # Closing can fail (e.g. the connection was closed explicitly
            # beforehand); a finalizer must not raise.
            return
        print("資料庫關閉成功,__del__物件已經銷燬")
3、測試執行
import os
import sys

import pandas as pd

# The project root must be on sys.path before the project imports below.
sys.path.append("../../..")
from PythonCode.Common.CommonFunction import readJsonFile, get_appoint_dataTime, getFormatData, PATHROOT
from PythonCode.Other.SQLOprater.SqlClass import Oracle

txtSQL = r'D:\TianFuAuto\PythonCode\Other\SQLOprater\config\匯出資料sql.txt'
cfgSQL = r'D:\TianFuAuto\PythonCode\Other\SQLOprater\config\sqlConfig.json'
first_day_of_next_month = get_appoint_dataTime('first_day_of_next_month', -1)
today = getFormatData(format=1, interval=0)
yesterday = getFormatData(format=3, interval=-1)


class SelectDatafromOracle():
    """Run the configured Oracle export queries and save the results as CSV files."""

    def __init__(self):
        """Read the JSON config, connect to Oracle and prepare the eight queries.

        The config file (``cfgSQL``) supplies the connection settings, the
        SQL template under ``sql`` and the substitution values under ``msg``.
        The output directory is created if it does not exist yet.
        """
        dic_config = readJsonFile(cfgSQL, encoding='utf-8')
        userName = dic_config.get('userName', '')
        password = dic_config.get('password', '')
        host = dic_config.get('host', '')
        port = dic_config.get('port', '')
        serviceName = dic_config.get('serviceName', '')
        sql = dic_config.get('sql', '')
        msg_dict = dic_config.get('msg')
        self.conn = Oracle(userName, password, host, port, serviceName)

        # Both ends of the queried range are yesterday.
        startTime = yesterday
        endTime = yesterday
        next_moth = first_day_of_next_month.strftime('%Y%m')

        def build(kind, company_msg, channel):
            # Fill the SQL template; only these three values vary per query.
            return sql.format(kind, next_moth, company_msg, startTime, endTime, msg_dict[channel])

        self.sql_WX_H = build('H', msg_dict['H'], 'WX')
        self.sql_WX_G = build('G', msg_dict['G'], 'WX')
        self.sql_WX_GD = build('E', msg_dict['E']['GD'], 'WX')
        self.sql_WX_ND = build('E', msg_dict['E']['ND'], 'WX')
        self.sql_ZFB_H = build('H', msg_dict['H'], 'ZFB')
        self.sql_ZFB_G = build('G', msg_dict['G'], 'ZFB')
        self.sql_ZFB_GD = build('E', msg_dict['E']['GD'], 'ZFB')
        self.sql_ZFB_ND = build('E', msg_dict['E']['ND'], 'ZFB')

        self.fileDir = r'%s\csvFiles\%s\YXXT' % (PATHROOT, today)
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(self.fileDir, exist_ok=True)

    def _select_and_save(self, queries, file_prefix):
        """Run each query in order, merge the rows and write one '|'-separated CSV.

        :param queries: iterable of ``(label, sql)`` pairs; ``label`` is used
            only in the progress message
        :param file_prefix: leading part of the output file name
        """
        title = None
        rows = []
        for label, query in queries:
            # The header of the last query is the one used for the CSV.
            title, result = self.conn.select_db_fetchall(query)
            print('%s資料查詢完成,共%s條資料' % (label, len(result)))
            # Replace None with '' and stringify everything else.
            rows.extend(
                ['' if msg is None else str(msg) for msg in record]
                for record in result
            )
        fileName = os.path.join(self.fileDir, '%s_%s.csv' % (file_prefix, today))
        df = pd.DataFrame(data=rows, columns=title)
        df.to_csv(fileName, sep='|', index=False)

    def selectAndSaveWX(self):
        """Export the four WX query results into ``WX_<today>.csv``."""
        self._select_and_save(
            [
                ('微信供熱公司', self.sql_WX_H),
                ('微信供氣公司', self.sql_WX_G),
                ('微信供電公司', self.sql_WX_GD),
                ('微信農電公司', self.sql_WX_ND),
            ],
            'WX',
        )

    def selectAndSaveZFB(self):
        """Export the four ZFB query results into ``ZFB_<today>.csv``."""
        self._select_and_save(
            [
                ('支付寶供熱公司', self.sql_ZFB_H),
                ('支付寶供氣公司', self.sql_ZFB_G),
                ('支付寶供電公司', self.sql_ZFB_GD),
                ('支付寶農電公司', self.sql_ZFB_ND),
            ],
            'ZFB',
        )


if __name__ == '__main__':
    sdfo = SelectDatafromOracle()
    sdfo.selectAndSaveWX()
    sdfo.selectAndSaveZFB()