python中進程間通信及設置狀態量控制另一個進程

一、python中進程間通信

業務場景:在當前遇到的業務場景中,我們需要啟一個間隔任務,這個間隔任務跑一個算法,然後把算法的結果進行一些處理,並入庫。任務目前間隔是一小時,算法運行時間要50多分鐘,留給結果處理的時間並不多,所以有可能會出現超時。目前來說,優化方向在算法上會更為合理,因為結果處理本來就不用很多時間。但是在這個業務場景下,想要把結果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現的,說是壓縮為0,實際上就是在算法執行完成後,再啟一個進程去處理,這樣就不會由於需要進行數據處理而影響到算法的運行,將算法和結果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結果處理作為一個周期,而現在是把算法運行和結果處理分為兩個周期去處理。

技術實現方案:

啟動二個進程,其中一個運行算法,在算法運行結束後,發送一個狀態值到另外一個進程,另外一個進程在收到狀態量後啟動數據處理即可。兩個進程間互不影響即可。其實也相當於算法進程控制數據處理進程

測試場景構造代碼:

from multiprocessing import Process,Pipe
import time
import sys
import os
def send_message(conn):
    for i in range(1000):
        print('send_message:%d'%i)
        print(os.getpid())
        conn.send(i)
        time.sleep(3)
def send_message1(conn):
    # for i in range(1000):
    print(conn.recv())
    while True:
        if conn.recv() % 5 == 0:
            print(' today is nice day')
        time.sleep(1)
if __name__ == '__main__':
        #創建一個進程通信管道
    left,right = Pipe()
    t1 = Process(target=send_message,args=(left,))
    t2 = Process(target=send_message1,args=(right,))
    t1.start()
    t2.start()

在這個案例場景下有一些需要註意的點:

  • 一、time.sleep()的問題,睡眠指定時間,總是會出錯,具體的出錯原因到現在也沒有找到,這是原來出現的問題,在這裡沒有做長時間的測試,所以不一定會出現,但是還是要註意
  • 二、代碼實現中與上述的描述差異有一些,如未啟用調度任務,隻是啟瞭一個間隔運行的任務。
  • 三、數據處理進程一直處理空跑狀態,會造成資源的浪費(更合理的應該是形成阻塞狀態,但是對於阻塞狀態的構造缺乏認知,所以先犧牲資源
  • 四、在上述描述的需求中,在算法運行及數據處理的上一節點還有一個調度任務在控制,這裡未做出體現,其實應該把定時任務和數據處理作為兩個周期獨立出來才更符合上述描述中的需求。

二、設置狀態量控制另一個進程

 業務場景:在當前遇到的業務場景中,我們需要啟一個間隔任務,這個間隔任務跑一個算法,然後把算法的結果進行一些處理,並入庫。任務目前間隔是一小時,算法運行時間要50多分鐘,留給結果處理的時間並不多,所以有可能會出現超時。目前來說,優化方向在算法上會更為合理,因為結果處理本來就不用很多時間。但是在這個業務場景下,想要把結果處理的時間進行無限壓縮,壓縮到0,其實也是可以實現的,說是壓縮為0,實際上就是在算法執行完成後,再啟一個進程去處理,這樣就不會由於需要進行數據處理而影響到算法的運行,將算法和結果處理分為兩個獨立的進程去處理。在最開始的程序中,是把算法運行和結果處理作為一個周期,而現在是把算法運行和結果處理分為兩個周期去處理。

上面的解決方案中隻涉及到瞭啟用兩個進程去運行兩個任務,並未涉及到啟用定時任務框架,所以可能會顯得和上述的業務場景不一致,所以在這裡重新解決一下。上面也是沒有問題的,隻是把定時任務框架也作為一個任務去處理即可。然後在定時任務運行完程後,向另外一個進程傳入一個參數,作為啟動另一個進程的狀態量即可。當然,在這裡,兩個進程還是完全占滿的,即處理阻塞狀態。對於資源的利用還是沒有完全達到最好。後續再考慮使用進程池的方式,看是否可以讓其中的一個進程運行完後直接釋放資源。

技術解決方案如下:

from multiprocessing import Process,Pipe
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# schedule = BackgroundScheduler()
schedule = BlockingScheduler(timezone="Asia/Shanghai")
# schedule = AsyncIOScheduler(timezone="Asia/Shanghai")
def algorithm(conn):
    print('start_run')
    conn.send('please run')
    # time.sleep(5)
def worth_result(conn):
    while True:
        if conn.recv() == 'please run':
            print(conn.recv() + ' very nice!')
def time_job(conns):
    schedule.add_job(func=algorithm,trigger='interval',seconds=5,args=(conns,))
    schedule.start()
if __name__ == '__main__':
    left,right = Pipe()
    t1 = Process(target=time_job,args=(left,))
    t2 = Process(target=worth_result,args=(right,))
    t1.start()
    t2.start()

在這裡還有一些點需要說明,定時任務選擇那一種類型其實都沒有關系,阻塞和非阻塞其實沒有關系,因為我們在這裡是直接啟瞭兩個進程,每個進程間是相互獨立的,並非是在定時任務下啟用的兩個進程,所以不會影響的。

關於這個解決方案還有的問題:

  • 一、上述所說,兩個進程是占滿的,所以對於資源來說,兩個進程的利用率一直很高
  • 二、擴展性不足,如果在這個程序中還有其他需要處理的過程,就需要再添加進程,或者把他添加到當前的進程之下,代碼重構會比較麻煩一些
  • 三、整個任務的控制不足,需要加以完善。比如對於運行狀態一些控制及查看,一般程序如果運行時間較長的話,我們應該添加這樣的接口,否則啟動後如果沒有出結果,我們是不知道其運行狀態,有一點被動
  • 四、關於三,使用logging庫,應該是可以直接去輸出其日志,但是日志庫作為第三方庫,相當於是對整個運行狀態進行監控,會不會再占用一個進程,這個需要去測試
  • 五、完備性及容災處理,如果程序由於資源等其他問題掛掉後,會有一些數據冗餘下來,也就是一些算法未進行處理,這個時候需要考慮怎麼樣去補數據?原始文件如果沒有保留下來呢?而且如果這些數據是極重要的數據該怎麼處理?如果程序掛掉後,應該如何快速的去處理呢?直接重啟嗎?
  • 六、如果數據處理的進程所用的時間比算法還多,那該怎麼辦?目前的業務來看,是遠低於的,但是如果是遠高於呢?可否將處理工作進行分配,利用多臺機器來處理,然後再把結果合並起來?

分佈式處理的思想越來越濃。

到此這篇關於python中進程間通信及設置狀態量控制另一個進程的文章就介紹到這瞭,更多相關python進程通信內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: