Python探針完成調用庫的數據提取
1.簡單粗暴的方法–對mysql庫進行封裝
要統計一個執行過程, 就需要知道這個執行過程的開始位置和結束位置, 所以最簡單粗暴的方法就是基於要調用的方法進行封裝,在框架調用MySQL庫和MySQL庫中間實現一個中間層, 在中間層完成耗時統計,如:
# 偽代碼 def my_execute(conn, sql, param): # 針對MySql庫的統計封裝組件 with MyTracer(conn, sql, param): # 以下為正常使用MySql庫的代碼 with conn.cursor as cursor: cursor.execute(sql, param) ...
看樣子實現起來非常不錯, 而且更改非常方便, 但由於是在最頂層的API上進行修改, 其實是非常不靈活的, 同時在cursor.execute裡會進行一些預操作, 如把sql和param進行拼接, 調用nextset清除當前遊標的數據等等。我們最後拿到的數據如時間耗時也是不準確的, 同時也沒辦法得到一些詳細的元數據, 如錯誤碼等等.
如果要拿到最直接有用的數據,就隻能去改源代碼, 然後再調用源代碼瞭, 但是如果每個庫都需要改源代碼才能統計, 那也太麻煩瞭, 好在Python也提供瞭一些類似探針的接口, 可以通過探針把庫的源碼進行替換完成我們的代碼.
2.Python的探針
在Python中可以通過sys.meta_path來實現import hook的功能, 當執行 import 相關操作時, 會根據sys.meta_path定義的對象對import相關庫進行更改.sys.meta_path中的對象需要實現一個find_module方法, 這個find_module方法返回None或一個實現瞭load_module方法的對象, 我們可以通過這個對象, 針對一些庫在import時, 把相關的方法進行替換, 簡單用法如下,通過hooktime.sleep讓他在sleep的時候能打印消耗的時間.
import importlib import sys from functools import wraps def func_wrapper(func): """這裡通過一個裝飾器來達到貍貓換太子和獲取數據的效果""" @wraps(func) def wrapper(*args, **kwargs): # 記錄開始時間 start = time.time() result = func(*args, **kwargs) # 統計消耗時間 end = time.time() print(f"speed time:{end - start}") return result return wrapper class MetaPathFinder: def find_module(self, fullname, path=None): # 執行時可以看出來在import哪些模塊 print(f'find module:{path}:{fullname}') return MetaPathLoader() class MetaPathLoader: def load_module(self, fullname): # import的模塊都會存放在sys.modules裡面, 通過判斷可以減少重復import if fullname in sys.modules: return sys.modules[fullname] # 防止遞歸調用 finder = sys.meta_path.pop(0) # 導入 module module = importlib.import_module(fullname) if fullname == 'time': # 替換函數 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module sys.meta_path.insert(0, MetaPathFinder()) if __name__ == '__main__': import time time.sleep(1) # 輸出示例: # find module:datetime # find module:time # load module:time # find module:math # find module:_datetime # speed time:1.00073385238647468
3.制作探針模塊
瞭解完瞭主要流程後, 可以開始制作自己的探針模塊瞭, 由於示例隻涉及到aiomysql模塊, 那麼在MetaPathFinder.find_module中需要隻對aiomysql模塊進行處理, 其他的先忽略. 然後我們需要確定我們要把aiomysql的哪個功能給替換, 從業務上來說, 一般情況下我們隻要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany這幾個主要的操作,所以需要深入cursor看看如何去更改代碼, 後者重載哪個函數.
先cursor.execute的源碼(cursor.executemanay也類似), 發現會先調用self.nextset的方法, 把上個請求的數據先拿完, 再合並sql語句, 最後通過self._query進行查詢:
async def execute(self, query, args=None): """Executes the given operation Executes the given operation substituting any markers with the given parameters. For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not None: query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
再看cursor.fetchone的源碼(cursor.fetchall也類似), 發現其實是從緩存中獲取數據,
這些數據在執行cursor.execute中就已經獲取瞭:
def fetchone(self): """Fetch the next row """ self._check_executed() fut = self._loop.create_future() if self._rows is None or self._rownumber >= len(self._rows): fut.set_result(None) return fut result = self._rows[self._rownumber] self._rownumber += 1 fut = self._loop.create_future() fut.set_result(result) return fut
綜合上面的分析, 我們隻要對核心的方法self._query進行重載即可拿到我們要的數據, 從源碼中我們可以知道, 我們能獲取到傳入self._query的self和sql參數, 根據self又能獲取到查詢的結果, 同時我們通過裝飾器能獲取到運行的時間, 要的數據基本都到齊瞭,
按照思路修改後的代碼如下:
import importlib import time import sys from functools import wraps from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING from types import ModuleType if TYPE_CHECKING: import aiomysql def func_wrapper(func: Callable): @wraps(func) async def wrapper(*args, **kwargs) -> Any: start: float = time.time() func_result: Any = await func(*args, **kwargs) end: float = time.time() # 根據_query可以知道, 第一格參數是self, 第二個參數是sql self: aiomysql.Cursor = args[0] sql: str = args[1] # 通過self,我們可以拿到其他的數據 db: str = self._connection.db user: str = self._connection.user host: str = self._connection.host port: str = self._connection.port execute_result: Tuple[Tuple] = self._rows # 可以根據自己定義的agent把數據發送到指定的平臺, 然後我們就可以在平臺上看到對應的數據或進行監控瞭, # 這裡隻是打印一部分數據出來 print({ "sql": sql, "db": db, "user": user, "host": host, "port": port, "result": execute_result, "speed time": end - start }) return func_result return cast(Callable, wrapper) class MetaPathFinder: @staticmethod def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]: if fullname == 'aiomysql': # 隻有aiomysql才進行hook return MetaPathLoader() else: return None class MetaPathLoader: @staticmethod def load_module(fullname: str): if fullname in sys.modules: return sys.modules[fullname] # 防止遞歸調用 finder: "MetaPathFinder" = sys.meta_path.pop(0) # 導入 module module: ModuleType = importlib.import_module(fullname) # 針對_query進行hook module.Cursor._query = func_wrapper(module.Cursor._query) sys.meta_path.insert(0, finder) return module async def test_mysql() -> None: import aiomysql pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='123123', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() if __name__ == '__main__': sys.meta_path.insert(0, MetaPathFinder()) import asyncio asyncio.run(test_mysql()) # 輸出示例: # 可以看出sql語句與我們輸入的一樣, db, user, host, port等參數也是, 還能知道執行的結果和運行時間 # {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
這個例子看來很不錯, 但是需要在調用的入口處顯式調用該邏輯, 通常一個項目可能有幾個入口, 每個入口都顯示調用該邏輯會非常麻煩, 而且必須先調用我們的hook邏輯後才能import, 這樣就得訂好引入規范, 不然就可能出現部分地方hook不成功, 如果能把引入hook這個邏輯安排在解析器啟動後馬上執行, 就可以完美地解決這個問題瞭. 查閱瞭一翻資料後發現,python解釋器初始化的時候會自動import PYTHONPATH下存在的sitecustomize和usercustomize模塊, 我們隻要創建該模塊, 並在模塊裡面寫入我們的 替換函數即可。
. ├── __init__.py ├── hook_aiomysql.py ├── sitecustomize.py └── test_auto_hook.py
hook_aiomysql.py是我們制作探針的代碼為例子, 而sitecustomize.py存放的代碼如下, 非常簡單, 就是引入我們的探針代碼, 並插入到sys.meta_path:
import sys from hook_aiomysql import MetaPathFinder sys.meta_path.insert(0, MetaPathFinder())
test_auto_hook.py則是測試代碼:
import asyncio from hook_aiomysql import test_mysql asyncio.run(test_mysql())
接下來隻要設置PYTHONPATH並運行我們的代碼即可(如果是項目的話一般交由superisor啟動,則可以在配置文件中設置好PYTHONPATH):
(.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
4.直接替換方法
可以看到上面的方法很好的運行瞭, 而且可以很方便的嵌入到我們的項目中, 但是依賴與sitecustomize.py文件很難讓他抽離成一個第三方的庫, 如果要抽離成第三方的庫就得考慮看看有沒有其他的方法。在上面介紹MetaPathLoader時說到瞭sys.module, 在裡面通過sys.modules來減少重復引入:
class MetaPathLoader: def load_module(self, fullname): # import的模塊都會存放在sys.modules裡面, 通過判斷可以減少重復import if fullname in sys.modules: return sys.modules[fullname] # 防止遞歸調用 finder = sys.meta_path.pop(0) # 導入 module module = importlib.import_module(fullname) if fullname == 'time': # 替換函數 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module
這個減少重復引入的原理是, 每次引入一個模塊後, 他就會存放在sys.modules, 如果是重復引入, 就會直接刷新成最新引入的模塊。上面之所以會考慮到減少重復import是因為我們不會在程序運行時升級第三方庫的依賴。利用到我們可以不考慮重復引入同名不同實現的模塊, 以及sys.modules會緩存引入模塊的特點, 我們可以把上面的邏輯簡化成引入模塊->替換當前模塊方法為我們修改的hook方法。
import time from functools import wraps from typing import Any, Callable, Tuple, cast import aiomysql def func_wrapper(func: Callable): """和上面一樣的封裝函數, 這裡簡單略過""" # 判斷是否hook過 _IS_HOOK: bool = False # 存放原來的_query _query: Callable = aiomysql.Cursor._query # hook函數 def install_hook() -> None: _IS_HOOK = False if _IS_HOOK: return aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query) _IS_HOOK = True # 還原到原來的函數方法 def reset_hook() -> None: aiomysql.Cursor._query = _query _IS_HOOK = False
代碼簡單明瞭,接下來跑一跑剛才的測試:
import asyncio import aiomysql from demo import install_hook, reset_hook async def test_mysql() -> None: pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() print("install hook") install_hook() asyncio.run(test_mysql()) print("reset hook") reset_hook() asyncio.run(test_mysql()) print("end")
通過測試輸出可以發現我們的邏輯的正確的, install hook後能出現我們提取的元信息, 而reset後則不會打印原信息
install hook {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875} reset hook end
5.總結
得益於Python動態語言的特性, 我們可以很容易的為第三方庫實現鉤子方法,上面說的兩種方法中, 第二種方法非常簡單, 但在自己項目中最好還是采用第一種方法, 因為Python是通過一行一行代碼進行掃描執行的, 第二種方法隻能放在入口代碼中, 並且要在被hook的對象實例化之前執行, 不然就會實現hook失敗的現象, 而第一種方法除瞭麻煩外,基本上能躲避所有坑。
到此這篇關於Python探針完成調用庫的數據提取的文章就介紹到這瞭,更多相關 Python探針 內容請搜索LevelAH以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持LevelAH!
推薦閱讀:
- Python入門基礎之import機制
- Python接入MySQL實現增刪改查的實戰記錄
- 深入瞭解python裝飾器
- python進階之協程你瞭解嗎
- 詳解Python如何利用pymysql封裝項目通用的連接和查詢