Python 4種實現定時任務的方案

1.利用 while True: + sleep() 實現定時任務

位於 time 模塊中的 sleep(secs) 函數,可以實現令當前執行的線程暫停 secs 秒後再繼續執行。所謂暫停,即令當前線程進入阻塞狀態,當達到 sleep() 函數規定的時間後,再由阻塞狀態轉為就緒狀態,等待 CPU 調度。

基於這樣的特性我們可以通過 while 死循環+sleep() 的方式實現簡單的定時任務。

代碼示例:

import datetime

import time

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

def loop_monitor():

    while True:

        time_printer()

        time.sleep(5)  # 暫停 5 秒

if __name__ == "__main__":

    loop_monitor()

主要缺點:

  • 隻能設定間隔,不能指定具體的時間,比如每天早上 8:00
  • sleep 是一個阻塞函數,也就是說 sleep 這一段時間,程序什麼也不能操作。

2.使用 Timeloop 庫運行定時任務

Timeloop[2] 是一個庫,可用於運行多周期任務。這是一個簡單的庫,它使用 decorator 模式在線程中運行標記函數。

示例代碼:

import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

    print "2s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

    print "5s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

    print "10s job current time : {}".format(time.ctime())

3.利用 threading.Timer 實現定時任務

threading 模塊中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer 最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執行,所以不存在等待順序執行問題。

Timer(interval, function, args=[ ], kwargs={ })

  • interval: 指定的時間
  • function: 要執行的方法
  • args/kwargs: 方法的參數

代碼示例:

import datetime

from threading import Timer

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    t = Timer(5, time_printer)

    t.start()

if __name__ == "__main__":

    loop_monitor()

備註:Timer 隻能執行一次,這裡需要循環調用,否則隻能執行一次

4.利用內置模塊 sched 實現定時任務

sched 模塊實現瞭一個通用事件調度器,在調度器類使用一個延遲函數等待特定的時間,執行任務。同時支持多線程應用程序,在每個任務執行後會立刻調用延時函數,以確保其他線程也能執行。

class sched.scheduler(timefunc, delayfunc) 這個類定義瞭調度事件的通用接口,它需要外部傳入兩個參數,timefunc 是一個沒有參數的返回時間類型數字的函數(常用使用的如 time 模塊裡面的 time),delayfunc 應該是一個需要一個參數來調用、與 timefunc 的輸出兼容、並且作用為延遲多個時間單位的函數(常用的如 time 模塊的 sleep)。

代碼示例:

import datetime

import time

import sched

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    s = sched.scheduler(time.time, time.sleep)  # 生成調度器

    s.enter(5, 1, time_printer, ())

    s.run()

if __name__ == "__main__":

    loop_monitor()

scheduler 對象主要方法:

  • enter(delay, priority, action, argument) ,安排一個事件來延遲 delay 個時間單位。
  • cancel(event):從隊列中刪除事件。如果事件不是當前隊列中的事件,則該方法將跑出一個 ValueError
  • run():運行所有預定的事件。這個函數將等待(使用傳遞給構造函數的 delayfunc() 函數),然後執行事件,直到不再有預定的事件。

個人點評:比 threading.Timer 更好,不需要循環調用。

到此這篇關於Python 4種實現定時任務的方案的文章就介紹到這瞭,更多相關Python 實現定時任務方案內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: