Python bsonrpc源碼解讀

bsonrpc 是python中⼀個基於json或bson的遠程過程調⽤的庫,提供瞭服務端與客戶端實現,其底層采⽤的是基於TCP連接的通信。

程序結構

bsonrpc主要包括以下⽂件:

  1. concurrent.py:針對兩種並發⽅式(threading線程對象、gevent協程對象)涉及的相應組件(Queue,Event,Lock等)提供統⼀的對外的⽣成接⼝:spawn(),new_promise(),new_queue(), new_lock()等;
  2. definitions.py:定義rpc的消息結構和錯誤編碼;
  3. dispatcher.py:rpc的處理調度,路由處理(消息對應的處理函數);
  4. exceptions.py:異常定義;
  5. framing.py:定義不同類實現JSON RPC 2.0標準中的不同消息結構;
  6. interfaces.py:定義提供服務的裝飾器;
  7. misc.py:該⽂件中定義瞭⼀個id⽣成器,從1開始累加。
  8. options.py:定義配置選項。
  9. rpc.py:主要為BSONRpc和JSONRpc類的實現;
  10. socket_queue.py:主要為消息的拆包組包部分;
  11. util.py:系統⼯具。

本⽂主要描述庫包中對於不同協議的分包組包的處理,涉及到socket_queue.py和framing.py⽂件,主要采⽤的是對象組合的技術。

解讀

socket_queue.py中的SocketQueue類是⽤來處理從socket接收數據,主要的⽅法為_receiver()和put()⽅法,分別對應分包和組包,分包的主要內容如下:

def _receiver(self):
  bbuffer = b''
  while True:
    try:
      chunk = self.socket.recv(self.BUFSIZE) # 從socket上接收數據
      bbuffer = self._to_queue(bbuffer + chunk) # 數據分包
    except DecodingError as e:
      self._queue.put(e)
    # 後⾯省略...
def _to_queue(self, bbuffer):
  b_msg, bbuffer = self.codec.extract_message(bbuffer) # 解碼器提取完整的信息
  while b_msg is not None:
    self._queue.put(self.codec.loads(b_msg)) # 解碼後的消息放⼊消息隊列中等待處理
    b_msg, bbuffer = self.codec.extract_message(bbuffer)
  return bbuffer

組包的主要內容如下:

def put(self, item):
  if self._closed:
    raise BsonRpcError('Attempt to put items to closed queue.')
  msg_bytes = self.codec.into_frame(self.codec.dumps(item)) # 組包
  with self._lock:
    self.socket.sendall(msg_bytes)

如上圖所示,程序采⽤的是對象組合的⽅式實現消息分包處理的。對象組合是繼承之外的另⼀種選擇,對象組合要求被組合的對象具有良好定義的接⼝,通過接⼝的⽅式調⽤其他對象的功能,這個也被“⿊箱復⽤”,因為對象的內部細節是不可⻅的。SocketQueue中依賴Codec的extract_message()接⼝⽅法,不⽤關⼼其具體的實現⽅法。具體實現由JSONCodec和BSONCode進⾏實現。JSONCodec中依賴JSONFrame中的extract_message()接⼝⽅法,該接⼝⽅法的實現由JSONFramingNone、JSONFramingNetstring、JSONFramingRFC7464進⾏實現。SocketQueue消息組包過程依賴於into_frame()⽅法,也是通過對象組合實現的。

註:圖中的接⼝為瞭⼤傢容易理解才加上瞭,源碼⾥⾯並沒有。

以上就是Python bsonrpc源碼解讀的詳細內容,更多關於Python bsonrpc源碼的資料請關註WalkonNet其它相關文章!

推薦閱讀: