基於python實現rpc遠程過程調用

基於python實現RPC的demo

這是一個遠程過程調用(RPC)的實現demo,可以實現不同的python進程之間通信和互相調用函數,簡單易用,易於擴展。更多功能也可進一步完善,本文介紹瞭該實現的主要思路。

前言

計劃手擼一個rpc甚久瞭,在間歇性push自己下終於完成的差不多瞭。寫這個demo的原因,1)是為瞭學習與思考下這部分主體功能和實現思路,2)是調包時可以毫無心理負擔,並產生一種不過如此的優越感。
實現這部分內容主要依據的還是自己的想法,因此可能會有bug或者有更好的實現方式,僅供學習和參考,完整代碼可參考Gitee鏈接。
實現的時候用的是python2.7,忘記換瞭,下次一定更新。

一、主要內容

所謂RPC,是遠程過程調用(Remote Procedure Call)的簡寫,網上解釋很多,簡單來說,就是在當前進程調用其他進程的函數時,體驗就像是調用本地寫的函數一般。
本文實現的是在本地調用遠端的類class對象的接口,也就是本地的client不實例化類對象,調用的是server端的類對象接口。
為瞭達到讓調用層無須關心底層實現,擁有絲滑般的體驗,就需要以下幾個部分:

  • 客戶端需要把類的接口提取出來,並將調用函數事件捕獲存儲起來;服務端需要把類的公有函數作為可遠程調用的接口。
  • 客戶端把調用函數的事件(調用的函數,參數)進行序列化並發送給服務端;服務端將客戶端的調用事件反序列化,並執行相應的接口,將返回值發送給客戶端。
  • 客戶端與服務端通過某種方式(一般就是網絡socket)進行通信。

在下面時序圖的灰色部分,對於調用方來說是透明的,它的執行結果應該和執行本地的函數時一致的。

二、實現步驟

1. 進程間的通信

本文采用瞭基於TCP的sokcet連接來進行進程之間的通信,更多實現細節可參考之前博客。
在此需要註意:

本文采用瞭select模塊來監聽網絡事件,如果服務端未收到任何的網絡消息會一直阻塞在這兒。如果服務端除瞭提供rpc調用服務之外還需要執行其他邏輯,那麼應當采用非阻塞,輪詢socket的方式來判斷是否有新的網絡事件。

# ServerBase.py
def process(self):
    readable, writable, exceptional = select.select(self.inputs, self.outputs, self.conns.values())
    for conn in readable:
        if conn is self.socket:
            self._handle_conn()
        else:
            self._handle_recv(conn)
    for conn in writable:
        pass
    for conn in exceptional:
        self._handle_leave(conn)

客戶端的網絡事件本文通過創建新的線程來監聽的。並不會影響客戶端主線程的執行,因此可以盡情的阻塞。部分代碼如下:

# AsynCallback.py
class AsyncTaskManager(object):
    _asy_events = dict()

    def __init__(self, loop, *args):
        super(AsyncTaskManager, self).__init__()
        self._loop_fun = loop

    def __call__(self, *args, **kwargs):
        proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
        proc.start()

    def _exec_loop(self, *args, **kwargs):
        while True:
            net_resp = self._loop_fun(*args, **kwargs)
            for resp in net_resp:
                asy_event = self._asy_events.pop(resp.rid)
                asy_event.set()
# Client.py
class Client(TaskHandle, ClientBase):

    @AsyncTaskManager
    def process(self):
        super(Client, self).process()
        _events = []
        while self.has_events:
            event = self.get_next_event()
            data = event[1]
            _events.append(self.unpack_respond(data))
        return _events

序列化方式,本文采用瞭庫pickle進行序列化與反序列化,使用它的原因是可以將自定義類對象也進行序列化,非常之高級。

2. 異步回調實現思路

對於需要返回值的函數調用,處理起來比較簡單,隻需要將主線程阻塞等待,直至超時或者接收到瞭對應函數的返回值即可。本文采用瞭threading.Event來阻塞與喚醒調用的函數,同時采用瞭裝飾器來實現這功能。若日後有更好的方法,可以輕易進行替換。相關示例代碼如下所示:

@AsyncTaskManager.respond
def _handle_response(self, tid):
    """ 處理有返回值的情況
    會阻塞線程直至收到返回值
    """
    task = self.pop_task(tid)
    if task.callback:
        task.callback()
    return self.pop_respond(tid)

@staticmethod
def respond(func):
    @wraps(func)
    def make_resp(handle, tid):
        """ 需要註意的是,和裝飾的函數參數含義需一致 """
        event = threading.Event()
        AsyncTaskManager._asy_events[tid] = event
        event.wait(timeout=TIME_OUT)
        return func(handle, tid)    # 這兒才是真正執行_handle_response的地方
    return make_resp

在實際的應用過程中,應有這樣的情況,服務端與客戶端都是獨立的應用,通過rpc函數進行通信和交互,而並不是某方為另外一方提供服務,那麼此時返回值並不必要,隻需要將要做的事通知另一方即可。對於此種情況,可以采用異步回調的方式來告知調用方對應函數執行成功瞭。

在文中依舊采用線程來完成該功能,客戶端調用函數之後創建一個新線程並阻塞住,等待服務端將執行結果發回後再喚醒,如果有回調函數就執行。示例代碼如下:

@AsyncTaskManager.callback
def _handle_call_back(self, tid):
    """ 處理有回調函數的調用
    callback會等tid事件調用成功之後 才會回調,且不會有返回值
    """
    task = self.pop_task(tid)
    if task.callback:
        task.callback()
        
@staticmethod
def callback(func):
    @wraps(func)
    def make_thread(event, *args, **kwargs):
        event.wait(timeout=TIME_OUT)
        func(*args, **kwargs)

    def make_async(handle, tid):
        """ 註意點同上 """
        event = threading.Event()
        AsyncTaskManager._asy_events[tid] = event
        _task = threading.Thread(target=lambda: make_thread(event, handle, tid))

    return make_async

總結

到此這篇關於基於python實現rpc遠程過程調用的文章就介紹到這瞭,更多相關python rpc遠程調用內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: