Python 中創建 PostgreSQL 數據庫連接池

習慣於使用數據庫之前都必須創建一個連接池,即使是單線程的應用,隻要有多個方法中需用到數據庫連接,建立一兩個連接的也會考慮先池化他們。連接池的好處多多

  • 1) 如果反復創建連接相當耗時,
  • 2) 對於單個連接一路用到底的應用,有連接池時避免瞭數據庫連接對象傳來傳去,
  • 3) 忘記關連接瞭,連接池幸許還能幫忙在一定時長後關掉,當然密集取連接的應用勢將耗盡連接,
  • 4) 一個應用打開連接的數量是可控的

接觸到 Python 後,在使用 PostgreSQL 也自然而然的考慮創建連接池,使用時從池中取,用完後還回去,而不是每次需要連接時創建一個物理的。Python 連接 PostgreSQL 是主要有兩個包, py-postgresql psycopg2 , 而本文的實例將使用後者。

Psycopg psycopg2.pool 模塊中提供瞭兩個連接池的實現在,它們都繼承自 psycopg2.pool.AbstractConnectionPool,

該抽象類的基本方法是

  • getconn(key=None): 獲取連接
  • putconn(conn, key=None, close=False): 歸還連接
  • closeall(): 關閉連接池中的所有連接

兩個連接池的實現類是

  • psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwars) : 給單線程應用用的
  • psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwars) : 多線程時更安全,其實就是在 getconn() putconn() 時加瞭鎖來控制

所以最安全保險的做法還是使用 ThreadedConnectionPool, 在單線程應用中, SimpleConnectionPool  也不見得比 ThreadedConnectionPool 效率高多少。

下面來看一個具體的連接池實現,其中用到瞭 Context Manager, 使用時結合 with 鍵字更方便,用完後不用顯式的調用 putconn() 歸還連接

db_helper.py

from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
import atexit


class DBHelper:
    def __init__(self):
        self._connection_pool = None

    def initialize_connection_pool(self):
        db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
        self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn)

    @contextmanager
    def get_resource(self, autocommit=True):
        if self._connection_pool is None:
            self.initialize_connection_pool()

        conn = self._connection_pool.getconn()
        conn.autocommit = autocommit
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            yield cursor, conn
        finally:
            cursor.close()
            self._connection_pool.putconn(conn)

    def shutdown_connection_pool(self):
        if self._connection_pool is not None:
            self._connection_pool.closeall()


db_helper = DBHelper()


@atexit.register
def shutdown_connection_pool():
    db_helper.shutdown_connection_pool()
from psycopg2 import pool

from psycopg2 . extras import RealDictCursor

from contextlib import contextmanager

import atexit

class DBHelper :

     def __init__ ( self ) :

         self . _connection_pool = None

     def initialize_connection_pool ( self ) :

         db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'

         self . _connection_pool = pool . ThreadedConnectionPool ( 1 , 3 , db_dsn )

     @ contextmanager

     def get_resource ( self , autocommit = True ) :

         if self . _connection_pool is None :

             self . initialize_connection_pool ( )

         conn = self . _connection_pool . getconn ( )

         conn . autocommit = autocommit

         cursor = conn . cursor ( cursor_factory = RealDictCursor )

         try :

             yield cursor , conn

         finally :

             cursor . close ( )

             self . _connection_pool . putconn ( conn )

     def shutdown_connection_pool ( self ) :

         if self . _connection_pool is not None :

             self . _connection_pool . closeall ( )

db_helper = DBHelper ( )

@ atexit . register

def shutdown_connection_pool ( ) :

     db_helper . shutdown_connection_pool ( )

幾點說明:

  • 隻在第一次調用 get_resource() 時創建連接池,而不是在 from db_helper import db_helper  引用時就創建連接池
  • Context Manager 返回瞭兩個對象,cursor connection, 需要用  connection 管理事物時用它
  • 默認時 cursor 返回的記錄是字典,而非數組
  • 默認時連接為自動提交
  • 最後的 @atexit.register 那個  ShutdownHook 可能有點多餘,在進程退出時連接也被關閉,TIME_WAIT 時間應該會稍長些

使用方式:

如果不用事物

from db_helper import db_helper


with db_helper.get_resource() as (cursor, _):
    cursor.execute('select * from users')
    for record in cursor.fetchall():
        ... process record, record['name'] ...
from db_helper import db_helper

with db_helper . get_resource ( ) as ( cursor , _ ) :

     cursor . execute ( 'select * from users' )

     for record in cursor . fetchall ( ) :

         . . . process record , record [ 'name' ] . . .

如果需要用到事物

with db_helper.get_resource(autocommit=False) as (cursor, _):
    try:
        cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
        cursor.execute('delete from orders where user_id = %s', (1,))
        conn.commit()
    except:
        conn.rollback()
with db_helper . get_resource ( autocommit = False ) as ( cursor , _ ) :

     try :

         cursor . execute ( 'update users set name = %s where id = %s' , ( 'new_name' , 1 ) )

         cursor . execute ( 'delete from orders where user_id = %s' , ( 1 , ) )

         conn . commit ( )

     except :

         conn . rollback ( )

在寫作本文時,查看 psycopg 的官網時,發現 Psycopg 3.0 正式版在 2021-10-13 日發佈瞭( Psycopg 3.0 released ), 更好的支持 async。在 Psycopg2 2.2 版本時就開始支持異步瞭。而且還註意到 Psycopg 的主要部分是用 C 實現的,才使得它效率比較高,也難怪經常用 pip install psycopg2 安裝不成功,而要用 pip install psycopg2-binary 來安裝的原因。

在創建連接池時加上參數 keepalivesXxx 能讓服務器及時斷掉死鏈接,否則在 Linux 下默認要 2 個小時後才斷開。死鏈接的情況發生在客戶端異常退出(如斷電)時先前建立的鏈接就變為死鏈接瞭。

 pool.ThreadedConnectionPool(1, 3, db_dsn, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) 


PostgreSQL 服務端會對連接在空閑 tcp_keepalives_idle 秒後,主動發送tcp_keepalives_count 個 tcp_keeplive 偵測包,每個偵探包在 tcp_keepalives_interval 秒內都沒有回應,就認為是死連接,於是切斷它。

到此這篇關於Python 中創建 PostgreSQL 數據庫連接池的文章就介紹到這瞭,更多相關PostgreSQL Python內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: