Python網絡編程之ZeroMQ知識總結

一、ZeroMQ概述

  •  ZeroMQ(又名ØMQ,MQ,或zmq)像一個可嵌入的網絡庫,但其作用就像一個並發框架。
  • ZeroMQ類似於標準Berkeley套接字,其提供瞭各種傳輸工具,如進程內、進程間、TCP和組播中進行原子消息傳送的套接字
  • 可以使用各種模式實現N對N的套接字連接,這些模式包括:發佈-訂閱、任務分配、請求-應答。
  • ZeroMQ的速度足夠快,因此可充當集群產品的結構。
  • ZeroMQ的異步I/O模型提供瞭可擴展的多核應用程序,用異步消息來處理任務
  • ZeroMQ核心由C語言編寫,支持C、C++、java、python等多種編程語言的API,並可運行在大多數操作系統上

總結以下:ØMQ (ZeroMQ) 是一個基於消息隊列的多線程網絡庫,它封裝瞭網絡通信、消息隊列、線程調度等功能,向上層提供簡潔的API,應用程序通過加載庫文件,調用API函數來實現高性能網絡通信。

看起來有些抽象,下面我們結合ZeroMQ 的 Python 封裝———— pyzmp,用實例看一下ZeroMQ的三種最基本的工作模式。

二、安裝

安裝方法

pip install pyzmq

查看是否安裝成功

>>> import zmq
>>> print(zmq.__version__)
22.0.3

三、Request-Reply (請求響應模式)

3.1 Request-Reply模式概述:

  • 消息雙向的,有來有往。
  • Client請求的消息,Server必須答復給Client。
  • Client在請求後,Server必須回響應,註意:Server不返回響應會報錯。
  • Server和Client都可以是1:N的模型。通常把1認為是Server,N認為是Client。
  • 更底層的端點地址是對上層隱藏的,每個請求都隱含回應地址,而應用則不關心它。
  • ZMQ 可以很好的支持路由功能(實現路由功能的組件叫做 Device),把 1:N 擴展為 N:M(隻需要加入若幹路由節點)。

Request-Reply流程

3.2 Client端python實現

#client.py

import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send(b"Hello")
#  Get the reply.
message = socket.recv()
print(f"Received reply [ {message} ]")

3.3 Server端python實現

#server.py
import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")
  • 啟動client.py 首先會打印Connecting to hello world server… 但不會受到任何消息。
  • 然後啟動server.py ,客戶端收到來自客戶端的request: b’Hello’
  • 此時client端收到來自server端的 reply: [ b’World’ ]
python client.py 
Connecting to hello world server…
Received reply [ b'World' ]
python server.py 
Received request: b'Hello'

可以試一下,多運行幾個client.py,看看情況是什麼樣的。

Server和Client都可以是1:N的模型

四、Publish/Subscribe(訂閱-發佈模式 )

4.1 Pub-Subs模式概述:

  • 消息單向,有去無回
  • 一個發佈端,多個訂閱端;發佈端隻管產生數據,發佈端發佈一條消息,可被多個訂閱端同時收到。
  • 發佈者不必關心訂閱者的加入和離開,消息會以 1:N 的方式擴散到每個訂閱者。
  • 廣播所有client,沒有隊列緩存,斷開連接數據將永遠丟失。
  • 如果Publish端開始發佈信息時,Subscribe端尚未連接進來,則這些信息會被直接丟棄。
  • PUB和SUB誰bind誰connect並無嚴格要求(雖本質並無區別),但仍建議PUB使用bind,SUB使用connect
  • 使用SUB設置一個訂閱時,必須使用zmq_setsockopt()對消息進行過濾

Pub-Subs模式流程

這裡直接引用官方文檔的例子:

發佈者:類似於一個天氣更新服務器,向訂閱者發送天氣更新,內容包括郵政編碼、溫度、濕度等信息

#Publisher.py
import zmq
from random import randrange


context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)

    socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

訂閱者:它監聽發佈者更新的數據流,過濾隻接收與特定郵政編碼相關的天氣信息,默認接收接收10條數據

#Subscribe.py 
import sys
import zmq


#  Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")

# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
    zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

# Process 5 updates
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

print(
    "Average temperature for zipcode '%s' was %dF"
    % (zip_filter, total_temp / (update_nbr + 1))
)

消息會以 1:N 的方式擴散到每個訂閱者

五、Push/Pull(流水線模式)

5.1 流水線模式概述:

  • 主要用於多任務並行。
  • 消息單向,有去無回。
  • Push的任何一個消息,始終隻會有一個Pull端收到消息。
  • Push 端還是 Pull 端都可以做 server,bind 到某個地址等待對方訪問。
  • 如果有多個PULL端同時連接到PUSH端,則PUSH端會在內部做一個負載均衡,采用平均分配的算法,將所有消息均衡發佈到PULL端上。
  • 由三部分組成,Push進行數據推送,work進行數據緩存,Pull進行數據競爭獲取處理。
  • 存在一個數據緩存和處理負載,當連接被斷開,數據不會丟失,重連後數據繼續發送到對端。

Push/Pull模式流程

ventilator 使用的是 SOCKET_PUSH,將任務分發到 Worker 節點上。Worker 節點上,使用 SOCKET_PULL 從上遊接受任務,並使用 SOCKET_PUSH 將結果匯集到 Sink。值得註意的是,任務的分發的時候也同樣有一個負載均衡的路由功能,worker 可以隨時自由加入,ventilator 可以均衡將任務分發出去。

Push/Pull模式還是蠻常用的,這裡我們主要測試一下它的負載均衡。

5.2 Ventilator

# ventilator.py
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    socket.send(b"test")
    print("已發送")
    time.sleep(1)

5.3 worker

# worker.py
import zmq

context = zmq.Context()

recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5557')

sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
    data = recive.recv()
    print("work1 正在轉發...")
    sender.send(data)

5.4 sink

# sink.py
import zmq
import sys

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
    response = socket.recv()
    print("response: %s" % response)

打開4個Terminal,分別運行

python sink.py
python worker.py
python worker.py
python ventilator.py

采用平均分配的算法,將所有消息均衡發佈到PULL端上

六、總結

消息模型可以根據需要組合使用,後續的代理模式和路由模式等都是在三種基本模式上面的擴展或變異。

到此這篇關於Python網絡編程之ZeroMQ知識總結的文章就介紹到這瞭,更多相關Python ZeroMQ知識總結內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: