1. 程式人生 > 程式設計 >如何用python實現一個HTTP連線池

如何用python實現一個HTTP連線池

一. 連線池的原理

  首先,HTTP連線是基於TCP連線的,與伺服器之間進行HTTP通訊,本質就是與伺服器之間建立了TCP連線後,相互收發基於HTTP協議的資料包. 因此,如果我們需要頻繁地去請求某個伺服器的資源,我們就可以一直維持與個伺服器的TCP連線不斷開,然後在需要請求資源的時候,把連線拿出來用就行了.

如何用python實現一個HTTP連線池

  一個專案可能需要與伺服器之間同時保持多個連線,比如一個爬蟲專案,有的執行緒需要請求伺服器的網頁資源,有的執行緒需要請求伺服器的圖片等資源,而這些請求都可以建立在同一條TCP連線上.

  因此,我們使用一個管理器來對這些連線進行管理,任何程式需要使用這些連線時,向管理器申請就可以了,等到用完之後再將連線返回給管理器,以供其他程式重複使用,這個管理器就是連線池.

如何用python實現一個HTTP連線池

二. 實現程式碼

1. HTTPConnectionPool類

  基於上一章的分析,連線池應該是一個收納連線的容器,同時對這些連線有管理能力:

class HTTPConnectionPool:

  def __init__(self,host: str,port: int = None,max_size: int = None,idle_timeout: int = None) -> None:
    """
    :param host: pass
    :param port: pass
    :param max_size: 同時存在的最大連線數,預設None->連線數無限,沒了就建立
    :param idle_timeout: 單個連線單次最長空閒時間,超時自動關閉,預設None->不限時
    """
    self.host = host
    self.port = port
    self.max_size = max_size
    self.idle_timeout = idle_timeout
    self._lock = threading.Condition()
    self._pool = []
    # 這裡的conn_num指的是總連線數,包括其它執行緒拿出去正在使用的連線
    self.conn_num = 0
    self.is_closed = False

  def acquire(self,blocking: bool = True,timeout: int = None) -> WrapperHTTPConnection:
    ...

  def release(self,conn: WrapperHTTPConnection) -> None:
    ...

  因此,我們定義這樣一個HTTPConnectionPool類,使用一個列表來儲存可用的連線. 對於外部來說,只需要呼叫這個連線池物件的acquire和release方法就能取得和釋放連線.

2. 執行緒安全地管理連線

  對於執行緒池內部來說,至少需要三個關於連線的操作: 從連線池中取得連線,將連線放回連線池,以及建立一個連線:

def _get_connection(self) -> WrapperHTTPConnection:
  # 這個方法會把連線從_idle_conn移動到_used_conn列表中,並返回這個連線
  try:
    return self._pool.pop()
  except IndexError:
    raise EmptyPoolError


def _put_connection(self,conn: WrapperHTTPConnection) -> None:
  self._pool.append(conn)


def _create_connection(self) -> WrapperHTTPConnection:
  self.conn_num += 1
  return WrapperHTTPConnection(self,HTTPConnection(self.host,self.port))

  對於連線池外部來說,主要有申請連線和釋放連線這兩個操作,實際上這就是個簡單的生產者消費者模型. 考慮到外部可能是多執行緒的環境,我們使用threading.Condition來保證執行緒安全. 關於Condition的資料可以看這裡.

def acquire(self,timeout: int = None) -> WrapperHTTPConnection:
  if self.is_closed:
    raise ConnectionPoolClosed
  with self._lock:
    if self.max_size is None or not self.is_full():
      # 在還能建立新連線的情況下,如果沒有空閒連線,直接建立一個就行了
      if self.is_pool_empty():
        self._put_connection(self._create_connection())
    else:
      # 不能建立新連線的情況下,如果設定了blocking=False,沒連線就報錯
      # 否則,就基於timeout進行阻塞,直到超時或者有可用連線為止
      if not blocking:
        if self.is_pool_empty():
          raise EmptyPoolError
      elif timeout is None:
        while self.is_pool_empty():
          self._lock.wait()
      elif timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
      else:
        end_time = time.time() + timeout
        while self.is_pool_empty():
          remaining = end_time - time.time()
          if remaining <= 0:
            raise EmptyPoolError
          self._lock.wait(remaining)
    # 走到這一步了,池子裡一定有空閒連線
    return self._get_connection()


def release(self,conn: WrapperHTTPConnection) -> None:
  if self.is_closed:
    # 如果這個連線是在連線池關閉後才釋放的,那就不用回連線池了,直接放生
    conn.close()
    return
  # 實際上,python列表的append操作是執行緒安全的,可以不加鎖
  # 這裡呼叫鎖是為了通過notify方法通知其它正在wait的執行緒:現在有連線可用了
  with self._lock:
    if not conn.is_available:
      # 如果這個連線不可用了,就應該建立一個新連線放進去,因為可能還有其它執行緒在等著連線用       conn.close()       self.conn_num -= 1
      conn = self._create_connection()
    self._put_connection(conn)
    self._lock.notify()

  我們首先看看acquire方法,這個方法其實就是在申請到鎖之後呼叫內部的_get_connection方法獲取連線,這樣就執行緒安全了. 需要注意的是,如果當前的條件無法獲取連線,就會呼叫條件變數的wait方法,及時釋放鎖並阻塞住當前執行緒. 然後,當其它執行緒作為生產者呼叫release方法釋放連線時,會觸發條件變數的notify方法,從而喚醒一個阻塞在wait階段的執行緒,即消費者. 這個消費者再從池中取出剛放回去的執行緒,這樣整個生產者消費者模型就運轉起來了.

3. 上下文管理器

  對於一個程式來說,它使用連線池的形式是獲取連線->使用連線->釋放連線. 因此,我們應該通過with語句來管理這個連線,以免在程式的最後遺漏掉釋放連線這一步驟.

  基於這個原因,我們通過一個WrapperHTTPConnection類來對HTTPConnection進行封裝,以實現上下文管理器的功能. HTTPConnection的程式碼可以看《用python實現一個HTTP客戶端》這篇文章.

class WrapperHTTPConnection:

  def __init__(self,pool: 'HTTPConnectionPool',conn: HTTPConnection) -> None:
    self.pool = pool
    self.conn = conn
    self.response = None
    self.is_available = True

  def __enter__(self) -> 'WrapperHTTPConnection':
    return self

  def __exit__(self,*exit_info: Any) -> None:
    # 如果response沒讀完並且連線需要複用,就棄用這個連線
    if not self.response.will_close and not self.response.is_closed():
      self.close()
    self.pool.release(self)

  def request(self,*args: Any,**kwargs: Any) -> HTTPResponse:
    self.conn.request(*args,**kwargs)
    self.response = self.conn.get_response()
    return self.response

  def close(self) -> None:
    self.conn.close()
    self.is_available = False

  同樣的,連線池可能也需要關閉,因此我們給連線池也加上上下文管理器的功能:

class HTTPConnectionPool:
  ...

  def close(self) -> None:
    if self.is_closed:
      return
    self.is_closed = True
    pool,self._pool = self._pool,None
    for conn in pool:
      conn.close()

  def __enter__(self) -> 'HTTPConnectionPool':
    return self

  def __exit__(self,*exit_info: Any) -> None:
    self.close()

  這樣,我們就可以通過with語句優雅地管理連線池了:

with HTTPConnectionPool(**kwargs) as pool:
  with pool.acquire() as conn:
    res = conn.request('GET','/')
    ...

4. 定時清理連線

  如果一個連線池的所需連線數是隨時間變化的,那麼就會出現一種情況: 在高峰期,我們建立了非常多的連線,然後進入低谷期之後,連線過剩,大量的連線處於空閒狀態,浪費資源. 因此,我們可以設定一個定時任務,定期清理空閒時間過長的連線,減少連線池的資源佔用.

  首先,我們需要為連線物件新增一個last_time屬性,每當連線釋放進入連線池後,就修改這個屬性的值為當前時間,這樣我們就能明確知道,連線池內的每個空閒連線空閒了多久:

class WrapperHTTPConnection:
  ...

  def __init__(self,conn: HTTPConnection) -> None:
    ...
    self.last_time = None


class HTTPConnectionPool:
  ...

  def _put_connection(self,conn: WrapperHTTPConnection) -> None:
    conn.last_time = time.time()
    self._pool.append(conn)

  然後,我們通過threading.Timer來實現一個定時任務:

def start_clear_conn(self) -> None:
  if self.idle_timeout is None:
    # 如果空閒連線的超時時間為無限,那麼就不應該清理連線
    return
  self.clear_idle_conn()
  self._clearer = threading.Timer(self.idle_timeout,self.start_clear_conn)
  self._clearer.start()


def stop_clear_conn(self) -> None:
  if self._clearer is not None:
    self._clearer.cancel()

  threading.Timer只會執行一次定時任務,因此,我們需要在start_clear_conn中不斷地把自己設定為定時任務. 這其實等同於新開了一個執行緒來執行start_clear_conn方法,因此並不會出現遞迴過深問題. 不過需要注意的是,threading.Timer雖然不會阻塞當前執行緒,但是卻會阻止當前執行緒結束,就算把它設定為守護執行緒都不行,唯一可行的辦法就是呼叫stop_clear_conn方法取消這個定時任務.

  最後,我們定義clear_idle_conn方法來清理閒置時間超時的連線:

def clear_idle_conn(self) -> None:
  if self.is_closed:
    raise ConnectionPoolClosed
  # 這裡開一個新執行緒來清理空閒連線,避免了阻塞主執行緒導致的定時精度出錯
  threading.Thread(target=self._clear_idle_conn).start()


def _clear_idle_conn(self) -> None:
  if not self._lock.acquire(timeout=self.idle_timeout):
    # 因為是每隔self.idle_timeout秒檢查一次
    # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續了
    return
  current_time = time.time()
  if self.is_pool_empty():
    pass
  elif current_time - self._pool[-1].last_time >= self.idle_timeout:
    # 這裡處理下面的二分法沒法處理的邊界情況,即所有連線都閒置超時的情況
    self.conn_num -= len(self._pool)
    self._pool.clear()
  else:
    # 通過二分法找出從左往右第一個不超時的連線的指標
    left,right = 0,len(self._pool) - 1
    while left < right:
      mid = (left + right) // 2
      if current_time - self._pool[mid].last_time >= self.idle_timeout:
        left = mid + 1
      else:
        right = mid
    self._pool = self._pool[left:]
    self.conn_num -= left
  self._lock.release()

  由於我們獲取和釋放連線都是從self._pool的尾部開始操作的,因此self._pool這個容器是一個先進後出佇列,它裡面放著的連線,一定是越靠近頭部的閒置時間越長,從頭到尾閒置時間依次遞減. 基於這個原因,我們使用二分法來找出列表中第一個沒有閒置超時的連線,然後把在它之前的連線一次性刪除,這樣就能達到O(logN)的時間複雜度,算是一種比較高效的方法. 需要注意的是,如果連線池內所有的連線都是超時的,那麼這種方法是刪不乾淨的,需要對這種邊界情況單獨處理.

三. 總結

1. 完整程式碼及分析

  這個連線池的完整程式碼如下:

import threading
import time
from typing import Any

from client import HTTPConnection,HTTPResponse


class WrapperHTTPConnection:

  def __init__(self,conn: HTTPConnection) -> None:
    self.pool = pool
    self.conn = conn
    self.response = None
    self.last_time = time.time()
    self.is_available = True

  def __enter__(self) -> 'WrapperHTTPConnection':
    return self

  def __exit__(self,**kwargs)
    self.response = self.conn.get_response()
    return self.response

  def close(self) -> None:
    self.conn.close()
    self.is_available = False


class HTTPConnectionPool:

  def __init__(self,包括其它執行緒拿出去正在使用的連線
    self.conn_num = 0
    self.is_closed = False
    self._clearer = None
    self.start_clear_conn()

  def acquire(self,timeout: int = None) -> WrapperHTTPConnection:
    if self.is_closed:
      raise ConnectionPoolClosed
    with self._lock:
      if self.max_size is None or not self.is_full():
        # 在還能建立新連線的情況下,直接建立一個就行了
        if self.is_pool_empty():
          self._put_connection(self._create_connection())
      else:
        # 不能建立新連線的情況下,沒連線就報錯
        # 否則,直到超時或者有可用連線為止
        if not blocking:
          if self.is_pool_empty():
            raise EmptyPoolError
        elif timeout is None:
          while self.is_pool_empty():
            self._lock.wait()
        elif timeout < 0:
          raise ValueError("'timeout' must be a non-negative number")
        else:
          end_time = time.time() + timeout
          while self.is_pool_empty():
            remaining = end_time - time.time()
            if remaining <= 0:
              raise EmptyPoolError
            self._lock.wait(remaining)
      # 走到這一步了,池子裡一定有空閒連線
      return self._get_connection()

  def release(self,conn: WrapperHTTPConnection) -> None:
    if self.is_closed:
      # 如果這個連線是在連線池關閉後才釋放的,直接放生
      conn.close()
      return
    # 實際上,可以不加鎖
    # 這裡呼叫鎖是為了通過notify方法通知其它正在wait的執行緒:現在有連線可用了
    with self._lock:
      if not conn.is_available:
        # 如果這個連線不可用了,因為可能還有其它執行緒在等著連線用
        conn.close()
        self.conn_num -= 1
        conn = self._create_connection()
      self._put_connection(conn)
      self._lock.notify()

  def _get_connection(self) -> WrapperHTTPConnection:
    # 這個方法會把連線從_idle_conn移動到_used_conn列表中,並返回這個連線
    try:
      return self._pool.pop()
    except IndexError:
      raise EmptyPoolError

  def _put_connection(self,conn: WrapperHTTPConnection) -> None:
    conn.last_time = time.time()
    self._pool.append(conn)

  def _create_connection(self) -> WrapperHTTPConnection:
    self.conn_num += 1
    return WrapperHTTPConnection(self,self.port))

  def is_pool_empty(self) -> bool:
    # 這裡指的是,空閒可用的連線是否為空
    return len(self._pool) == 0

  def is_full(self) -> bool:
    if self.max_size is None:
      return False
    return self.conn_num >= self.max_size

  def close(self) -> None:
    if self.is_closed:
      return
    self.is_closed = True
    self.stop_clear_conn()
    pool,None
    for conn in pool:
      conn.close()

  def clear_idle_conn(self) -> None:
    if self.is_closed:
      raise ConnectionPoolClosed
    # 這裡開一個新執行緒來清理空閒連線,避免了阻塞主執行緒導致的定時精度出錯
    threading.Thread(target=self._clear_idle_conn).start()

  def _clear_idle_conn(self) -> None:
    if not self._lock.acquire(timeout=self.idle_timeout):
      # 因為是每隔self.idle_timeout秒檢查一次
      # 如果過了self.idle_timeout秒還沒申請到鎖,本次也就不用繼續了
      return
    current_time = time.time()
    if self.is_pool_empty():
      pass
    elif current_time - self._pool[-1].last_time >= self.idle_timeout:
      # 這裡處理下面的二分法沒法處理的邊界情況,即所有連線都閒置超時的情況
      self.conn_num -= len(self._pool)
      self._pool.clear()
    else:
      # 通過二分法找出從左往右第一個不超時的連線的指標
      left,len(self._pool) - 1
      while left < right:
        mid = (left + right) // 2
        if current_time - self._pool[mid].last_time >= self.idle_timeout:
          left = mid + 1
        else:
          right = mid
      self._pool = self._pool[left:]
      self.conn_num -= left
    self._lock.release()

  def start_clear_conn(self) -> None:
    if self.idle_timeout is None:
      # 如果空閒連線的超時時間為無限,那麼就不應該清理連線
      return
    self.clear_idle_conn()
    self._clearer = threading.Timer(self.idle_timeout,self.start_clear_conn)
    self._clearer.start()

  def stop_clear_conn(self) -> None:
    if self._clearer is not None:
      self._clearer.cancel()

  def __enter__(self) -> 'HTTPConnectionPool':
    return self

  def __exit__(self,*exit_info: Any) -> None:
    self.close()


class EmptyPoolError(Exception):
  pass


class ConnectionPoolClosed(Exception):
  pass

  首先,這個連線池的核心就是對連線進行管理,而這包含取出連線和釋放連線兩個過程. 因此這東西的本質就是一個生產者消費者模型,取出執行緒時是消費者,放入執行緒時是生產者,使用threading自帶的Condition物件就能完美解決執行緒安全問題,使二者協同合作.

  解決獲取連線和釋放連線這個問題之後,其實這個連線池就已經能用了. 但是如果涉及到更多細節方面的東西,比如判斷連線是否可用,自動釋放連線,清理閒置連線等等,就需要對這個連線進行封裝,為它新增更多的屬性和方法,這就引入了WrapperHTTPConnection這個類. 實現它的__enter___和__exit__方法之後,就能使用上下文管理器來自動釋放連線. 至於清理閒置連線,通過last_time屬性記錄每個連線的最後釋放時間,然後在連線池中新增一個定時任務就行了.

以上就是如何用python實現一個HTTP連線池的詳細內容,更多關於python 實現一個HTTP連線池的資料請關注我們其它相關文章!