
Solving the exception-capturing problem in Python's ThreadPoolExecutor thread pool

The problem

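I recently wrote a Python script that uses both a thread pool and plain threads, and noticed something interesting while running it: a worker thread in the pool ran into a problem and raised an exception, but the main thread never caught it. Until I found the bug, I had assumed the thread-pool code was returning normally.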

The key point first

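The main point of this post: when a worker in Python's concurrent.futures.ThreadPoolExecutor raises an exception, the exception is not propagated up to the caller. Instead it is stored in the Future object returned by submit(), and the main thread has to retrieve it explicitly, either by calling concurrent.futures.Future.exception(timeout=None), which returns the exception, or Future.result(), which re-raises it.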
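A minimal sketch of this behavior, assuming Python 3 (the names failing_task and demo_swallowed_exception are hypothetical, not from the original script): the submitted task raises, but nothing is raised in the submitting thread until the Future is inspected.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    # Hypothetical worker that fails; the pool catches this and stores it in the Future
    raise ValueError("worker failed")


def demo_swallowed_exception():
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(failing_task)
    # Leaving the with-block waited for the task, yet nothing was raised here.
    exc = future.exception()  # returns the stored exception object; does not raise
    reraised = None
    try:
        future.result()  # re-raises the stored exception in this thread
    except ValueError as err:
        reraised = err
    return exc, reraised


if __name__ == "__main__":
    exc, reraised = demo_swallowed_exception()
    print("exception() returned:", repr(exc))
    print("result() re-raised the same object:", reraised is exc)
```

exception() is convenient for logging, while result() is the natural choice when the caller actually needs the task's return value and should fail loudly.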

Reproducing and solving the problem

Background

The problem started with a piece of code like this:

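import logging
import threading
from time import sleep

# Logging setup (assumed; the original article does not show it)
logging.basicConfig(level=logging.INFO, format="thread: %(threadName)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    logger.info("I am slave. I am working. I am going to sleep 3s")
    sleep(3)
    logger.info("Exit thread executor because of some exception")


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")

        try:
            # The worker thread has exited (say, because of an exception); we want to
            # restart its work without creating a new thread
            thread_obj.start()  # this line raises: the same thread cannot be started twice
        except Exception as e:
            logger.error("Master start thread error", exc_info=True)
            raise e

        logger.info("Master is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    main()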

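As the comments explain, the code above is meant to implement something like a producer-consumer pattern: the worker thread keeps producing resources and the main thread consumes them. When the worker thread exits because of an exception, we want to restart production. Obviously, this code raises an error: the second call to start() on the same Thread object fails.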

Output:

thread: MainThread [INFO] Master starts thread worker
thread: Thread-1 [INFO] I am slave. I am working. I am going to sleep 3s
thread: MainThread [INFO] Master is going to sleep 5s
thread: Thread-1 [INFO] Exit thread executor because of some exception
thread: MainThread [INFO] Master starts thread worker
thread: MainThread [ERROR] Master start thread error
Traceback (most recent call last):
  File "xxx.py", line 47, in main
    thread_obj.start()
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
Traceback (most recent call last):
  File "xxx.py", line 56, in <module>
    main()
  File "xxx.py", line 50, in main
    raise e
  File "xxx.py", in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
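The RuntimeError above comes from a simple rule: a threading.Thread object can be started at most once. A minimal sketch (the names work and start_twice are hypothetical) reproduces the error and shows the usual workaround of creating a fresh Thread object for each run:

```python
import threading


def work(results):
    # Trivial worker body; appends a marker so we can see it ran
    results.append("ran")


def start_twice():
    results = []
    t = threading.Thread(target=work, args=(results,))
    t.start()
    t.join()
    try:
        t.start()  # a second start() on the same Thread object raises RuntimeError
    except RuntimeError as err:
        error = str(err)
    else:
        error = None
    # Workaround: build a new Thread object for every run
    t2 = threading.Thread(target=work, args=(results,))
    t2.start()
    t2.join()
    return error, results


if __name__ == "__main__":
    print(start_twice())
```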

Getting to the point

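However, the script has other business logic to run, so the resource production and consumption code above had to run in a thread of its own. I therefore introduced a thread pool to execute it: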

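import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Logging setup (assumed; the original article does not show it)
logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")

        # The worker thread has exited (say, because of an exception); we want to
        # restart its work without creating a new thread.
        # I did not expect an exception here.
        thread_obj.start()  # this line raises: the same thread cannot be started twice

        logger.info("Master is going to sleep 5s")
        sleep(5)


def thread_pool_main():
    thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
    logger.info("Master ThreadPool Executor starts thread worker")
    # The Future returned by submit() is discarded, so an exception raised in main() is never seen
    thread_obj.submit(main)

    while True:
        logger.info("Master ThreadPool Executor is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    thread_pool_main()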

The output is as follows:

INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s

... ...

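Clearly, judging from the output above, an exception must occur when the pool worker reaches "INFO [thread: WorkExecutor_0] Master starts thread worker" (WorkExecutor_0 never logs anything after that line), yet the program as a whole does not report any exception at all.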
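Why does nothing show up? A minimal sketch, assuming Python 3.6+ for thread_name_prefix (the names fails, succeeds and demo_pool_survives are hypothetical): the pool's worker thread catches the exception raised by the submitted callable, stores it on the Future, and goes on to run the next task. Because thread_pool_main() above throws away the Future returned by submit(), nobody ever looks at the stored exception.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def fails():
    # Stand-in for main() hitting the second thread_obj.start()
    raise RuntimeError("threads can only be started once")


def succeeds():
    # Report which pool thread ran this task
    return threading.current_thread().name


def demo_pool_survives():
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor") as pool:
        bad = pool.submit(fails)      # its Future is the only place the error ends up
        good = pool.submit(succeeds)  # still runs on the single worker thread afterwards
        worker_name = good.result()
    return bad.exception(), worker_name


if __name__ == "__main__":
    print(demo_pool_survives())
```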

Solution

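After finding the bug above, I wanted the pool worker's exception to be written to the log whenever it fails. According to the docs, to get the exception of a pool task you call concurrent.futures.Future.exception(timeout=None). To do the logging, I attached a callback with add_done_callback(), which runs when the task finishes, and in that callback the exception is logged with logger.exception().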

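import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Logging setup (assumed; the original article does not show it)
logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")

        # The worker thread has exited (say, because of an exception); we want to
        # restart its work without creating a new thread.
        # I did not expect an exception here.
        thread_obj.start()  # this line raises: the same thread cannot be started twice

        logger.info("Master is going to sleep 5s")
        sleep(5)


def thread_pool_callback(worker):
    # Called with the finished Future as its only argument
    logger.info("called thread pool executor callback function")
    worker_exception = worker.exception()  # None if the task returned normally
    if worker_exception:
        logger.exception("Worker return exception: {}".format(worker_exception))


def thread_pool_main():
    thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
    logger.info("Master ThreadPool Executor starts thread worker")
    thread_pool_exc = thread_obj.submit(main)
    # Log the worker's exception as soon as the task finishes
    thread_pool_exc.add_done_callback(thread_pool_callback)
    # Alternative: block here until main() finishes, then log its exception
    # logger.info("thread pool exception: {}".format(thread_pool_exc.exception()))

    while True:
        logger.info("Master ThreadPool Executor is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    thread_pool_main()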

Output:

INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: WorkExecutor_0] called thread pool executor callback function
ERROR [thread: WorkExecutor_0] Worker return exception: threads can only be started once
Traceback (most recent call last):
  File "E:\anaconda\lib\concurrent\futures\thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "xxxx.py", line 46, in main
    thread_obj.start()  # this line raises: the same thread cannot be started twice
  File "E:\anaconda\lib\threading.py", in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...
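One caveat about the callback above: logger.exception() logs the exception currently being handled (sys.exc_info()), not the Future's exception, so it only prints the worker's traceback when the callback happens to run while that exception is still active. Per the add_done_callback() docs, if the future has already finished when the callback is attached, the callback is called immediately in the calling thread, where no exception is being handled. A slightly more robust sketch, assuming Python 3.5+ (where logging accepts an exception instance as exc_info), passes the exception explicitly and skips cancelled futures, whose exception() would raise CancelledError. The names log_worker_exception and broken_task are hypothetical.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def log_worker_exception(future):
    # A cancelled future has no exception; exception() would raise CancelledError
    if future.cancelled():
        logger.info("worker was cancelled")
        return
    exc = future.exception()  # the future is done here, so this does not block
    if exc is not None:
        # Passing the instance as exc_info logs the worker's own traceback,
        # regardless of which thread runs this callback
        logger.error("Worker return exception: %s", exc, exc_info=exc)


def broken_task():
    # Hypothetical task that fails like main() does in the article
    raise RuntimeError("threads can only be started once")


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor") as pool:
        pool.submit(broken_task).add_done_callback(log_worker_exception)
```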

The final version

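In fact, the approach above of reusing a single thread as the producer is flawed in itself. Once a thread finishes, its underlying OS thread is gone, and Python only allows start() to be called once per Thread object; a second call raises RuntimeError.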


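One workable approach is to use a thread pool instead. In that case, of course, you need to watch out for the exception-capturing issue with the pool's task bodies described above.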

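import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Logging setup (assumed; the original article does not show it)
logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def executor_callback(worker):
    logger.info("called worker callback function")
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker return exception: {}".format(worker_exception))
        # raise worker_exception  # would not help: exceptions raised in callbacks are logged and ignored


def main():
    slave_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor")
    restart_flag = False
    while True:
        logger.info("Master starts thread worker")

        # Log "Restart" for every submission after the first one
        if restart_flag:
            logger.info("Restart Slave work")
        restart_flag = True
        # Submit a fresh task each round: the pool supplies the thread, so nothing is start()ed twice
        slave_thread_pool.submit(thread_executor).add_done_callback(executor_callback)

        logger.info("Master is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    main()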
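Note that the loop above submits a new task every 5 seconds whether or not the previous one has finished; with max_workers=1, if thread_executor ever ran longer than that, submissions would pile up in the pool's queue. A sketch of one alternative, assuming the goal is "restart only after the previous run has ended" (slave_work, supervise, rounds and poll_interval are hypothetical names, and the bounded for loop stands in for while True so the example terminates):

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def slave_work():
    # Hypothetical stand-in for thread_executor: fails after a short delay
    sleep(0.1)
    raise RuntimeError("simulated producer failure")


def supervise(rounds, poll_interval=0.05):
    """Resubmit slave_work only once the previous run is done; return the restart count."""
    restarts = 0
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor") as pool:
        future = pool.submit(slave_work)
        for _ in range(rounds):
            if future.done():
                exc = None if future.cancelled() else future.exception()
                if exc is not None:
                    logger.error("Slave failed, restarting: %s", exc, exc_info=exc)
                future = pool.submit(slave_work)
                restarts += 1
            sleep(poll_interval)
    return restarts


if __name__ == "__main__":
    print("restarts:", supervise(rounds=20))
```

Polling future.done() keeps at most one task in flight, and the failure of each run is still inspected explicitly before it is replaced.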

Summary

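The root cause of this problem was my incomplete understanding of Python's concurrent.futures.ThreadPoolExecutor. I first came across the package in a book, but the book did not cover its full API and usage, so when the code raised an exception it took a long debugging session before I found the real problem. Only after reading the Python docs did I get a complete picture of how to use it. The lesson: when learning a new Python package, it is worth checking the official documentation.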

References

English version: Python docs, concurrent.futures

Chinese version: Python docs, concurrent.futures: Launching parallel tasks (Chinese translation)

exception(timeout=None)

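Return the exception raised by the call. If the call hasn't yet completed, this method will wait up to timeout seconds. If the call hasn't completed in timeout seconds, a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or is None, there is no limit to the wait time.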

If the future is cancelled before completing, CancelledError will be raised.

If the call completed without raising, None is returned.
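A small sketch of the timeout behavior described above (wait_for_release and demo_exception_timeout are hypothetical names; the Event just keeps the task running long enough to observe the timeout):

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError


def wait_for_release(event):
    # Keeps the future in the running state until the event is set
    event.wait()
    return "done"


def demo_exception_timeout():
    release = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(wait_for_release, release)
        try:
            future.exception(timeout=0.1)  # still running after 0.1 s, so this times out
            timed_out = False
        except FuturesTimeoutError:
            timed_out = True
        finally:
            release.set()  # let the task finish normally
        exc = future.exception()  # timeout=None: wait as long as needed; None on success
    return timed_out, exc


if __name__ == "__main__":
    print(demo_exception_timeout())
```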

add_done_callback(fn)

Attaches the callable fn to the future. fn will be called, with the future as its only argument, when the future is cancelled or finishes running.

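Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.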

If the future has already completed or been cancelled, fn will be called immediately.
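A sketch of these callback rules, under the assumption that the task is held back by an Event until all callbacks are registered so the ordering is deterministic (demo_callbacks, task and broken_callback are hypothetical names). The failing callback is logged by concurrent.futures and ignored, the remaining callbacks still run in order, and a callback attached after completion runs immediately:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_callbacks():
    calls = []
    gate = threading.Event()

    def task():
        gate.wait()  # hold the task until all callbacks are registered
        return 2 ** 10

    def broken_callback(future):
        # Raises an Exception subclass: concurrent.futures logs it and moves on
        raise ValueError("bug in callback")

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(task)
        future.add_done_callback(lambda f: calls.append(("first", f.result())))
        future.add_done_callback(broken_callback)
        future.add_done_callback(lambda f: calls.append(("second", f.result())))
        gate.set()
    # Leaving the with-block joined the worker, so the callbacks above have run.
    # The future is already done, so this one is called immediately, in this thread.
    future.add_done_callback(lambda f: calls.append(("late", f.result())))
    return calls


if __name__ == "__main__":
    print(demo_callbacks())
```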
