1. 程式人生 > >python重試庫retryiny原始碼剖析

python重試庫retryiny原始碼剖析

  上篇博文介紹了常見需要進行請求重試的場景,本篇博文試著剖析有名的python第三方庫retrying原始碼。

   在剖析其原始碼之前,有必要講一下retrying的用法,方便理解。

   安裝:

  pip install retrying

  或者

  easy_install retrying

  一些用法例項如下:

#example 1
from retrying import retry

@retry
def never_give_up_never_surrender():
     
print "一直重試且兩次重試之間無需等待"
#example 2
from retrying import retry

@retry(stop_max_attempt_number=7)
def stop_after_7_attempts():
    print "重試七次後停止"
#example 3
from retrying import retry

@retry(stop_max_delay=10000)
def stop_after_10_s():
    print "十秒之後停止重試"
#example 4
from retrying import
retry @retry(wait_fixed=2000) def wait_2_s(): print "每次重試間隔兩秒"
#example 5
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
    print "每次重試隨機等待1到2秒"
#example 6
from retrying import retry

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def wait_exponential_1000(): print "指數退避,每次重試等待 2^x * 1000 毫秒,上限是10秒,達到上限後每次都等待10秒"

 

#example 7
def retry_if_io_error(exception):
    """Return True if we should retry (in this case when it's an IOError), False otherwise"""
    return isinstance(exception, IOError)

@retry(retry_on_exception=retry_if_io_error)
def might_io_error():
    print "IO異常則重試,並且將其它異常丟擲"

@retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
def only_raise_retry_error_when_not_io_error():
    print "IO異常則重試,並且將其它異常用RetryError物件包裹"
#exampe 8,根據返回結果判斷是否重試
def retry_if_result_none(result):
    """Return True if we should retry (in this case when result is None), False otherwise"""
    return result is None

@retry(retry_on_result=retry_if_result_none)
def might_return_none():
    print "若返回結果為None則重試"

  上面八個例子是retrying的用法,只需在要重試的方法上加上@retry註解,並以相應的條件為引數即可,那麼@retry背後到底是如何實現的呢?下面給出@retry註解實現的方法。

 1 #裝飾器模式,對需要重試的函式,利用retry註解返回
 2 def retry(*dargs, **dkw):
 3     """
 4     Decorator function that instantiates the Retrying object
 5     @param *dargs: positional arguments passed to Retrying object
 6     @param **dkw: keyword arguments passed to the Retrying object
 7     """
 8     # support both @retry and @retry() as valid syntax
 9     #當用法為@retry不帶括號時走這條路徑,dargs[0]為retry註解的函式,返回函式物件wrapped_f
10     if len(dargs) == 1 and callable(dargs[0]):
11         def wrap_simple(f):
12 
13             @six.wraps(f)#註解用於將函式f的簽名複製到新函式wrapped_f
14             def wrapped_f(*args, **kw):
15                 return Retrying().call(f, *args, **kw)
16 
17             return wrapped_f
18 
19         return wrap_simple(dargs[0])
20 
21     else:#當用法為@retry()帶括號時走這條路徑,返回函式物件wrapped_f
22         def wrap(f):
23 
24             @six.wraps(f)#註解用於將函式f的簽名複製到新函式wrapped_f
25             def wrapped_f(*args, **kw):
26                 return Retrying(*dargs, **dkw).call(f, *args, **kw)
27 
28             return wrapped_f
29 
30         return wrap

  當用@retry標記函式時,例如例項1,其實執行了

never_give_up_never_surrender = retry(never_give_up_never_surrender)

  此時的never_give_up_never_surrender函式實際上是10-19行返回的wrapped_f函式,後續對never_give_up_never_surrender函式的呼叫都是呼叫的14行的wrapped_f函式。

當使用@retry()或者帶引數的@retry(params)時,如例項2,實際執行了:

stop_after_7_attempts = retry(stop_max_attempt_number)(stop_after_7_attempts)

  此時的stop_after_7_attempts函式實際上是22-29行的wrapped_f函式,後續對stop_after_7_attempts函式的呼叫都是對25行的wrapped_f函式呼叫。

可以看到實際上@retry將對需要重試的函式呼叫轉化為對Retrying類中call函式的呼叫,重試邏輯也在這個函式實現,實現對邏輯程式碼的無侵入,程式碼如下:

 

 1 def call(self, fn, *args, **kwargs):
 2         start_time = int(round(time.time() * 1000))
 3         attempt_number = 1
 4         while True:
 5             #_before_attempts為@retry傳進來的before_attempts,在每次呼叫函式前執行一些操作
 6             if self._before_attempts:
 7                 self._before_attempts(attempt_number)
 8 
 9             try:#Attempt將函式執行結果或者異常資訊以及執行次數作為內部狀態,用True或False標記是內部存的值正常執行結果還是異常
10                 attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
11             except:
12                 tb = sys.exc_info()#獲取異常堆疊資訊,sys.exc_info()返回type(異常型別), value(異常說明), traceback(traceback物件,包含更豐富的資訊)
13                 attempt = Attempt(tb, attempt_number, True)
14 
15             if not self.should_reject(attempt):#根據本次執行結果或異常型別判斷是否應該停止
16                 return attempt.get(self._wrap_exception)
17             
18             if self._after_attempts:#_after_attempts為@retry傳進來的after_attempts,在每次呼叫函式後執行一些操作
19                 self._after_attempts(attempt_number)
20             
21             delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
22             if self.stop(attempt_number, delay_since_first_attempt_ms):#根據重試次數和延遲判斷是否應該停止
23                 if not self._wrap_exception and attempt.has_exception:
24                     # get() on an attempt with an exception should cause it to be raised, but raise just in case
25                     raise attempt.get()
26                 else:
27                     raise RetryError(attempt)
28             else:#不停止則等待一定時間,延遲時間根據wait函式返回值和_wait_jitter_max計算
29                 sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
30                 if self._wait_jitter_max:
31                     jitter = random.random() * self._wait_jitter_max
32                     sleep = sleep + max(0, jitter)
33                 time.sleep(sleep / 1000.0)
34 
35             attempt_number += 1 #進行下一輪重試

  9-13行將函式執行返回結果或異常存入Attempt物件attempt中,Attempt類如下:

class Attempt(object):
    """
    An Attempt encapsulates a call to a target function that may end as a
    normal return value from the function or an Exception depending on what
    occurred during the execution.
    """
    #value值為函式返回結果或異常,根據has_exception判斷
    def __init__(self, value, attempt_number, has_exception):
        self.value = value
        self.attempt_number = attempt_number
        self.has_exception = has_exception
    #返回函式執行結果或異常,並根據wrap_exception引數對異常用RetryError包裹
    def get(self, wrap_exception=False):
        """
        Return the return value of this Attempt instance or raise an Exception.
        If wrap_exception is true, this Attempt is wrapped inside of a
        RetryError before being raised.
        """
        if self.has_exception:
            if wrap_exception:
                raise RetryError(self)
            else:#重新構造原異常丟擲
                six.reraise(self.value[0], self.value[1], self.value[2])
        else:
            return self.value

    def __repr__(self):
        if self.has_exception:
            return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
        else:
            return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)

  15行根據should_reject函式的返回值判斷是否停止重試,程式碼如下:

 def should_reject(self, attempt):
        reject = False
        #假如異常在retry_on_exception引數中返回True,則重試,預設不傳異常引數時,發生異常一直重試
        if attempt.has_exception:
            reject |= self._retry_on_exception(attempt.value[1])
        else:#假如函式返回結果在retry_on_result引數函式中為True,則重試
            reject |= self._retry_on_result(attempt.value) 

        return reject

 

  22行根據重試次數和延遲判斷是否應該停止重試,self.stop的賦值程式碼在建構函式中,程式碼片段如下:

        stop_funcs = []
        if stop_max_attempt_number is not None:
            stop_funcs.append(self.stop_after_attempt)

        if stop_max_delay is not None:
            stop_funcs.append(self.stop_after_delay)

        if stop_func is not None:
            self.stop = stop_func

        elif stop is None:#執行次數和延遲任何一個達到限制則停止
            self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)

        else:
            self.stop = getattr(self, stop)


def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the previous attempt >= stop_max_attempt_number."""
        return previous_attempt_number >= self._stop_max_attempt_number

    def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the time from the first attempt >= stop_max_delay."""
        return delay_since_first_attempt_ms >= self._stop_max_delay

  29-33行等待一段時間再次重試,其中延遲時間重點是根據29行的wait函式計算,wait函式在建構函式中賦值,程式碼片段如下:

wait_funcs = [lambda *args, **kwargs: 0]
        if wait_fixed is not None:
            wait_funcs.append(self.fixed_sleep)

        if wait_random_min is not None or wait_random_max is not None:
            wait_funcs.append(self.random_sleep)

        if wait_incrementing_start is not None or wait_incrementing_increment is not None:
            wait_funcs.append(self.incrementing_sleep)

        if wait_exponential_multiplier is not None or wait_exponential_max is not None:
            wait_funcs.append(self.exponential_sleep)

        if wait_func is not None:
            self.wait = wait_func

        elif wait is None:#返回幾個函式的最大值,作為等待時間
            self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)

        else:
            self.wait = getattr(self, wait)

  其中最值得研究的是指數退避延遲時間計算方法,函式為exponential_sleep,程式碼如下:

def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        exp = 2 ** previous_attempt_number 
        result = self._wait_exponential_multiplier * exp #延遲時間為_wait_exponential_multiplier*2^x
        if result > self._wait_exponential_max:#假如大於退避上限_wait_exponential_max,則result為上限值
            result = self._wait_exponential_max
        if result < 0:
            result = 0
        return result