基於Python實現配置熱加載的方法詳解

背景

由於最近工作需求,需要在已有項目添加一個新功能,實現配置熱加載的功能。所謂的配置熱加載,也就是說當服務收到配置更新消息之後,我們不用重啟服務就可以使用最新的配置去執行任務。

如何實現

下面我分別采用多進程、多線程、協程的方式去實現配置熱加載。

使用多進程實現配置熱加載

如果我們代碼實現上使用多進程, 主進程1來更新配置並發送指令,任務的調用是進程2,如何實現配置熱加載呢?

使用signal信號量來實現熱加載

當主進程收到配置更新的消息之後(配置讀取是如何收到配置更新的消息的? 這裡我們暫不討論), 主進程就向進子程1發送kill信號,子進程1收到kill的信號就退出,之後由信號處理函數來啟動一個新的進程,使用最新的配置文件來繼續執行任務。

main 函數

def main():
    # 啟動一個進程執行任務
    p1 = Process(target=run, args=("p1",))
    p1.start()

    monitor(p1, run) # 註冊信號
    processes["case100"] = p1 #將進程pid保存
    num = 0 
    while True: # 模擬獲取配置更新
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        sleep(2)
        if num == 4:
            kill_process(processes["case100"]) # kill 當前進程
        if num == 8:
            kill_process(processes["case100"]) # kill 當前進程
        if num == 12:
            kill_process(processes["case100"]) # kill 當前進程
        num += 1

signal_handler 函數

def signal_handler(process: Process, func, signum, frame):
    # print(f"{signum=}")
    global counts

    if signum == 17:  # 17 is SIGCHILD 
        # 這個循環是為瞭忽略SIGTERM發出的信號,避免搶占瞭主進程發出的SIGCHILD
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)

        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1

    if signum == 2:
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")

完整代碼如下

#! /usr/local/bin/python3.8
from multiprocessing import Process
from typing import Dict
import signal
from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
import multiprocessing
from multiprocessing import Process
from typing import Callable
from data import processes
import sys
from functools import partial
import time

processes: Dict[str, Process] = {}
counts = 2


def run(process: Process):
    while True:
        print(f"{process} running...")
        time.sleep(1)


def kill_process(process: Process):
    print(f"kill {process}")
    process.terminate()


def monitor(process: Process, func: Callable):
    for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
        # SIGTERM is kill signal.
        # No SIGCHILD is not trigger singnal_handler,
        # No SIGINT is not handler ctrl+c,
        # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'>
        signal.signal(signame, partial(signal_handler, process, func))


def signal_handler(process: Process, func, signum, frame):
    print(f"{signum=}")
    global counts

    if signum == 17:  # 17 is SIGTERM
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1

    if signum == 2:
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")


def main():
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)
    processes["case100"] = p1
    num = 0
    while True:
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])
        if num == 8:
            kill_process(processes["case100"])
        if num == 12:
            kill_process(processes["case100"])
        num += 1


if __name__ == '__main__':
    main()

執行結果如下

multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1

processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}

p1 running...
p1 running...
kill <Process name='Process-1' pid=2533 parent=2532 started>
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1

processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}

signum=17
Launch a new process
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
kill <Process name='Process-2' pid=2577 parent=2532 started>
signum=17
Launch a new process
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1

processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>}

p3 running...
p3 running...
multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1

總結

好處:使用信號量可以處理多進程之間通信的問題。

壞處:代碼不好寫,寫出來代碼不好理解。信號量使用必須要很熟悉,不然很容易自己給自己寫瞭一個bug.(所有初學者慎用,老司機除外。)

還有一點不是特別理解的就是process.terminate() 發送出信號是SIGTERM number是15,但是第一次signal_handler收到信號卻是number=17,如果我要去處理15的信號,就會導致前一個進程不能kill掉的問題。歡迎有對信號量比較熟悉的大佬,前來指點迷津,不甚感謝。

采用multiprocessing.Event 來實現配置熱加載

實現邏輯是主進程1 更新配置並發送指令。進程2啟動調度任務。

這時候當主進程1更新好配置之後,發送指令給進程2,這時候的指令就是用Event一個異步事件通知。

直接上代碼

scheduler 函數

def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")

        task_schedule_event.set() # 設置set之後, is_set 為True

        print(f"Schedule will start ...")
        while task_schedule_event.is_set(): # is_set 為True的話,那麼任務就會一直執行
            run(case_configurations)

        print("Clearing all scheduling job ...") 

event_scheduler 函數

def event_scheduler(case_config):

    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")

    task_schedule_event.clear() # clear之後,is_set 為False
    print(f"Clear scheduler jobs ...")

    print(f"Schedule job ...")

完整代碼如下

import multiprocessing
import time


scheduler_notify_queue = multiprocessing.Queue()
task_schedule_event = multiprocessing.Event()


def run(case_configurations: str):
    print(f'{case_configurations} running...')
    time.sleep(3)


def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()

        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()

        print(f"Schedule will start ...")
        while task_schedule_event.is_set():
            run(case_configurations)

        print("Clearing all scheduling job ...")


def event_scheduler(case_config: str):

    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")

    task_schedule_event.clear()
    print(f"Clear scheduler jobs ...")

    print(f"Schedule job ...")


def main():
    scheduler_notify_queue.put('1')
    p = multiprocessing.Process(target=scheduler)
    p.start()

    count = 1
    print(f'{count=}')
    while True:
        if count == 5:
            event_scheduler('100')
        if count == 10:
            event_scheduler('200')
        count += 1
        time.sleep(1)


if __name__ == '__main__':
    main()

執行結果如下

wait message...
Got case configurations case_configurations='1'...
Schedule will start ...
1 running...
1 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='100'...
Schedule will start ...
100 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='200'...
Schedule will start ...
200 running...
200 running...

總結

使用Event事件通知,代碼不易出錯,代碼編寫少,易讀。相比之前信號量的方法,推薦大傢多使用這種方式。

使用多線程或協程的方式,其實和上述實現方式一致。唯一區別就是調用瞭不同庫中,queue和 event.

# threading
scheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()

# async
scheduler_notify_queue = asyncio.Queue()
task_schedule_event = asyncio.Event()

結語

具體的實現的方式有很多,也各自有各自的優劣勢。我們需要去深刻理解到需求本身,才去做技術選型。

以上就是基於Python實現配置熱加載的方法詳解的詳細內容,更多關於Python配置熱加載的資料請關註WalkonNet其它相關文章!

推薦閱讀: