Python bsonrpc源碼解讀
bsonrpc 是python中⼀個基於json或bson的遠程過程調⽤的庫,提供瞭服務端與客戶端實現,其底層采⽤的是基於TCP連接的通信。
程序結構
bsonrpc主要包括以下⽂件:
- concurrent.py:針對兩種並發⽅式(threading線程對象、gevent協程對象)涉及的相應組件(Queue,Event,Lock等)提供統⼀的對外的⽣成接⼝:spawn(),new_promise(),new_queue(), new_lock()等;
- definitions.py:定義rpc的消息結構和錯誤編碼;
- dispatcher.py:rpc的處理調度,路由處理(消息對應的處理函數);
- exceptions.py:異常定義;
- framing.py:定義不同類實現JSON RPC 2.0標準中的不同消息結構;
- interfaces.py:定義提供服務的裝飾器;
- misc.py:該⽂件中定義瞭⼀個id⽣成器,從1開始累加。
- options.py:定義配置選項。
- rpc.py:主要為BSONRpc和JSONRpc類的實現;
- socket_queue.py:主要為消息的拆包組包部分;
- util.py:系統⼯具。
本⽂主要描述庫包中對於不同協議的分包組包的處理,涉及到socket_queue.py和framing.py⽂件,主要采⽤的是對象組合的技術。
解讀
socket_queue.py中的SocketQueue類是⽤來處理從socket接收數據,主要的⽅法為_receiver()和put()⽅法,分別對應分包和組包,分包的主要內容如下:
def _receiver(self): bbuffer = b'' while True: try: chunk = self.socket.recv(self.BUFSIZE) # 從socket上接收數據 bbuffer = self._to_queue(bbuffer + chunk) # 數據分包 except DecodingError as e: self._queue.put(e) # 後⾯省略... def _to_queue(self, bbuffer): b_msg, bbuffer = self.codec.extract_message(bbuffer) # 解碼器提取完整的信息 while b_msg is not None: self._queue.put(self.codec.loads(b_msg)) # 解碼後的消息放⼊消息隊列中等待處理 b_msg, bbuffer = self.codec.extract_message(bbuffer) return bbuffer
組包的主要內容如下:
def put(self, item): if self._closed: raise BsonRpcError('Attempt to put items to closed queue.') msg_bytes = self.codec.into_frame(self.codec.dumps(item)) # 組包 with self._lock: self.socket.sendall(msg_bytes)
如上圖所示,程序采⽤的是對象組合的⽅式實現消息分包處理的。對象組合是繼承之外的另⼀種選擇,對象組合要求被組合的對象具有良好定義的接⼝,通過接⼝的⽅式調⽤其他對象的功能,這個也被“⿊箱復⽤”,因為對象的內部細節是不可⻅的。SocketQueue中依賴Codec的extract_message()接⼝⽅法,不⽤關⼼其具體的實現⽅法。具體實現由JSONCodec和BSONCode進⾏實現。JSONCodec中依賴JSONFrame中的extract_message()接⼝⽅法,該接⼝⽅法的實現由JSONFramingNone、JSONFramingNetstring、JSONFramingRFC7464進⾏實現。SocketQueue消息組包過程依賴於into_frame()⽅法,也是通過對象組合實現的。
註:圖中的接⼝為瞭⼤傢容易理解才加上瞭,源碼⾥⾯並沒有。
以上就是Python bsonrpc源碼解讀的詳細內容,更多關於Python bsonrpc源碼的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- Python使用UDP實現720p視頻傳輸的操作
- 消息中間件詳解以及比較選擇
- Python利用tkinter和socket實現端口掃描
- Python3 如何開啟自帶http服務
- SpringBoot集成RabbitMQ和概念介紹