Laravel中schedule調度的運行機制

Laravel 的 console 命令行極大的方便瞭 PHP 定時任務的設置以及運行。以往通過 crontab 配置定時任務過程相對比較繁瑣,並且通過 crontab 設置的定時任務很難防止任務的交疊運行。

所謂任務的交疊運行,是指由於定時任務運行時間較長,在 crontab 設置的運行周期不盡合理的情況下,已經啟動的任務還沒有結束運行,而系統又啟動瞭新的任務去執行相同的操作。如果程序內部沒有處理好數據一致性的問題,那麼兩個任務同時操作同一份數據,很可能會導致嚴重的後果。

⒈ runInBackground 和 withoutOverlapping

為瞭防止任務的交疊運行,Laravel 提供瞭 withoutOverlapping() 方法;為瞭能讓多任務在後臺並行執行,Laravel 提供瞭 runInBackground() 方法。

⑴ runInBackground() 方法

console 命令行中的每一個命令都代表一個 Event ,\App\Console\Kernel 中的 schedule() 方法的作用隻是將這些命令行代表的 Event 註冊到 Illuminate\Console\Scheduling\Schedule 的屬性 $events 中。

// namespace \Illuminate\Console\Scheduling\Schedule

public function command($command, array $parameters = [])
{
    if (class_exists($command)) {
        $command = Container::getInstance()->make($command)->getName();
    }

    return $this->exec(
        Application::formatCommandString($command), $parameters
    );
}

public function exec($command, array $parameters = [])
{
    if (count($parameters)) {
        $command .= ' '.$this->compileParameters($parameters);
    }

    $this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone);

    return $event;
}

Event 的運行方式有兩種:Foreground 和 Background 。二者的區別就在於多個 Event 是否可以並行執行。Event 默認以 Foreground 的方式運行,在這種運行方式下,多個 Event 順序執行,後面的 Event 需要等到前面的 Event 運行完成之後才能開始執行。

但在實際應用中,我們往往是希望多個 Event 可以並行執行,此時就需要調用 Event 的 runInBackground() 方法將其運行方式設置為 Background 。
Laravel 框架對這兩種運行方式的處理區別在於命令行的組裝方式和回調方法的調用方式。

// namespace \Illuminate\Console\Scheduling\Event
protected function runCommandInForeground(Container $container)
{
    $this->callBeforeCallbacks($container);

    $this->exitCode = Process::fromShellCommandline($this->buildCommand(), base_path(), null, null, null)->run();

    $this->callAfterCallbacks($container);
}

protected function runCommandInBackground(Container $container)
{
    $this->callBeforeCallbacks($container);

    Process::fromShellCommandline($this->buildCommand(), base_path(), null, null, null)->run();
}

public function buildCommand()
{
    return (new CommandBuilder)->buildCommand($this);
}

// namespace Illuminate\Console\Scheduling\CommandBuilder
public function buildCommand(Event $event)
{
    if ($event->runInBackground) {
        return $this->buildBackgroundCommand($event);
    }

    return $this->buildForegroundCommand($event);
}

protected function buildForegroundCommand(Event $event)
{
    $output = ProcessUtils::escapeArgument($event->output);

    return $this->ensureCorrectUser(
        $event, $event->command.($event->shouldAppendOutput ? ' >> ' : ' > ').$output.' 2>&1'
    );
}

protected function buildBackgroundCommand(Event $event)
{
    $output = ProcessUtils::escapeArgument($event->output);

    $redirect = $event->shouldAppendOutput ? ' >> ' : ' > ';

    $finished = Application::formatCommandString('schedule:finish').' "'.$event->mutexName().'"';

    if (windows_os()) {
        return 'start /b cmd /c "('.$event->command.' & '.$finished.' "%errorlevel%")'.$redirect.$output.' 2>&1"';
    }

    return $this->ensureCorrectUser($event,
        '('.$event->command.$redirect.$output.' 2>&1 ; '.$finished.' "$?") > '
        .ProcessUtils::escapeArgument($event->getDefaultOutput()).' 2>&1 &'
    );
}

從代碼中可以看出,采用 Background 方式運行的 Event ,其命令行在組裝的時候結尾會增加一個 & 符號,其作用是使命令行程序進入後臺運行;另外,采用 Foreground 方式運行的 Event ,其回調方法是同步調用的,而采用 Background 方式運行的 Event ,其 after 回調則是通過 schedule:finish 命令行來執行的。

⑵ withoutOverlapping() 方法

在設置 Event 的運行周期時,由於應用場景的不斷變化,很難避免某個特定的 Event 在某個時間段內需要運行較長的時間才能完成,甚至在下一個運行周期開始時還沒有執行完成。如果不對這種情況進行處理,就會導致多個相同的 Event 同時運行,而如果這些 Event 當中涉及到對數據的操作並且程序中沒有處理好冪等問題,很可能會造成嚴重後果。
為瞭避免出現上述的問題,Event 中提供瞭 withoutOverlapping() 方法,該方法通過將 Event 的 withoutOverlapping 屬性設置為 TRUE ,在每次要執行 Event 時會檢查當前是否存在正在執行的相同的 Event ,如果存在,則不執行新的 Event 任務。

// namespace Illuminate\Console\Scheduling\Event
public function withoutOverlapping($expiresAt = 1440)
{
    $this->withoutOverlapping = true;

    $this->expiresAt = $expiresAt;

    return $this->then(function () {
        $this->mutex->forget($this);
    })->skip(function () {
        return $this->mutex->exists($this);
    });
}

public function run(Container $container)
{
    if ($this->withoutOverlapping &&
        ! $this->mutex->create($this)) {
        return;
    }

    $this->runInBackground
                ? $this->runCommandInBackground($container)
                : $this->runCommandInForeground($container);
}

⒉ mutex 互斥鎖

在調用 withoutOverlapping() 方法時,該方法還實現瞭另外兩個功能:一個是設置超時時間,默認為 24 小時;另一個是設置 Event 的回調。

⑴ 超時時間

首先說超時時間,這個超時時間並不是 Event 的超時時間,而是 Event 的屬性 mutex 的超時時間。在向 Illuminate\Console\Scheduling\Schedule 的屬性 $events 中註冊 Event 時,會調用 Schedule 中的 exec() 方法,在該方法中會新建 Event 對象,此時會向 Event 的構造方法中傳入一個 eventMutex ,這就是 Event 對象中的屬性 mutex ,超時時間就是為這個 mutex 設置的。而 Schedule 中的 eventMutex 則是通過實例化 CacheEventMutex 來創建的。

// namespace \Illuminate\Console\Scheduling\Schedule
$this->eventMutex = $container->bound(EventMutex::class)
                                ? $container->make(EventMutex::class)
                                : $container->make(CacheEventMutex::class);

設置瞭 withoutOverlapping 的 Event 在執行之前,首先會嘗試獲取 mutex 互斥鎖,如果無法成功獲取到鎖,那麼 Event 就不會執行。獲取互斥鎖的操作通過調用 mutex 的 create() 方法完成。
CacheEventMutex 在實例化時需要傳入一個 \Illuminate\Contracts\Cache\Factory 類型的實例,其最終傳入的是一個 \Illuminate\Cache\CacheManager 實例。在調用 create() 方法獲取互斥鎖時,還需要通過調用 store() 方法設置存儲引擎。

// namespace \Illuminate\Foundation\Console\Kernel
protected function defineConsoleSchedule()
{
    $this->app->singleton(Schedule::class, function ($app) {
        return tap(new Schedule($this->scheduleTimezone()), function ($schedule) {
            $this->schedule($schedule->useCache($this->scheduleCache()));
        });
    });
}

protected function scheduleCache()
{
    return Env::get('SCHEDULE_CACHE_DRIVER');
}

// namespace \Illuminate\Console\Scheduling\Schedule
public function useCache($store)
{
    if ($this->eventMutex instanceof CacheEventMutex) {
        $this->eventMutex->useStore($store);
    }

    /* ... ... */
    return $this;
}

// namespace \Illuminate\Console\Scheduling\CacheEventMutex
public function create(Event $event)
{
    return $this->cache->store($this->store)->add(
        $event->mutexName(), true, $event->expiresAt * 60
    );
}

// namespace \Illuminate\Cache\CacheManager
public function store($name = null)
{
    $name = $name ?: $this->getDefaultDriver();

    return $this->stores[$name] = $this->get($name);
}

public function getDefaultDriver()
{
    return $this->app['config']['cache.default'];
}

protected function get($name)
{
    return $this->stores[$name] ?? $this->resolve($name);
}

protected function resolve($name)
{
    $config = $this->getConfig($name);

    if (is_null($config)) {
        throw new InvalidArgumentException("Cache store [{$name}] is not defined.");
    }

    if (isset($this->customCreators[$config['driver']])) {
        return $this->callCustomCreator($config);
    } else {
        $driverMethod = 'create'.ucfirst($config['driver']).'Driver';

        if (method_exists($this, $driverMethod)) {
            return $this->{$driverMethod}($config);
        } else {
            throw new InvalidArgumentException("Driver [{$config['driver']}] is not supported.");
        }
    }
}

protected function getConfig($name)
{
    return $this->app['config']["cache.stores.{$name}"];
}

protected function createFileDriver(array $config)
{
    return $this->repository(new FileStore($this->app['files'], $config['path'], $config['permission'] ?? null));
}

在初始化 Schedule 時會指定 eventMutex 的存儲引擎,默認為環境變量中的配置項 SCHEDULE_CACHE_DRIVER 的值。但通常這一項配置在環境變量中並不存在,所以 useCache() 的參數值為空,進而 eventMutex 的 store 屬性值也為空。這樣,在 eventMutex 的 create() 方法中調用 store() 方法為其設置存儲引擎時,store() 方法的參數值也為空。

當 store() 方法的傳參為空時,會使用應用的默認存儲引擎(如果不做任何修改,默認 cache 的存儲引擎為 file)。之後會取得默認存儲引擎的配置信息(引擎、存儲路徑、連接信息等),然後實例化存儲引擎。最終,file 存儲引擎實例化的是 \Illuminate\Cache\FileStore 。

在設置完存儲引擎之後,緊接著會調用 add() 方法獲取互斥鎖。由於 store() 方法返回的是 \Illuminate\Contracts\Cache\Repository 類型的實例,所以最終調用的是 Illuminate\Cache\Repository 中的 add() 方法。

// namespace \Illuminate\Cache\Repository
public function add($key, $value, $ttl = null)
{
    if ($ttl !== null) {
        if ($this->getSeconds($ttl) <= 0) {
            return false;
        }

        if (method_exists($this->store, 'add')) {
            $seconds = $this->getSeconds($ttl);

            return $this->store->add(
                $this->itemKey($key), $value, $seconds
            );
        }
    }

    if (is_null($this->get($key))) {
        return $this->put($key, $value, $ttl);
    }

    return false;
}

public function get($key, $default = null)
{
    if (is_array($key)) {
        return $this->many($key);
    }

    $value = $this->store->get($this->itemKey($key));

    if (is_null($value)) {
        $this->event(new CacheMissed($key));

        $value = value($default);
    } else {
        $this->event(new CacheHit($key, $value));
    }

    return $value;
}

// namespace \Illuminate\Cache\FileStore
public function get($key)
{
    return $this->getPayload($key)['data'] ?? null;
}

protected function getPayload($key)
{
    $path = $this->path($key);

    try {
        $expire = substr(
            $contents = $this->files->get($path, true), 0, 10
        );
    } catch (Exception $e) {
        return $this->emptyPayload();
    }

    if ($this->currentTime() >= $expire) {
        $this->forget($key);

        return $this->emptyPayload();
    }

    try {
        $data = unserialize(substr($contents, 10));
    } catch (Exception $e) {
        $this->forget($key);

        return $this->emptyPayload();
    }

    $time = $expire - $this->currentTime();

    return compact('data', 'time');
}

這裡需要說明,所謂互斥鎖,其本質是寫文件。如果文件不存在或文件內容為空或文件中存儲的過期時間小於當前時間,則互斥鎖可以順利獲得;否則無法獲取到互斥鎖。文件內容為固定格式:timestampb:1 。

所謂超時時間,與此處的 timestamp 的值有密切的聯系。獲取互斥鎖時的時間戳,再加上超時時間的秒數,即是此處的 timestamp 的值。

由於 FileStore 中不存在 add() 方法,所以程序會直接嘗試調用 get() 方法獲取文件中的內容。如果 get() 返回的結果為 NULL,說明獲取互斥鎖成功,之後會調用 FileStore 的 put() 方法寫文件;否則,說明當前有相同的 Event 在運行,不會再運行新的 Event 。
在調用 put() 方法寫文件時,首先需要根據傳參計算 eventMutex 的超時時間的秒數,之後再調用 FileStore 中的 put() 方法,將數據寫入文件中。

// namespace \Illuminate\Cache\Repository
public function put($key, $value, $ttl = null)
{
    /* ... ... */

    $seconds = $this->getSeconds($ttl);

    if ($seconds <= 0) {
        return $this->forget($key);
    }

    $result = $this->store->put($this->itemKey($key), $value, $seconds);

    if ($result) {
        $this->event(new KeyWritten($key, $value, $seconds));
    }

    return $result;
}

// namespace \Illuminate\Cache\FileStore
public function put($key, $value, $seconds)
{
    $this->ensureCacheDirectoryExists($path = $this->path($key));

    $result = $this->files->put(
        $path, $this->expiration($seconds).serialize($value), true
    );

    if ($result !== false && $result > 0) {
        $this->ensureFileHasCorrectPermissions($path);

        return true;
    }

    return false;
}

protected function path($key)
{
    $parts = array_slice(str_split($hash = sha1($key), 2), 0, 2);

    return $this->directory.'/'.implode('/', $parts).'/'.$hash;
}

// namespace \Illuminate\Console\Scheduling\Schedule
public function mutexName()
{
    return 'framework'.DIRECTORY_SEPARATOR.'schedule-'.sha1($this->expression.$this->command);
}

這裡需要重點說明的是 $key 的生成方法以及文件路徑的生成方法。$key 通過調用 Event 的 mutexName() 方法生成,其中需要用到 Event 的 $expression 和 $command 屬性。其中 $command 為我們定義的命令行,在調用 $schedule->comand() 方法時傳入,然後進行格式化,$expression 則為 Event 的運行周期。

以命令行 schedule:test 為例,格式化之後的命令行為  `/usr/local/php/bin/php` `artisan` schedule:test,如果該命令行設置的運行周期為每分鐘一次,即 * * * * * ,則最終計算得到的 $key 的值為 framework/schedule-768a42da74f005b3ac29ca0a88eb72d0ca2b84be 。文件路徑則是將 $key 的值再次進行 sha1 計算之後,以兩個字符為一組切分成數組,然後取數組的前兩項組成一個二級目錄,而配置文件中 file 引擎的默認存儲路徑為 storage/framework/cache/data ,所以最終的文件路徑為 storage/framework/cache/data/eb/60/eb608bf555895f742e5bd57e186cbd97f9a6f432 。而文件中存儲的內容則為 1642122685b:1 。

⑵ 回調方法

再來說設置的 Event 回調,調用 withoutOverlapping() 方法會為 Event 設置兩個回調:一個是 Event 運行完成之後的回調,用於釋放互斥鎖,即清理緩存文件;另一個是在運行 Event 之前判斷互斥鎖是否被占用,即緩存文件是否已經存在。

無論 Event 是以 Foreground 的方式運行,還是以 Background 的方式運行,在運行完成之後都會調用 callAfterCallbacks() 方法執行 afterCallbacks 中的回調,其中就有一項回調用於釋放互斥鎖,刪除緩存文件 $this->mutex->forget($this) 。區別就在於,以 Foreground 方式運行的 Event 是在運行完成之後顯式的調用這些回調方法,而以 Background 方式運行的 Event 則需要借助 schedule:finish 來調用這些回調方法。
所有在 \App\Console\Kernel 中註冊 Event,都是通過命令行 schedule:run 來調度的。在調度之前,首先會判斷當前時間點是否滿足各個 Event 所配置的運行周期的要求。如果滿足的話,接下來就是一些過濾條件的判斷,這其中就包括判斷互斥鎖是否被占用。隻有在互斥鎖沒有被占用的情況下,Event 才可以運行。

// namespace \Illuminate\Console\Scheduling\ScheduleRunCommand
public function handle(Schedule $schedule, Dispatcher $dispatcher)
{
    $this->schedule = $schedule;
    $this->dispatcher = $dispatcher;

    foreach ($this->schedule->dueEvents($this->laravel) as $event) {
        if (! $event->filtersPass($this->laravel)) {
            $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

            continue;
        }

        if ($event->onOneServer) {
            $this->runSingleServerEvent($event);
        } else {
            $this->runEvent($event);
        }

        $this->eventsRan = true;
    }

    if (! $this->eventsRan) {
        $this->info('No scheduled commands are ready to run.');
    }
}

// namespace \Illuminate\Console\Scheduling\Schedule
public function dueEvents($app)
{
    return collect($this->events)->filter->isDue($app);
}

// namespace \Illuminate\Console\Scheduling\Event
public function isDue($app)
{
    /* ... ... */
    return $this->expressionPasses() &&
           $this->runsInEnvironment($app->environment());
}

protected function expressionPasses()
{
    $date = Carbon::now();
    /* ... ... */
    return CronExpression::factory($this->expression)->isDue($date->toDateTimeString());
}

// namespace \Cron\CronExpression
public function isDue($currentTime = 'now', $timeZone = null)
{
   /* ... ... */
   
    try {
        return $this->getNextRunDate($currentTime, 0, true)->getTimestamp() === $currentTime->getTimestamp();
    } catch (Exception $e) {
        return false;
    }
}

public function getNextRunDate($currentTime = 'now', $nth = 0, $allowCurrentDate = false, $timeZone = null)
{
    return $this->getRunDate($currentTime, $nth, false, $allowCurrentDate, $timeZone);
}

有時候,我們可能需要 kill 掉一些在後臺運行的命令行,但緊接著我們會發現這些被 kill 掉的命令行在一段時間內無法按照設置的運行周期自動調度,其原因就在於手動 kill 掉的命令行沒有調用 schedule:finish 清理緩存文件,釋放互斥鎖。這就導致在設置的過期時間到達之前,互斥鎖會一直被占用,新的 Event 不會再次運行。

到此這篇關於Laravel中schedule調度的運行機制的文章就介紹到這瞭,更多相關Laravel schedule調度內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: