如何用python實現一個HTTP連線池
一. 連線池的原理
首先,HTTP連線是基於TCP連線的,與伺服器之間進行HTTP通訊,本質就是與伺服器之間建立了TCP連線後,相互收發基於HTTP協議的資料包. 因此,如果我們需要頻繁地去請求某個伺服器的資源,我們就可以一直維持與個伺服器的TCP連線不斷開,然後在需要請求資源的時候,把連線拿出來用就行了.
一個專案可能需要與伺服器之間同時保持多個連線,比如一個爬蟲專案,有的執行緒需要請求伺服器的網頁資源,有的執行緒需要請求伺服器的圖片等資源,而這些請求都可以建立在同一條TCP連線上.
因此,我們使用一個管理器來對這些連線進行管理,任何程式需要使用這些連線時,向管理器申請就可以了,等到用完之後再將連線返回給管理器,以供其他程式重複使用,這個管理器就是連線池.
二. 實現程式碼
1. HTTPConnectionPool類
基於上一章的分析,連線池應該是一個收納連線的容器,同時對這些連線有管理能力:
class HTTPConnectionPool: def __init__(self,host: str,port: int = None,max_size: int = None,idle_timeout: int = None) -> None: """ :param host: pass :param port: pass :param max_size: 同時存在的最大連線數,預設None->連線數無限,沒了就建立 :param idle_timeout: 單個連線單次最長空閒時間,超時自動關閉,預設None->不限時 """ self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這裡的conn_num指的是總連線數,包括其它執行緒拿出去正在使用的連線 self.conn_num = 0 self.is_closed = False def acquire(self,blocking: bool = True,timeout: int = None) -> WrapperHTTPConnection: ... def release(self,conn: WrapperHTTPConnection) -> None: ...
因此,我們定義這樣一個HTTPConnectionPool類,使用一個列表來儲存可用的連線. 對於外部來說,只需要呼叫這個連線池物件的acquire和release方法就能取得和釋放連線.
2. 執行緒安全地管理連線
對於執行緒池內部來說,至少需要三個關於連線的操作: 從連線池中取得連線,將連線放回連線池,以及建立一個連線:
def _get_connection(self) -> WrapperHTTPConnection: # 這個方法會把連線從_idle_conn移動到_used_conn列表中,並返回這個連線 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self,conn: WrapperHTTPConnection) -> None: self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self,HTTPConnection(self.host,self.port))
對於連線池外部來說,主要有申請連線和釋放連線這兩個操作,實際上這就是個簡單的生產者消費者模型. 考慮到外部可能是多執行緒的環境,我們使用threading.Condition來保證執行緒安全. 關於Condition的資料可以看這裡.
def acquire(self,timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能建立新連線的情況下,如果沒有空閒連線,直接建立一個就行了 if self.is_pool_empty(): self._put_connection(self._create_connection()) else: # 不能建立新連線的情況下,如果設定了blocking=False,沒連線就報錯 # 否則,就基於timeout進行阻塞,直到超時或者有可用連線為止 if not blocking: if self.is_pool_empty(): raise EmptyPoolError elif timeout is None: while self.is_pool_empty(): self._lock.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子裡一定有空閒連線 return self._get_connection() def release(self,conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個連線是在連線池關閉後才釋放的,那就不用回連線池了,直接放生 conn.close() return # 實際上,python列表的append操作是執行緒安全的,可以不加鎖 # 這裡呼叫鎖是為了通過notify方法通知其它正在wait的執行緒:現在有連線可用了 with self._lock: if not conn.is_available: # 如果這個連線不可用了,就應該建立一個新連線放進去,因為可能還有其它執行緒在等著連線用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify()
我們首先看看acquire方法,這個方法其實就是在申請到鎖之後呼叫內部的_get_connection方法獲取連線,這樣就執行緒安全了. 需要注意的是,如果當前的條件無法獲取連線,就會呼叫條件變數的wait方法,及時釋放鎖並阻塞住當前執行緒. 然後,當其它執行緒作為生產者呼叫release方法釋放連線時,會觸發條件變數的notify方法,從而喚醒一個阻塞在wait階段的執行緒,即消費者. 這個消費者再從池中取出剛放回去的執行緒,這樣整個生產者消費者模型就運轉起來了.
3. 上下文管理器
對於一個程式來說,它使用連線池的形式是獲取連線->使用連線->釋放連線. 因此,我們應該通過with語句來管理這個連線,以免在程式的最後遺漏掉釋放連線這一步驟.
基於這個原因,我們通過一個WrapperHTTPConnection類來對HTTPConnection進行封裝,以實現上下文管理器的功能. HTTPConnection的程式碼可以看《用python實現一個HTTP客戶端》這篇文章.
class WrapperHTTPConnection: def __init__(self,pool: 'HTTPConnectionPool',conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.is_available = True def __enter__(self) -> 'WrapperHTTPConnection': return self def __exit__(self,*exit_info: Any) -> None: # 如果response沒讀完並且連線需要複用,就棄用這個連線 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self,*args: Any,**kwargs: Any) -> HTTPResponse: self.conn.request(*args,**kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False
同樣的,連線池可能也需要關閉,因此我們給連線池也加上上下文管理器的功能:
class HTTPConnectionPool: ... def close(self) -> None: if self.is_closed: return self.is_closed = True pool,self._pool = self._pool,None for conn in pool: conn.close() def __enter__(self) -> 'HTTPConnectionPool': return self def __exit__(self,*exit_info: Any) -> None: self.close()
這樣,我們就可以通過with語句優雅地管理連線池了:
with HTTPConnectionPool(**kwargs) as pool: with pool.acquire() as conn: res = conn.request('GET','/') ...
4. 定時清理連線
如果一個連線池的所需連線數是隨時間變化的,那麼就會出現一種情況: 在高峰期,我們建立了非常多的連線,然後進入低谷期之後,連線過剩,大量的連線處於空閒狀態,浪費資源. 因此,我們可以設定一個定時任務,定期清理空閒時間過長的連線,減少連線池的資源佔用.
首先,我們需要為連線物件新增一個last_time屬性,每當連線釋放進入連線池後,就修改這個屬性的值為當前時間,這樣我們就能明確知道,連線池內的每個空閒連線空閒了多久:
class WrapperHTTPConnection: ... def __init__(self,conn: HTTPConnection) -> None: ... self.last_time = None class HTTPConnectionPool: ... def _put_connection(self,conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn)
然後,我們通過threading.Timer來實現一個定時任務:
def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閒連線的超時時間為無限,那麼就不應該清理連線 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout,self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel()
threading.Timer只會執行一次定時任務,因此,我們需要在start_clear_conn中不斷地把自己設定為定時任務. 這其實等同於新開了一個執行緒來執行start_clear_conn方法,因此並不會出現遞迴過深問題. 不過需要注意的是,threading.Timer雖然不會阻塞當前執行緒,但是卻會阻止當前執行緒結束,就算把它設定為守護執行緒都不行,唯一可行的辦法就是呼叫stop_clear_conn方法取消這個定時任務.
最後,我們定義clear_idle_conn方法來清理閒置時間超時的連線:
def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這裡開一個新執行緒來清理空閒連線,避免了阻塞主執行緒導致的定時精度出錯 threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因為是每隔self.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這裡處理下面的二分法沒法處理的邊界情況,即所有連線都閒置超時的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個不超時的連線的指標 left,right = 0,len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1 else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release()
由於我們獲取和釋放連線都是從self._pool的尾部開始操作的,因此self._pool這個容器是一個先進後出佇列,它裡面放著的連線,一定是越靠近頭部的閒置時間越長,從頭到尾閒置時間依次遞減. 基於這個原因,我們使用二分法來找出列表中第一個沒有閒置超時的連線,然後把在它之前的連線一次性刪除,這樣就能達到O(logN)的時間複雜度,算是一種比較高效的方法. 需要注意的是,如果連線池內所有的連線都是超時的,那麼這種方法是刪不乾淨的,需要對這種邊界情況單獨處理.
三. 總結
1. 完整程式碼及分析
這個連線池的完整程式碼如下:
import threading import time from typing import Any from client import HTTPConnection,HTTPResponse class WrapperHTTPConnection: def __init__(self,conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.last_time = time.time() self.is_available = True def __enter__(self) -> 'WrapperHTTPConnection': return self def __exit__(self,**kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False class HTTPConnectionPool: def __init__(self,包括其它執行緒拿出去正在使用的連線 self.conn_num = 0 self.is_closed = False self._clearer = None self.start_clear_conn() def acquire(self,timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能建立新連線的情況下,直接建立一個就行了 if self.is_pool_empty(): self._put_connection(self._create_connection()) else: # 不能建立新連線的情況下,沒連線就報錯 # 否則,直到超時或者有可用連線為止 if not blocking: if self.is_pool_empty(): raise EmptyPoolError elif timeout is None: while self.is_pool_empty(): self._lock.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子裡一定有空閒連線 return self._get_connection() def release(self,conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個連線是在連線池關閉後才釋放的,直接放生 conn.close() return # 實際上,可以不加鎖 # 這裡呼叫鎖是為了通過notify方法通知其它正在wait的執行緒:現在有連線可用了 with self._lock: if not conn.is_available: # 如果這個連線不可用了,因為可能還有其它執行緒在等著連線用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify() def _get_connection(self) -> WrapperHTTPConnection: # 這個方法會把連線從_idle_conn移動到_used_conn列表中,並返回這個連線 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self,conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self,self.port)) def is_pool_empty(self) -> bool: # 這裡指的是,空閒可用的連線是否為空 return len(self._pool) == 0 def is_full(self) -> bool: if self.max_size is None: return False return self.conn_num >= self.max_size def close(self) -> None: if self.is_closed: return self.is_closed = True self.stop_clear_conn() pool,None for conn in pool: conn.close() def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這裡開一個新執行緒來清理空閒連線,避免了阻塞主執行緒導致的定時精度出錯 threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因為是每隔self.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,本次也就不用繼續了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這裡處理下面的二分法沒法處理的邊界情況,即所有連線都閒置超時的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個不超時的連線的指標 left,len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1 else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release() def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閒連線的超時時間為無限,那麼就不應該清理連線 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout,self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel() def __enter__(self) -> 'HTTPConnectionPool': return self def __exit__(self,*exit_info: Any) -> None: self.close() class EmptyPoolError(Exception): pass class ConnectionPoolClosed(Exception): pass
首先,這個連線池的核心就是對連線進行管理,而這包含取出連線和釋放連線兩個過程. 因此這東西的本質就是一個生產者消費者模型,取出執行緒時是消費者,放入執行緒時是生產者,使用threading自帶的Condition物件就能完美解決執行緒安全問題,使二者協同合作.
解決獲取連線和釋放連線這個問題之後,其實這個連線池就已經能用了. 但是如果涉及到更多細節方面的東西,比如判斷連線是否可用,自動釋放連線,清理閒置連線等等,就需要對這個連線進行封裝,為它新增更多的屬性和方法,這就引入了WrapperHTTPConnection這個類. 實現它的__enter___和__exit__方法之後,就能使用上下文管理器來自動釋放連線. 至於清理閒置連線,通過last_time屬性記錄每個連線的最後釋放時間,然後在連線池中新增一個定時任務就行了.
以上就是如何用python實現一個HTTP連線池的詳細內容,更多關於python 實現一個HTTP連線池的資料請關注我們其它相關文章!