基於python實現rpc遠程過程調用
基於python實現RPC的demo
這是一個遠程過程調用(RPC)的實現demo,可以實現不同的python進程之間通信和互相調用函數,簡單易用,易於擴展。更多功能也可進一步完善,本文介紹瞭該實現的主要思路。
前言
計劃手擼一個rpc甚久瞭,在間歇性push自己下終於完成的差不多瞭。寫這個demo的原因,1)是為瞭學習與思考下這部分主體功能和實現思路,2)是調包時可以毫無心理負擔,並產生一種不過如此的優越感。
實現這部分內容主要依據的還是自己的想法,因此可能會有bug或者有更好的實現方式,僅供學習和參考,完整代碼可參考Gitee鏈接。
實現的時候用的是python2.7,忘記換瞭,下次一定更新。
一、主要內容
所謂RPC,是遠程過程調用(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前進程調用其他進程的函數時,體驗就像是調用本地寫的函數一般。
本文實現的是在本地調用遠端的類class對象的接口,也就是本地的client不實例化類對象,調用的是server端的類對象接口。
為瞭達到讓調用層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:
- 客戶端需要把類的接口提取出來,並將調用函數事件捕獲存儲起來;服務端需要把類的公有函數作為可遠程調用的接口。
- 客戶端把調用函數的事件(調用的函數,參數)進行序列化並發送給服務端;服務端將客戶端的調用事件反序列化,並執行相應的接口,將返回值發送給客戶端。
- 客戶端與服務端通過某種方式(一般就是網絡socket)進行通信。
在下面時序圖的灰色部分,對於調用方來說是透明的,它的執行結果應該和執行本地的函數時一致的。
二、實現步驟
1. 進程間的通信
本文采用瞭基於TCP的sokcet連接來進行進程之間的通信,更多實現細節可參考之前博客。
在此需要註意:
本文采用瞭select模塊來監聽網絡事件,如果服務端未收到任何的網絡消息會一直阻塞在這兒。如果服務端除瞭提供rpc調用服務之外還需要執行其他邏輯,那麼應當采用非阻塞,輪詢socket的方式來判斷是否有新的網絡事件。
# ServerBase.py def process(self): readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values()) for conn in readable: if conn is self.socket: self._handle_conn() else: self._handle_recv(conn) for conn in writable: pass for conn in exceptional: self._handle_leave(conn)
客戶端的網絡事件本文通過創建新的線程來監聽的。並不會影響客戶端主線程的執行,因此可以盡情的阻塞。部分代碼如下:
# AsynCallback.py class AsyncTaskManager(object): _asy_events = dict() def __init__(self, loop, *args): super(AsyncTaskManager, self).__init__() self._loop_fun = loop def __call__(self, *args, **kwargs): proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs) proc.start() def _exec_loop(self, *args, **kwargs): while True: net_resp = self._loop_fun(*args, **kwargs) for resp in net_resp: asy_event = self._asy_events.pop(resp.rid) asy_event.set()
# Client.py class Client(TaskHandle, ClientBase): @AsyncTaskManager def process(self): super(Client, self).process() _events = [] while self.has_events: event = self.get_next_event() data = event[1] _events.append(self.unpack_respond(data)) return _events
序列化方式,本文采用瞭庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類對象也進行序列化,非常之高級。
2. 異步回調實現思路
對於需要返回值的函數調用,處理起來比較簡單,隻需要將主線程阻塞等待,直至超時或者接收到瞭對應函數的返回值即可。本文采用瞭threading.Event來阻塞與喚醒調用的函數,同時采用瞭裝飾器來實現這功能。若日後有更好的方法,可以輕易進行替換。相關示例代碼如下所示:
@AsyncTaskManager.respond def _handle_response(self, tid): """ 處理有返回值的情況 會阻塞線程直至收到返回值 """ task = self.pop_task(tid) if task.callback: task.callback() return self.pop_respond(tid) @staticmethod def respond(func): @wraps(func) def make_resp(handle, tid): """ 需要註意的是,和裝飾的函數參數含義需一致 """ event = threading.Event() AsyncTaskManager._asy_events[tid] = event event.wait(timeout=TIME_OUT) return func(handle, tid) # 這兒才是真正執行_handle_response的地方 return make_resp
在實際的應用過程中,應有這樣的情況,服務端與客戶端都是獨立的應用,通過rpc函數進行通信和交互,而並不是某方為另外一方提供服務,那麼此時返回值並不必要,隻需要將要做的事通知另一方即可。對於此種情況,可以采用異步回調的方式來告知調用方對應函數執行成功瞭。
在文中依舊采用線程來完成該功能,客戶端調用函數之後創建一個新線程並阻塞住,等待服務端將執行結果發回後再喚醒,如果有回調函數就執行。示例代碼如下:
@AsyncTaskManager.callback def _handle_call_back(self, tid): """ 處理有回調函數的調用 callback會等tid事件調用成功之後 才會回調,且不會有返回值 """ task = self.pop_task(tid) if task.callback: task.callback() @staticmethod def callback(func): @wraps(func) def make_thread(event, *args, **kwargs): event.wait(timeout=TIME_OUT) func(*args, **kwargs) def make_async(handle, tid): """ 註意點同上 """ event = threading.Event() AsyncTaskManager._asy_events[tid] = event _task = threading.Thread(target=lambda: make_thread(event, handle, tid)) return make_async
總結
到此這篇關於基於python實現rpc遠程過程調用的文章就介紹到這瞭,更多相關python rpc遠程調用內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Python Asyncio調度原理詳情
- 徹底弄懂Python中的回調函數(callback)
- python協程與 asyncio 庫詳情
- python在協程中增加任務實例操作
- 熱門問題python爬蟲的效率如何提高