詳解nginx進程鎖的實現

一、 nginx進程鎖的作用

nginx是多進程並發模型應用,直白點就是:有多個worker都在監聽網絡請求,誰接收某個請求,那麼後續的事務就由它來完成。如果沒有鎖的存在,那麼就是這種場景,當一個請求被系統接入後,所以可以監聽該端口的進程,就會同時去處理該事務。當然瞭,系統會避免這種糟糕事情的發生,但也就出現瞭所謂的驚群。(不知道說得對不對,大概是那麼個意思吧)

所以,為瞭避免出現同一時刻,有許多進程監聽,就應該該多個worker間有序地監聽socket. 為瞭讓多個worker有序,所以就有瞭本文要講的進程鎖的出現瞭,隻有搶到鎖的進程才可以進行網絡請求的接入操作。

即如下過程:

// worker 核心事務框架
// ngx_event.c
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */

        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif
    }

    if (ngx_use_accept_mutex) {
        // 為瞭一定的公平性,避免反復爭搶鎖
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            // 隻有搶到鎖的進程,進行 socket 的 accept() 操作
            // 其他worker則處理之前接入的請求,read/write操作
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }
    // 其他核心事務處理
    if (!ngx_queue_empty(&ngx_posted_next_events)) {
        ngx_event_move_posted_next(cycle);
        timer = 0;
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}
// 獲取鎖,並註冊socket accept() 過程如下
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            // 解鎖操作
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}

其他的不必多說,核心即搶到鎖的worker,才可以進行accept操作。而沒有搶到鎖的worker, 則要主動釋放之前的accept()權力。從而達到,同一時刻,隻有一個worker在處理accept事件。

二、入門級鎖使用

鎖這種東西,一般都是編程語言自己定義好的接口,或者固定用法。

比如 java 中的 synchronized xxx, Lock 相關並發包鎖如 CountDownLatch, CyclicBarrier, ReentrantLock, ReentrantReadWriteLock, Semaphore…

比如 python 中的 threading.Lock(), threading.RLock()…

比如 php 中的 flock()…

之所以說是入門級,是因為這都是些接口api, 你隻要按照使用規范,調一下就可以瞭,無需更多知識。但要想用好各細節,則實際不簡單。

三、nginx進程鎖的實現

nginx因為是使用C語言編寫的,所以肯定是更接近底層些的。能夠通過它的實現,來看鎖如何實現,應該能夠讓我們更能理解鎖的深層次含義。

一般地,鎖包含這麼幾個大方向:鎖數據結構定義,上鎖邏輯,解鎖邏輯,以及一些通知機制,超時機制什麼的。下面我們就其中幾個方向,看下nginx 實現:

3.1、鎖的數據結構

首先要定義出鎖有些什麼變量,然後實例化一個值,共享給多進程使用。

// event/ngx_event.c
// 全局accept鎖變量定義
ngx_shmtx_t           ngx_accept_mutex;
// 這個鎖有一個
// atomic 使用 volatile 修飾實現
typedef volatile ngx_atomic_uint_t  ngx_atomic_t;
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
    // 有使用原子更新變量實現鎖,其背後是共享內存區域
    ngx_atomic_t  *lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t  *wait;
    ngx_uint_t     semaphore;
    sem_t          sem;
#endif
#else
    // 有使用fd實現鎖,fd的背後是一個文件實例
    ngx_fd_t       fd;
    u_char        *name;
#endif
    ngx_uint_t     spin;
} ngx_shmtx_t;
// 共享內存數據結構定義
typedef struct {
    u_char      *addr;
    size_t       size;
    ngx_str_t    name;
    ngx_log_t   *log;
    ngx_uint_t   exists;   /* unsigned  exists:1;  */
} ngx_shm_t;

3.2、基於fd的上鎖/解鎖實現

有瞭鎖實例,就可以對其進行上鎖解鎖瞭。nginx有兩種鎖實現,主要是基於平臺的差異性決定的:基於文件或者基於共享內在實現。基於fd即基於文件的實現,這個還是有點重的操作。如下:

// ngx_shmtx.c
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    ngx_err_t  err;

    err = ngx_trylock_fd(mtx->fd);

    if (err == 0) {
        return 1;
    }

    if (err == NGX_EAGAIN) {
        return 0;
    }

#if __osf__ /* Tru64 UNIX */

    if (err == NGX_EACCES) {
        return 0;
    }

#endif

    ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);

    return 0;
}
// core/ngx_shmtx.c
// 1. 上鎖過程
ngx_err_t
ngx_trylock_fd(ngx_fd_t fd)
{
    struct flock  fl;

    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(fd, F_SETLK, &fl) == -1) {
        return ngx_errno;
    }

    return 0;
}
// os/unix/ngx_file.c
ngx_err_t
ngx_lock_fd(ngx_fd_t fd)
{
    struct flock  fl;

    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    // 調用系統提供的上鎖方法
    if (fcntl(fd, F_SETLKW, &fl) == -1) {
        return ngx_errno;
    }

    return 0;
}

// 2. 解鎖實現
// core/ngx_shmtx.c
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    ngx_err_t  err;

    err = ngx_unlock_fd(mtx->fd);

    if (err == 0) {
        return;
    }

    ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name);
}
// os/unix/ngx_file.c
ngx_err_t
ngx_unlock_fd(ngx_fd_t fd)
{
    struct flock  fl;

    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_UNLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(fd, F_SETLK, &fl) == -1) {
        return  ngx_errno;
    }

    return 0;
}

重點就是 fcntl() 這個系統api的調用,無他。當然,站在一個旁觀者角度來看,實際就是因為多進程對文件的操作是可見的,所以達到進程鎖的目的。其中,tryLock 和 lock 存在一定的語義差異,即try時,會得到一些是否成功的標識,而直接進行lock時,則不能得到標識。一般會要求阻塞住請求

3.3、nginx鎖實例的初始化

也許在有些地方,一個鎖實例的初始化,就是一個變量的簡單賦值而已。但在nginx有些不同。首先,需要保證各worker能看到相同的實例或者相當的實例。因為worker是從master處fork()出來的進程,所以隻要在master中實例化好的鎖,必然可以保證各worker能拿到一樣的值。那麼,到底是不是隻是這樣呢?

// 共享鎖的初始化,在ngx master 中進行,後fork()到worker進程
// event/ngx_event.c
static ngx_int_t
ngx_event_module_init(ngx_cycle_t *cycle)
{
    void              ***cf;
    u_char              *shared;
    size_t               size, cl;
    // 定義一段共享內存
    ngx_shm_t            shm;
    ngx_time_t          *tp;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;

    cf = ngx_get_conf(cycle->conf_ctx, ngx_events_module);
    ecf = (*cf)[ngx_event_core_module.ctx_index];

    if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) {
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                      "using the \"%s\" event method", ecf->name);
    }

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

    ngx_timer_resolution = ccf->timer_resolution;

#if !(NGX_WIN32)
    {
    ngx_int_t      limit;
    struct rlimit  rlmt;

    if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "getrlimit(RLIMIT_NOFILE) failed, ignored");

    } else {
        if (ecf->connections > (ngx_uint_t) rlmt.rlim_cur
            && (ccf->rlimit_nofile == NGX_CONF_UNSET
                || ecf->connections > (ngx_uint_t) ccf->rlimit_nofile))
        {
            limit = (ccf->rlimit_nofile == NGX_CONF_UNSET) ?
                         (ngx_int_t) rlmt.rlim_cur : ccf->rlimit_nofile;

            ngx_log_error(NGX_LOG_WARN, cycle->log, 0,
                          "%ui worker_connections exceed "
                          "open file resource limit: %i",
                          ecf->connections, limit);
        }
    }
    }
#endif /* !(NGX_WIN32) */


    if (ccf->master == 0) {
        return NGX_OK;
    }

    if (ngx_accept_mutex_ptr) {
        return NGX_OK;
    }


    /* cl should be equal to or greater than cache line size */

    cl = 128;

    size = cl            /* ngx_accept_mutex */
           + cl          /* ngx_connection_counter */
           + cl;         /* ngx_temp_number */

#if (NGX_STAT_STUB)

    size += cl           /* ngx_stat_accepted */
           + cl          /* ngx_stat_handled */
           + cl          /* ngx_stat_requests */
           + cl          /* ngx_stat_active */
           + cl          /* ngx_stat_reading */
           + cl          /* ngx_stat_writing */
           + cl;         /* ngx_stat_waiting */

#endif

    shm.size = size;
    ngx_str_set(&shm.name, "nginx_shared_zone");
    shm.log = cycle->log;
    // 分配共享內存空間, 使用 mmap 實現
    if (ngx_shm_alloc(&shm) != NGX_OK) {
        return NGX_ERROR;
    }

    shared = shm.addr;

    ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
    ngx_accept_mutex.spin = (ngx_uint_t) -1;
    // 基於共享文件或者內存賦值進程鎖,從而實現多進程控制
    if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
                         cycle->lock_file.data)
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);

    (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "counter: %p, %uA",
                   ngx_connection_counter, *ngx_connection_counter);

    ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl);

    tp = ngx_timeofday();

    ngx_random_number = (tp->msec << 16) + ngx_pid;

#if (NGX_STAT_STUB)

    ngx_stat_accepted = (ngx_atomic_t *) (shared + 3 * cl);
    ngx_stat_handled = (ngx_atomic_t *) (shared + 4 * cl);
    ngx_stat_requests = (ngx_atomic_t *) (shared + 5 * cl);
    ngx_stat_active = (ngx_atomic_t *) (shared + 6 * cl);
    ngx_stat_reading = (ngx_atomic_t *) (shared + 7 * cl);
    ngx_stat_writing = (ngx_atomic_t *) (shared + 8 * cl);
    ngx_stat_waiting = (ngx_atomic_t *) (shared + 9 * cl);

#endif

    return NGX_OK;
}
// core/ngx_shmtx.c
// 1. 基於文件進程共享空間, 使用 fd
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    // 由master進程創建,所以是進程安全的操作,各worker直接使用即可
    if (mtx->name) {
        // 如果已經創建好瞭,則 fd 已被賦值,不能創建瞭,直接共享fd即可
        // fd 的背後是一個文件實例
        if (ngx_strcmp(name, mtx->name) == 0) {
            mtx->name = name;
            return NGX_OK;
        }

        ngx_shmtx_destroy(mtx);
    }
    // 使用文件創建的方式鎖共享
    mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN,
                            NGX_FILE_DEFAULT_ACCESS);

    if (mtx->fd == NGX_INVALID_FILE) {
        ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno,
                      ngx_open_file_n " \"%s\" failed", name);
        return NGX_ERROR;
    }
    // 創建完成即可刪除,後續隻基於該fd實例做鎖操作
    if (ngx_delete_file(name) == NGX_FILE_ERROR) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      ngx_delete_file_n " \"%s\" failed", name);
    }

    mtx->name = name;

    return NGX_OK;
}

// 2. 基於共享內存的共享鎖的創建
// ngx_shmtx.c
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;

    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }

    mtx->spin = 2048;

#if (NGX_HAVE_POSIX_SEM)

    mtx->wait = &addr->wait;

    if (sem_init(&mtx->sem, 1, 0) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_init() failed");
    } else {
        mtx->semaphore = 1;
    }

#endif

    return NGX_OK;
}
// os/unix/ngx_shmem.c
ngx_int_t
ngx_shm_alloc(ngx_shm_t *shm)
{
    shm->addr = (u_char *) mmap(NULL, shm->size,
                                PROT_READ|PROT_WRITE,
                                MAP_ANON|MAP_SHARED, -1, 0);

    if (shm->addr == MAP_FAILED) {
        ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
                      "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
        return NGX_ERROR;
    }

    return NGX_OK;
}

基於fd的鎖實現,本質是基於其背後的文件系統的實現,因為文件系統是進程可見的,所以對於相同fd控制,就是對共同的鎖的控制瞭。

3.4、基於共享內存的上鎖/解鎖實現

所謂共享內存,實際就是一塊公共的內存區域,它超出瞭進程的范圍(受操作系統管理)。就是前面我們看到的mmap()的創建,就是一塊共享內存。

// ngx_shmtx.c
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    // 直接對共享內存區域的值進行改變
    // cas 改變成功即是上鎖成功。
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

// shm版本的解鎖操作, cas 解析,帶通知
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (mtx->spin != (ngx_uint_t) -1) {
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
    }

    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
        ngx_shmtx_wakeup(mtx);
    }
}
// 通知等待進程
static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_uint_t  wait;

    if (!mtx->semaphore) {
        return;
    }

    for ( ;; ) {

        wait = *mtx->wait;

        if ((ngx_atomic_int_t) wait <= 0) {
            return;
        }

        if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
            break;
        }
    }

    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                   "shmtx wake %uA", wait);

    if (sem_post(&mtx->sem) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_post() failed while wake shmtx");
    }

#endif
}

共享內存版本的鎖的實現,基本就是cas的對內存變量的設置。隻是這個面向的內存,是共享區域的內存。

四、 說到底鎖的含義是什麼

見過瞭許多的鎖,依然過不好這一關。

鎖到底是什麼呢?事實上,鎖就是一個標識位。當有人看到這個標識位後,就主動停止操作,或者進行等等,從而使其看起來起到瞭鎖的作用。這個標識位,可以設置在某個對象中,也可以為設置在某個全局值中,還可以借助於各種存在介質,比如文件,比如redis,比如zk 。 這都沒有差別。因為問題關鍵不在存放在哪裡,而在於如何安全地設置這個標識位。

要實現鎖,一般都需要要一個強有力的底層含義保證,比如cpu層面的cas操作,應用級別的隊列串行原子操作。。。
至於什麼,內存鎖,文件鎖,高級鎖,都是有各自的應用場景。而要選好各種鎖,則變成瞭評價高低地關鍵。此時此刻,你應該能判斷出來的!

以上就是詳解nginx進程鎖的實現的詳細內容,更多關於nginx 進程鎖的資料請關註WalkonNet其它相關文章!

推薦閱讀: