Fetching data from Kafka with Python 3, parsing it as JSON and writing it to MySQL

Project requirement: take the database change records delivered through Kafka and write them to the database at both the order level and the order-item level. All of the information for a single order, including its various dimension attributes, is kept in one JSON document and written into MySQL 5.7.
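Before the configuration and the full script, here is a minimal sketch of the core write pattern the script relies on: a record already parsed out of a Kafka message is stored as one JSON document in a MySQL 5.7 JSON column. The tracking_order table and order_info column follow the names mentioned further down, but treat them, the connection details and the record values as illustrative assumptions only.

#minimal sketch of the write pattern (assumed table tracking.tracking_order with a JSON column order_info)
import json
import pymysql

record = {"order_no": "100", "cust_no": "C01"}  #one row parsed out of a Kafka message

db = pymysql.connect(host="xxxxxxxxxx", port=3306, user="track", password="tracking", db="tracking", charset="utf8")
cur = db.cursor()
#make sure the order row exists, then store the whole order as one JSON document
cur.execute("select count(*) from tracking.tracking_order where order_no=%s", (record["order_no"],))
if cur.fetchone()[0] == 0:
    cur.execute("insert into tracking.tracking_order(order_no) values (%s)", (record["order_no"],))
cur.execute("update tracking.tracking_order set order_info=cast(%s as json) where order_no=%s",
            (json.dumps({"order": record}), record["order_no"]))
db.commit()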

Configuration:

[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx
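
For context, each message on this topic is a JSON change record of the kind produced by binlog-capture tools such as canal; the consumer below only reads the database, table, type, es, data and old fields. A made-up illustration of the shape it expects:

#made-up example of the message layout the consumer expects (all values are illustrative)
sample_message = {
    "database": "mes",
    "table": "order_mails",
    "type": "UPDATE",                                 #INSERT, UPDATE or DELETE
    "es": 1545184295000,                              #change time in milliseconds
    "data": [{"order_no": "100", "cust_no": "C01"}],  #row values after the change
    "old": [{"cust_no": "C00"}],                      #for UPDATE: previous values of the changed columns
}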

The code is long, ugly and half-baked: it only got as far as procedural programming, nothing properly object-oriented. Bear with it, and feel free to contact me if you run into problems.

Code:

#encoding=utf-8
import datetime
import configparser
import re
import pymysql
import json
from confluent_kafka import Consumer,KafkaError
import logging
import os
import time
import signal
import sys

#logging setup
logging.basicConfig(filename=os.path.join(os.getcwd(),'log_tracking.txt'),level=logging.WARN,filemode='a',format='%(asctime)s - %(levelname)s: %(message)s')

def writeErrorLog(errSrc,errType,errMsg=None):
 #allow two-argument calls: writeErrorLog(source,message)
 if errMsg is None:
  errType,errMsg = 'Error',errType
 v_file = None
 try:
  v_log_file = 'err_tracking.log'
  v_file = open(v_log_file,'a')
  v_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType +" : " + errMsg + '\n')
  v_file.flush()
 except Exception as data:
  v_err_file = open('err_tracking.log','a')
  v_err_file.write(str(data) + '\n')
  v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType + " : " + errMsg + '\n')
  v_err_file.flush()
  v_err_file.close()
 finally:
  if v_file is not None:
   v_file.close()

class RH_Consumer:
#read the settings from config.ini and initialise the variables the class needs
 def __init__(self):
  self.config = configparser.ConfigParser()
  self.config.read('config.ini')
  self.host = self.config.get('Global','host')
  self.user = self.config.get('Global','user')
  self.passwd = self.config.get('Global','passwd')
  self.schema = self.config.get('Global','schema')
  self.port = int(self.config.get('Global','port'))
  self.kafka_server = self.config.get('Global','kafka_server')
  self.kafka_topic = self.config.get('Global','kafka_topic')
  self.consumer_group = self.config.get('Global','consumer_group')
  self.dd_host = self.config.get('Global','dd_host')
  self.dd_user = self.config.get('Global','dd_user')
  self.dd_passwd = self.config.get('Global','dd_passwd')
  self.dd_port = int(self.config.get('Global','dd_port'))
  self.dd_socket = self.config.get('Global','dd_socket')
  self.operation_time = datetime.datetime.now()
  self.stop_flag = 0
  self.src_table_name = []
  self.__init_db()
  self.__init_mes_db()
  self._get_all_src_table()
#connect to the target database we write into
 def __init_db(self):
  try:
   self.conn_info = {'host': self.host,'port': self.port,'user': self.user,'password': self.passwd,'db': 'tracking'}
   self.mysql_db = pymysql.connect(**self.conn_info,charset="utf8" )
   self.mysql_cur = self.mysql_db.cursor()
  except Exception as data:
   writeErrorLog('__init_db','Error',str(data))
#connect to the production database, used to look up related dimension data
 def __init_mes_db(self):
  try:
   self.mes_mysql_db = pymysql.connect(host=self.dd_host,user=self.dd_user,passwd=self.dd_passwd,port=self.dd_port,unix_socket=self.dd_socket,charset="utf8")
   self.mes_mysql_cur = self.mes_mysql_db.cursor()
  except Exception as data:
    writeErrorLog('__init_mes_db','Error',str(data))

#close the database connections
 def _release_db(self):
   self.mysql_cur.close()
   self.mysql_db.close()
   self.mes_mysql_cur.close()
   self.mes_mysql_db.close()
#fetch all configured source tables (the tables we need to capture)
 def _get_all_src_table(self):
  try:
   # fetch the list of source tables from the mapping-rule table
   select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule"
   self.mysql_cur.execute(select_src_table_names)
   rows = self.mysql_cur.fetchall()
   for item in rows:
    self.src_table_name.append(item[0])
   return self.src_table_name
  except Exception as data:
   writeErrorLog('_get_all_src_table',str(data))
   logging.error('_get_all_src_table: ' + str(data))
#get the target tables configured for a source table
 def _get_tgt_table_name(self,table_name,table_schema):
  try:
   # look up the target tables configured for this source table and schema
   select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" %(table_name,table_schema)
   self.mysql_cur.execute(select_tgt_table_names)
   rows = self.mysql_cur.fetchall()
   tgt_table_names=[]
   for item in rows:
    tgt_table_names.append(item[0])
   return tgt_table_names
  except Exception as data:
   writeErrorLog('_get_tgt_table_name',str(data))
   logging.error('_get_tgt_table_name: ' + str(data))
# read the mapping configuration for the given table and return it as a dict
 def _get_config(self,table_name,tgt_table_name,table_schema):
  try:
   # read this table's mapping configuration, keyed by source table, target table and source schema
   select_table_config = "select coalesce( src_system,'' ) as src_system,coalesce ( src_table_schema,'' ) as src_table_schema,coalesce ( src_table_name,'' ) as src_table_name,coalesce ( tgt_operation,'{}' ) as tgt_operation,active_flag,coalesce ( tgt_system,'' ) as tgt_system,coalesce ( tgt_table_schema,'' ) as tgt_table_schema,coalesce ( tgt_table_name,'' ) as tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and tgt_table_name='%s' and src_table_schema = '%s' " %(table_name,tgt_table_name,table_schema)
   self.mysql_cur.execute(select_table_config)
   rows = self.mysql_cur.fetchall()
   for item in rows:
    self.src_system = item[0]
    self.src_table_schema = item[1]
    self.src_table_name = item[2]
    self.tgt_operation = item[3]
    self.active_flag = item[4]
    self.tgt_system = item[5]
    self.tgt_table_schema = item[6]
    self.tgt_table_name = item[7]
   # parse the settings we will need later out of self.tgt_operation
   self.tgt_operation = eval(self.tgt_operation)
   result_data = {'src_system':self.src_system,'src_table_schema':self.src_table_schema,'src_table_name':self.src_table_name,'tgt_operation':self.tgt_operation,'active_flag':self.active_flag,'tgt_system': self.tgt_system,'tgt_table_schema': self.tgt_table_schema,'tgt_table_name': self.tgt_table_name,# fields parsed out of self.tgt_operation
       'source_primary_key': self.tgt_operation['source_primary_key'],'source_all_column': self.tgt_operation['source_all_column'],'target_primary_key': self.tgt_operation['target_primary_key'],'target_column': self.tgt_operation['target_column'],'source_level': self.tgt_operation['source_level'] }
   return result_data
  except Exception as data:
   writeErrorLog('_get_config',str(data)+':table is not available')
   logging.error('_get_config: ' + str(data))


#main entry point
 def _do(self):
  try:
   #consumer configuration; many other settings could be added here
   c = Consumer({
    'bootstrap.servers': self.kafka_server,'group.id': self.consumer_group,'default.topic.config': {
     'auto.offset.reset': 'smallest','enable.auto.commit': False}
   })
   #subscribe to the Kafka topic
   c.subscribe([self.kafka_topic])
   while True:
    msg = c.poll(1.0)
    if msg is None:
     continue
    if msg.error():
     if msg.error().code() == KafkaError._PARTITION_EOF:
      continue
     else:
      print(msg.error())
      break
    text = msg.value().decode(encoding="utf-8")
   
   # kfk_text = eval(text)
    kfk_text = json.loads(text)
    #check whether this record's table is in the configuration; if it is, keep processing it, otherwise ignore it
     #the try/except is here so that if writing this record fails we do not commit, and the message can be consumed again next time
    try:
     kfk_table = kfk_text['table']
     if kfk_table in ['order_mails'] :
      print(type(text),text)
      logging.warning('-------------- start exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()))+'---------------------')
      kfk_text = str(kfk_text)
      kfk_text = kfk_text.replace(": None",": ''")
      kfk_text = eval(kfk_text)
      kfk_datas = kfk_text['data']
      kfk_type = kfk_text['type']
      kfk_old = kfk_text['old']
      logging.warning(' table_name: '+ str(kfk_table)+ ' table_type : ' + kfk_type)
      if kfk_type == 'UPDATE':
       continue
       print('update')
       for i,data in enumerate(kfk_datas):
        kfk_text['data'] = eval("["+str(data)+"]")
        kfk_text['old'] = eval("[" + str(kfk_old[i]) + "]")
        self._get_rh_from_kafka(kfk_text)
      else:
       print('insert')
       for data in kfk_datas:
        kfk_text['data'] = eval("["+str(data)+"]")
        print(type(kfk_text),kfk_text)
        self._get_rh_from_kafka(kfk_text)
      logging.warning('----------------end exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()))+'---------------')
     c.commit()
    except Exception as data:
     writeErrorLog('_do','exce data Error',str(data))
     logging.error('_do: ' + str(data))
     #stop the program if a shutdown was requested
    if self.stop_flag == 1:
     self._exit_consumer()
   c.close()
  except Exception as data:
   print(data)
   writeErrorLog('_do',str(data))
   logging.error('_do: ' + str(data))
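#quote each segment of a dotted json path (for example $.a.b becomes $.\"a\".\"b\") so it can be embedded in the MySQL json functions used below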
 def _trans_path(self,tgt_path):
  new_tgt_path=tgt_path.replace('.','\".\"').replace('$\".','$.')+'\"'
  return new_tgt_path


#parse one record from Kafka and write it out according to its mapping configuration
 def _get_rh_from_kafka(self,kfk_text):
  try:
   # unpack the fields of the Kafka change record
   self.kfk_tb_schema = kfk_text["database"]#schema
   self.kfk_tb_name = kfk_text["table"]#table name
   self.kfk_data = kfk_text['data'][0]#data payload
   self.kfk_type = kfk_text['type']#operation type
   self.kfk_es = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float(kfk_text['es'] / 1000)))#time the data changed

   # fetch the mapping configuration for the source table named in the Kafka record; it may be empty, so check before using it
   tgt_table_names=self._get_tgt_table_name(self.kfk_tb_name,self.kfk_tb_schema)
   if len(tgt_table_names) != 0:
    for tgt_table_name_for_config in tgt_table_names:
     tb_config = self._get_config(self.kfk_tb_name,tgt_table_name_for_config,self.kfk_tb_schema)
     tgt_pk_key = tb_config['target_primary_key']#目標表的主鍵(order_no/order_item_id)
     tgt_schema = tb_config['tgt_table_schema']#目標表的schema
     tgt_table_name = tb_config['tgt_table_name']#目標表的名稱(目前只有兩個目標表tracking_order,tracking_order_item)
     src_table_name = tb_config['src_table_name']#源表的名稱(schema|table_name)
     src_table_schema = tb_config['src_table_schema']
     tgt_columns = tb_config['target_column']#獲取插入到目標表中欄位的配置資訊(例如該表在order_info的插入路徑等配置資訊)
     src_level = tb_config['source_level']#源表的level,目前有三種root,leaf,father
     src_pk_key = tb_config['source_primary_key']#源表的主鍵
     src_pk_value = self.kfk_data[src_pk_key]#源表的主鍵值(從kfk中獲取到)
     tgt_operation=tb_config['tgt_operation']#源表的其他配置,在下面處理時候再進行解析

  #processing logic: tables are split into three kinds, root, leaf and father, each handled separately for its insert, update and delete operations
     if self.kfk_type == 'INSERT': # 判斷kfk的操作型別是INSERT,UPDATE,DELETE
      if src_level == 'root': # 判斷該資料是否是root表
       tgt_pk_value = self.kfk_data[tgt_pk_key]#如果是root表,則獲取目標表表的主鍵的值(和src_pk_value的值相同)
       for item in tgt_columns: # item取值範圍:order_info、order_progress等迴圈插入列,按照配置分別寫入,因為可能一張表在不同列中插入位置不同
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" #拼成如下形式,目的為了_get_data_from_kfk傳入引數,例如{"order_info": ["order_no","cust_no"]}
        tgt_column = eval(tgt_column) # 將字串轉換成dict型別
        if str(tgt_columns[item]['target_path'])=='{}':
         logging.warning(str(item)+" is null,please check")
        else:
         tgt_path = list(tgt_columns[item]['target_path'].values())[0]#表在配置中,寫入目標表的路徑
         (table_insert_data,table_insert_data_for_leaf,insert_data,catalog_type) = self._get_data_from_kfk(kfk_text,tgt_column,src_table_name,tgt_pk_value) #呼叫方法,返回三種格式的json,為了不同的寫入方式傳參
         #call the methods that write the Kafka data into the database
         self._insert_data_from_kfk_for_root(tgt_schema,tgt_pk_key,tgt_pk_value,item,table_insert_data,tgt_path)#將kfk中主資料寫入資料庫
         self._insert_father_data(src_table_schema,tgt_path,catalog_type,tgt_table_name_for_config)#將主資料涉及到父表寫入
  #leaf insert approach: use the config to find the key of the parent table, look that key up in the database to find which record (order_no/order_item_id) the leaf belongs to, work out the absolute write path (appending the table name or the key value), write along that path, then fill in the father tables
      elif src_level == 'leaf': # 判斷kfk的操作型別是INSERT,UPDATE,DELETE
       parent_pk_info=tgt_operation['parent_pk_key']
       for item in tgt_columns: # item取值範圍:order_info、order_progress、order_operation、order_adjudgement
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # #拼成如下形式,目的為了_get_data_from_kfk傳入引數,例如{"order_info": ["order_no","cust_no"]}
        tgt_column = eval(tgt_column) # 將字串轉換成dict型別
        if str(tgt_columns[item]['target_path'])=='{}':#因為子節點可能不會每一列都會配置寫入資訊(這個是不是不判斷也可以,只要不配置即可,如果判斷,root中也需要判斷嗎?)
         logging.warning(str(item) + " is null,please check")
        else:
         tgt_path = list(tgt_columns[item]['target_path'].keys())[0]#獲取寫入的路徑
         (table_insert_data,src_pk_value) # #呼叫方法,返回三種格式的json,為了不同的寫入方式傳參
         (parent_tgt_path,tgt_pk_value_new) = self._get_tgt_info_for_leaf(item,tgt_schema,parent_pk_info,self.kfk_data)#獲取子節點表的需要寫入的目標表的主鍵的值和上一層的寫入真實絕對路徑
         tgt_path_true=parent_tgt_path+"."+src_table_name#獲取子表寫入的絕對路徑(一直到子表的表名的路徑)
         self._insert_data_from_kfk_for_leaf(tgt_schema,tgt_pk_value_new,tgt_path_true,src_pk_value,insert_data) # 將從kafka獲取的資料入庫
         tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'#獲取子表寫入的絕對路徑(一直到子表的主鍵值的路徑)
         self._insert_father_data(src_table_name,tgt_path_new,tgt_table_name_for_config)#遞迴,寫入子表的父表資訊
      elif src_level == 'father':#針對父表資料在主表和子表資料之後產生的情況
       for item in tgt_columns: # item取值範圍:order_info、order_progress、order_operation、order_adjudgement
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no","cust_no"]}
        tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的資訊
        if str(tgt_columns[item]['target_path']) == '{}':
         logging.warning(str(item) + " is null,please check")
        else:
         tgt_paths = list(tgt_columns[item]['target_path'].values())
         (table_insert_data,src_pk_value) # 從kafka獲取的需要插入的json串
         if 'product' in src_table_name.lower():
          catalog_type='PRODUCT'
         elif 'service' in src_table_name.lower():
          catalog_type='SERVICE'
         else:
          catalog_type='0'
         for tgt_path in tgt_paths:
          tgt_info_for_father = self._get_tgt_info_for_father(tgt_path,src_pk_key,catalog_type)
          if len(tgt_info_for_father)==0:
           logging.warning('can not available the data of the root and leaf table ')
          else:
           for i in range(len(tgt_info_for_father)):
            tgt_pk_value_new = tgt_info_for_father[i][0]
            tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:]
            self._insert_data_from_db(tgt_schema,insert_data)
            self._insert_father_data(src_table_name,tgt_table_name_for_config)

     elif self.kfk_type == 'UPDATE':#update處理方式
  #root update approach: find the updated record, update the changed columns along the configured path (the root path has no extra levels), then fill in the father tables and write a history record
      if src_level == 'root': # 判斷是否是root表
       tgt_pk_value = self.kfk_data[tgt_pk_key]##如果是root表,則獲取目標表表的主鍵的值(和src_pk_value的值相同)
       for item in tgt_columns: # item取值範圍:order_info、order_progress、order_operation、order_adjudgement
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no","cust_no"]}
        tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的資訊
        if str(tgt_columns[item]['target_path'])=='{}':
         logging.warning(str(item) + " is null,please check")
        else:
         update_columns = kfk_text['old'][0]#獲取kfk中變更資訊
         tgt_path = list(tgt_columns[item]['target_path'].values())[0]
         (table_insert_data,tgt_pk_value)
         self._update_data(tgt_schema,update_columns,src_table_schema)#更新資料
         #write the change history
         if 'alter_column' in list(tgt_columns[item].keys()):
          record_history_column = tgt_columns[item]['alter_column']
          self._insert_history_data(update_columns,record_history_column,self.kfk_es,tgt_pk_value)
         else:
          logging.warning(str(item) + " alter_column is not available")
  #leaf update approach: use the config to find the key of the parent table, look it up in the database to find which record (order_no/order_item_id) the leaf belongs to, work out the absolute write path (appending the table name or the key value), update the matching columns along that path, then fill in the father tables
      elif src_level == 'leaf': ## 判斷是否是root表
       parent_pk_info=tgt_operation['parent_pk_key']
       for item in tgt_columns: # item取值範圍:order_info、order_progress、order_operation、order_adjudgement
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no",please check")
        else:
         update_columns = kfk_text['old'][0] # 獲取到變更資訊
         tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
         (table_insert_data,src_pk_value) # 從kafka獲取的需要插入的json串
         (parent_tgt_path,self.kfk_data) # 獲取子表上一層主鍵路徑
         tgt_path_true=parent_tgt_path+"."+src_table_name##獲取子表寫入的絕對路徑(一直到子表的表名的路徑)
         tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'#獲取子表寫入的絕對路徑(一直到子表的主鍵值)
         self._update_data(tgt_schema,src_table_schema)
         if 'alter_column' in list(tgt_columns[item].keys()):
          record_history_column = tgt_columns[item]['alter_column']
          self._insert_history_data(update_columns,tgt_pk_value_new)
         else:
          logging.warning(str(item) + " alter_column is not available")
  #father update approach: read all target paths from the config, loop over each path, use fuzzy matching to find every target primary-key value and its exact path, update them one by one, and fill in the affected next level
      elif src_level == 'father': # 判斷該資料是否是kfk入庫資訊如果不是就pass
       for item in tgt_columns: # item取值範圍:order_info、order_progress、order_operation、order_adjudgement
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no",please check")
        else:
         update_columns = kfk_text['old'][0] # 獲取到變更資訊
         tgt_paths = list(tgt_columns[item]['target_path'].values())
         (table_insert_data,catalog_type)
          for i in range(len(tgt_info_for_father)):
           tgt_pk_value_new = tgt_info_for_father[i][0]
           tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:]
           self._update_data(tgt_schema,src_table_schema)
     #delete approach: for a root table delete the whole record; for a leaf delete the target along its path, and if the node is then empty also remove the field holding the table name
     elif self.kfk_type == 'DELETE':
      if src_level == 'root':
       tgt_pk_value = self.kfk_data[tgt_pk_key]
       self._delete_data_for_root(tgt_pk_key,tgt_table_name)
      elif src_level == 'leaf': #
       parent_pk_info = tgt_operation['parent_pk_key']
       for item in tgt_columns: # item取值範圍:order_info、order_progress、order_operation、order_adjudgement
        tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no",please check")
        else:
         tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
         (parent_tgt_path,self.kfk_data)#獲取子表上一層主鍵路徑
         tgt_path_true=parent_tgt_path+"."+src_table_name#獲取子表上一層表的路徑
         tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'
         self._delete_data_for_leaf(tgt_schema,tgt_path_true)
  except Exception as data:
   writeErrorLog('_get_rh_from_kafka',str(data))
   logging.error('_get_rh_from_kafka: ' + str(data))

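#find the target-table rows that already reference this father-table record, returning (primary key, json path) pairs located with json_search/json_extract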
 def _get_tgt_info_for_father(self,catalog_type):
  try:
   tgt_path_true = tgt_path + "." + src_pk_key
   if catalog_type=='0':
    select_sql_for_father="select "+tgt_pk_key+",json_search("+tgt_column+",\'all\',\'"+src_pk_value+"\',null,\'"+tgt_path_true+"\') from "+tgt_schema+"."+tgt_table_name+" where json_extract(json_extract("+tgt_column+",\'"+tgt_path_true+"\'),\'$[0]\')=\'"+src_pk_value+"\';"
   else:
    select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",\'" + src_pk_value + "\',\'" + tgt_path_true + "\') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",\'" + tgt_path_true + "\'),\'$[0]\')=\'" + src_pk_value + "\' and json_extract(" + tgt_column + ",\'$." +tgt_table_name+".type=\'"+catalog_type+"\';"
   self.mysql_cur.execute(select_sql_for_father)
   tgt_info_for_father=self.mysql_cur.fetchall()
   return tgt_info_for_father
  except Exception as data:
   writeErrorLog('_get_tgt_info_for_father',str(data))
   logging.error('_get_tgt_info_for_father: ' + str(data))


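#a root-level delete removes the whole target row by its primary key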
 def _delete_data_for_root(self,tgt_table_name):
  try:
   delete_sql="delete from "+tgt_schema+"."+tgt_table_name+" where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';"
   self.mysql_cur.execute(delete_sql)
   self.mysql_db.commit()
  except Exception as data:
   writeErrorLog('_delete_data_for_root',str(data))
   logging.error('_delete_data_for_root: ' + str(data))

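#a leaf-level delete removes the entry with json_remove; if the parent node is then empty ({}), the table node itself is removed as well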
 def _delete_data_for_leaf(self,tgt_path_true):
  try:
   delete_sql="update "+tgt_schema+"."+tgt_table_name+" set "+tgt_column+"=json_remove("+tgt_column+",\'"+tgt_path+"\') where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';"
   self.mysql_cur.execute(delete_sql)
   self.mysql_db.commit()
   select_sql="select json_extract("+tgt_column+",\'"+tgt_path_true+"\') from "+tgt_schema+"."+tgt_table_name+" where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';"
   self.mysql_cur.execute(select_sql)
   tgt_column_value=self.mysql_cur.fetchall()[0][0]
   if tgt_column_value==r'{}':
    table_delete_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",\'" + tgt_path_true + "\') where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';"
    self.mysql_cur.execute(table_delete_sql)
    self.mysql_db.commit()
  except Exception as data:
   writeErrorLog('_delete_data_for_leaf',str(data))
   logging.error('_delete_data_for_leaf: ' + str(data))

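#record the history of changed columns under <path>.alter_data (old value, new value, change time), using json_insert the first time and json_array_append for later changes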
 def _insert_history_data(self,data_time,tgt_pk_value):
  try:
   update_columns_key=list(update_columns.keys())
   for item in record_history_column:
    if item in update_columns_key:
     tgt_path_for_column = tgt_path + '.alter_data.' + item
     tgt_path_for_alter = tgt_path + '.alter_data'
     select_sql_for_alter_column_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_column + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';'
     select_sql_for_alter_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_alter + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';'
     self.mysql_cur.execute(select_sql_for_alter_column_path)
     tgt_path_vlaue_for_column = self.mysql_cur.fetchall()
     self.mysql_cur.execute(select_sql_for_alter_path)
     tgt_path_vlaue_for_alter = self.mysql_cur.fetchall()
     old_data = update_columns[item]
     new_data = eval(insert_data)[item]
     if tgt_path_vlaue_for_alter[0][0]==None:
      history_data = '{\"' + item + '\":[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]}'
      insert_sql = "update "+tgt_schema + "." + tgt_table_name + " set " + tgt_column +"=json_insert("+tgt_column+",\'"+tgt_path_for_alter+"\',cast(\'"+history_data+"\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
     else:
      if tgt_path_vlaue_for_column[0][0]==None:
       history_data='[{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}]'
       insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
      else:
       history_data='{\"old_data\":\"'+str(old_data)+'\",\"time\":\"'+data_time+'\"}'
       insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_array_append(" + tgt_column + ",cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
    self.mysql_cur.execute(insert_sql)
    self.mysql_db.commit()
  except Exception as data:
   writeErrorLog('_insert_history_data',str(data))
   logging.error('_insert_history_data: ' + str(data))

#transform the Kafka data into the different json layouts needed by the various write paths
 def _get_data_from_kfk(self,text,tgt_column,src_table_name,src_table_pk_value):
  try:
   tgt_column_json = tgt_column #the target-column configuration passed in
   tgt_column_key = ''
   for key in tgt_column_json:#loop over the keys of tgt_column
    json_column_key = '{'
    for item in tgt_column_json[key]:
     json_column_key += '"' + item + '":"' + text['data'][0][item].replace('"',r'\\"') + '",'
     tgt_column_item = json_column_key[:-1]
    tgt_column_key += tgt_column_item + '},'
    if 'type' in text['data'][0]:
     catalog_type=text['data'][0]['type']
    else:
     catalog_type='0'
   table_insert_data = '{\"' + src_table_name + '\":' + tgt_column_key[:-1] + '}'#拼接成如下帶有表名和主鍵值格式{"order":{"order_no":"100"}}
   insert_data = tgt_column_key[:-1]#拼接成如下不帶表名和不帶主鍵值的格式{"order_no":"100"}
   table_insert_data_for_leaf = '{\"' + src_table_pk_value + '\":'+insert_data+'}'#拼接成如下帶有主鍵值格式的{"100":{"order_no":"100"}}
   print(insert_data)
   return (table_insert_data,catalog_type)#返回資料
  except Exception as data:
   writeErrorLog('_get_data_from_kfk',str(data))
   logging.error('_get_data_from_kfk: ' + str(data))


 def _insert_data_from_kfk_for_root(self,tgt_schema,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data,tgt_path):
  try:
   #check whether the primary key already exists; if it does, insert the rest of the data, otherwise insert the key first
   select_tb_count = 'select count(*) from ' + tgt_schema +"."+tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';'
   #check whether the column already holds data
   select_tb_column_count ='select case when coalesce(' + tgt_column + ',\'\') = \'\' then 1 else 0 end from ' + tgt_schema +"."+tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';'
   self.mysql_cur.execute(select_tb_count)
   tb_count = self.mysql_cur.fetchall()
   self.mysql_cur.execute(select_tb_column_count)
   tb_column_count = self.mysql_cur.fetchall()
   #if the row does not exist yet, insert the primary key (order_no/order_item_id) first and then write the data into the column
   if tb_count[0][0] == 0:
    insert_pk_sql = "insert into " + tgt_schema+"."+tgt_table_name + "(" + tgt_table_pk + ") values ('" + tgt_table_value + "')"
    self.mysql_cur.execute(insert_pk_sql)
    self.mysql_db.commit()
    update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data +"' as json) where " + tgt_table_pk + "= '"+ tgt_table_value + "';"
   else:
    #if the key exists but the column is empty, write the json (with the table_name wrapper) directly
    if tb_column_count[0][0]==1:#當目標欄位為空
     update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';"
    else:
     #if the key exists and the column is not empty, use json_insert to add the json with the table_name wrapper
     update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path + "\',cast(\'" + table_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + tgt_table_value + "\';"
   self.mysql_cur.execute(update_sql)
   self.mysql_db.commit()
  except Exception as data:
   writeErrorLog('_insert_data_from_kfk_for_root',str(data))
   logging.error('_insert_data_from_kfk_for_root: ' + str(data))

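#look up the target-table primary key of a leaf record by matching the parent key value at the configured json path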
 def _get_tgt_pk_value_for_leaf(self,parent_pk_value):
  try:
   select_tgt_pk_sql = "select " + tgt_table_pk + " from " + tgt_schema + "." + tgt_table_name + " where json_extract(" + tgt_column + ",\'" + tgt_path + "\')=\'" + parent_pk_value + "\';"
   self.mysql_cur.execute(select_tgt_pk_sql)
   tgt_pk_value = self.mysql_cur.fetchall()[0][0]
   return tgt_pk_value
  except Exception as data:
   writeErrorLog('_get_tgt_pk_value_for_leaf',str(data))
   logging.error('_get_tgt_pk_value_for_leaf: ' + str(data))

#get the target-table primary-key value for a leaf record together with the real absolute path of the level above it
 def _get_tgt_info_for_leaf(self,kafka_data):
  try:
   if_tgt_path='.'.join(tgt_path.split('.')[:-1])
   i=0
   json_search_sql=''
   where_sql=''
   if if_tgt_path=='$':
    for parent_pk_key in list(parent_pk_info.keys()):
     parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]]
     json_search_sql += ",'one','" + str(parent_pk_value) + "','" + tgt_path + "." + parent_pk_key + "') as tgt_path" + str(i)
     where_sql += " tgt_path" + str(i) + " is not null and"
     i = i + 1
   else:
    for parent_pk_key in list(parent_pk_info.keys()):
     parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]]
     json_search_sql += ",'" + tgt_path + ".*." + parent_pk_key + "') as tgt_path" + str(i)
     where_sql += " tgt_path" + str(i) + " is not null and"
     i = i + 1
   select_sql = "select "+tgt_pk_key+",tgt_path0 from (select "+tgt_pk_key+json_search_sql+" from " + tgt_schema + "." + tgt_table_name +") t where "+where_sql[:-4]+";"
   self.mysql_cur.execute(select_sql)
   rows=self.mysql_cur.fetchall()[0]
   tgt_path_new = ('.'.join(rows[1].split('.')[:-1]))[1:]
   tgt_pk_value_new=rows[0]
   return (tgt_path_new,tgt_pk_value_new)
  except Exception as data:
   writeErrorLog('_get_tgt_info_for_leaf',str(data))
   logging.error('_get_tgt_info_for_leaf: ' + str(data))

 def _insert_data_from_kfk_for_leaf(self,tgt_schema,tgt_table_value,tgt_path,src_pk_value,insert_data):
  try:
   select_tb_column_key = 'select case when coalesce(json_extract(' + tgt_column + ',\'' + tgt_path + '\'),\'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + str(tgt_table_value) + '\';'
   self.mysql_cur.execute(select_tb_column_key)
   column_key_data = self.mysql_cur.fetchall()
   if column_key_data[0][0] == 1:# the primary key exists and the target column has data, but this path is not present yet
    tgt_path_new = tgt_path
    tgt_insert_data=table_insert_data_for_leaf
   else:
    tgt_path_new=tgt_path+r'.\"'+str(src_pk_value)+r'\"'
    tgt_insert_data=insert_data
   update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_new + "\',cast(\'" + tgt_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + str(tgt_table_value) + "\';"
   self.mysql_cur.execute(update_sql)
   self.mysql_db.commit()
  except Exception as data:
   writeErrorLog('_insert_data_from_kfk_for_leaf',str(data))
   logging.error('_insert_data_from_kfk_for_leaf: ' + str(data))

#write the father-table data (fetched from the production database and written along the configured path)
 def _insert_father_data(self,src_table_schema,scr_table_name,src_path,root_pk_value,tgt_table_name_for_config):
  try:
   src_config_data=self._get_config(scr_table_name,src_table_schema)#獲取初始表的配置資訊(此處獲取是為了遞迴時候傳入下一層的表名,獲取對應的配置資訊)
   src_foreign_info=src_config_data['target_column'][tgt_column]['source_foreign_info']#從資料庫配置表中獲取source_foreign_info的資訊,也就是外來鍵的資訊,包括外來鍵,外來鍵的表,以及外來鍵表中的主鍵名稱
   if len(json.dumps(src_foreign_info))==2:#當沒有外來鍵的時候,配置表只存在‘{}'長度為2,就不需要向下遞迴執行,對應的source_foreign_info=[],長度為2
    logging.warning(scr_table_name+" :Recursive over")
   else:
    for src_pk_key in src_foreign_info:#獲取當前表與下層父表的關聯鍵(例如customer表的配置獲取到org_id,"source_foreign_info": {"org_id": {"customer.organization": "org_id"}})
     foreign_table_name_tmp=list(src_foreign_info[src_pk_key].keys())[0] #獲取外來鍵對應的表名foreign_table_name(organization),(每次傳入的key對應一個外來鍵表,只存在一個列,order_info,所以取第一個元素即可)
     foreign_table_schema=foreign_table_name_tmp.split('.')[0]
     foreign_table_name_tmp=foreign_table_name_tmp.split('.')[1]
     if '#' in foreign_table_name_tmp:
      foreign_table_name = foreign_table_name_tmp.replace('#',catalog_type).lower()
     else:
      foreign_table_name = foreign_table_name_tmp
     foreign_table_pk_key=list(src_foreign_info[src_pk_key].values())[0]#獲取外來鍵對應的表的關聯鍵foreign_table_key,即org_id
     foreign_datas = self._get_config(foreign_table_name,foreign_table_schema)#獲取外來鍵表的配置資訊,以便下面獲取配置表的資訊
     foreign_column = foreign_datas['target_column'][tgt_column]#獲取要插入的目標表列是order_info/order_progress)(organization寫入目標表的列的配置資訊)
     foreign_schema = foreign_datas['src_table_schema']#獲取表的schema(organization的原始src schema)
     foreign_table_pk_value = eval(str(insert_data))[src_pk_key] # 獲取外來鍵對應的value(即organization在kfk資料中對應的值)
     #configuration of the foreign-key table (needed for the database write)
     tgt_schema=foreign_datas['tgt_table_schema']
     tgt_table_name=foreign_datas['tgt_table_name']
     tgt_pk_key=foreign_datas['target_primary_key']
     tgt_pk_value=root_pk_value#目標表主鍵的值
     #fetch the data and write it to the database (some of these variables exist only to serve the insert)
     for foreign_path in foreign_column['target_path']:
      src_tgt_path=list(foreign_path.keys())[0]
      foreign_tgt_path = list(foreign_path.values())[0]
      if re.sub('.\"\S*?\"',r'*',src_path) ==src_tgt_path and re.sub('.\"\S*?\"',r'*',src_path)+'.'+src_pk_key==foreign_tgt_path:
       next_src_path=src_path+'.'+src_pk_key
       next_insert_data=self._get_data_from_db(foreign_column,foreign_table_name,foreign_schema,foreign_table_pk_key,foreign_table_pk_value,next_src_path)
       self._insert_father_data(foreign_table_schema,next_insert_data,next_src_path,tgt_table_name_for_config)
      else:
       logging.warning(foreign_table_name + ' :have no next level')
  except Exception as data:
   writeErrorLog('_insert_father_data',str(data))
   logging.error('_insert_father_data: ' + str(data))


#fetch the father-table data from the production database, write it straight into the target table, and return the data needed by the recursion
 def _get_data_from_db(self,src_tgt_column,src_table_name,src_table_schema,src_table_pk_key,src_table_pk_value,tgt_path):
  try:
   result_data = '{'
   src_column=src_tgt_column['source_column']#the columns that need to be read
   if len(src_column)==0:
    logging.error(str(src_column)+ ' length equal 0 error ')
   else:
    for item in src_column:#build the sql and fetch the data
     select_sql1 = 'concat(\''
     select_sql1 += u'"' + item + '":"\',coalesce(' + item + ',\'\'),\'",'
     select_sql1 = select_sql1[:-1] + '\')'
     select_sql = "select " + select_sql1 + " from " + src_table_schema + "." + src_table_name + " where " + src_table_pk_key + "=\'" + src_table_pk_value + "\';"
     #run the SQL statement
     self.mes_mysql_cur.execute(select_sql)
     #fetch the result rows
     data = self.mes_mysql_cur.fetchall()
     if len(data) == 0:
      result_data += ''
     else:
      result_data+=data[0][0]+','
    if result_data != '{':
     tgt_value=result_data[:-1] + '}'
    else:
     tgt_value = result_data+'\"'+src_table_pk_key+'\":\"'+src_table_pk_value+'\"}'
    self._insert_data_from_db(tgt_schema,tgt_value) # write the father-table data just fetched into the database
   return tgt_value#return the written data for the recursion (the config uses * for deeper levels of the path rather than the real absolute path)
  except Exception as data:
   writeErrorLog('_get_data_from_db',str(data))
   logging.error('_get_data_from_db: ' + str(data))
#write the father-table data into the target database
 def _insert_data_from_db(self,tgt_schema,tgt_value):
  try:
   insert_sql="update "+ tgt_schema+"."+tgt_table_name +" set "+ tgt_column+"=json_replace("+tgt_column+",\'"+tgt_path+"\',cast(\'"+tgt_value+"\' as json)) where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';"
   # self.mysql_cur.execute(insert_sql.encode("utf-8").decode("latin1"))
   self.mysql_cur.execute(insert_sql)
   self.mysql_db.commit()
  except Exception as data:
   writeErrorLog('_insert_data_from_db',str(data))
   logging.error('_insert_data_from_db: ' + str(data))

  # when a changed column is a foreign key, fill in the information that key refers to
 def _update_data(self,src_table_schema):
  try:
   # check whether foreign keys are involved: if a changed column appears in the foreign-key info, build a new foreign-key json from it and call _get_data_from_db to refresh the data
   insert_data = json.loads(insert_data)
   for update_column in update_columns:#
    if update_column in list(insert_data.keys()):
     update_column_data = '\"' + insert_data[update_column] + '\"'
     tgt_path = src_path + '.' + update_column
     self._insert_data_from_db(tgt_schema,update_column_data)
   self._insert_father_data(src_table_schema,tgt_table_name_for_config)
  except Exception as data:
   writeErrorLog('_update_data',str(data))
   logging.error('_update_data: ' + str(data))

#stop consuming and exit
 def _exit_consumer(self):
  self._release_db()
  sys.exit()

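#SIGTERM handler: set stop_flag so the consumer exits cleanly after the message it is currently processing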
def exit_program(signum,frame):
 logging.info("Received Signal: %s at frame: %s" % (signum,frame))
 p.stop_flag = 1

def main():
 #instantiate the consumer and make it visible to the signal handler
 global p
 p = RH_Consumer()
 signal.signal(signal.SIGTERM,exit_program)
 p._do()

if __name__ == '__main__':
 main()
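
A short note on running it: the script expects config.ini in the working directory, writes its logs to log_tracking.txt and err_tracking.log, and keeps consuming until it receives SIGTERM, which sets stop_flag so the consumer releases its database connections and exits after the message it is currently handling.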

That is everything in this post on fetching data from Kafka with Python 3, parsing it into JSON and writing it to MySQL. I hope it gives you a useful reference, and thank you for your support.