Python 實時獲取任務請求對應的Nginx日志的方法

需求描述

項目需求測試過程中,需要向Nginx服務器發送一些用例請求,然後查看對應的Nginx日志,判斷是否存在特征內容,來判斷任務是否執行成功。為瞭提升效率,需要將這一過程實現自動化。

實踐環境

Python 3.6.5

代碼設計與實現

#!/usr/bin/env python
# -*- coding:utf-8 -*-


'''
@CreateTime: 2021/06/26 9:05
@Author : shouke
'''


import time
import threading
import subprocess
from collections import deque


def collect_nginx_log():
    global nginx_log_queue
    global is_tasks_compete
    global task_status


    args = 'tail -0f /usr/local/openresty/nginx/logs/access.log'
    while task_status != 'req_log_got':
        with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc:
            log_for_req = ''
            outs, errs = '', ''

            try:
                outs, errs = proc.communicate(timeout=2)
            except subprocess.TimeoutExpired:
                print('獲取nginx日志超時,正在重試')
                proc.kill()
                try:
                    outs, errs = proc.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    print('獲取nginx日志超時,再次超時,停止重試')
                    break
            finally:
                for line in outs.split('\n'):
                    flag = '\"client_ip\":\"10.118.0.77\"' # 特征
                    if flag in line: # 查找包含特征內容的日志
                        log_for_req += line

                if task_status == 'req_finished':
                    nginx_log_queue.append(log_for_req)
                    task_status = 'req_log_got'



def run_tasks(task_list):
    '''
    運行任務
    :param task_list 任務列表
    '''

    global nginx_log_queue
    global is_tasks_compete
    global task_status


    for task in task_list:
        thread = threading.Thread(target=collect_nginx_log,
                                    name="collect_nginx_log")
        thread.start()
        time.sleep(1) # 執行任務前,讓收集日志線程先做好準備

        print('正在執行任務:%s' % task.get('name'))

        # 執行Nginx任務請求
        # ...

        task_status = 'req_finished'
        time_to_wait = 0.1
        while task_status != 'req_log_got': # 請求觸發的nginx日志收集未完成
            time.sleep(time_to_wait)
            time_to_wait += 0.01
        else:# 獲取到用例請求觸發的nginx日志
            if nginx_log_queue:
                nginx_log = nginx_log_queue.popleft()
                task_status = 'req_ready'
                # 解析日志
                # do something here
                # ...
            else:
                print('存儲請求日志的隊列為空')
                # do something here
                # ...


if __name__ == '__main__':
    nginx_log_queue = deque()
    is_tasks_compete = False # 所有任務是否執行完成

    task_status = 'req_ready' # req_ready,req_finished,req_log_got  # 存放執行次任務任務的一些狀態
    print('###########################任務開始###########################')

    tast_list = [{'name':'test_task', 'other':'...'}]
    run_tasks(tast_list)

    is_tasks_compete = True

    current_active_thread_num = len(threading.enumerate())
    while current_active_thread_num != 1:
        time.sleep(2)
        current_active_thread_num = len(threading.enumerate())
    print('###########################任務完成###########################')

註意:

1、上述代碼為啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征內容"呢?這是因為這樣做無法獲取到Nginx的日志

2、實踐時發現,第一次執行proc.communicate(timeout=2)獲取日志時,總是無法獲取,會超時,需要二次獲取,並且timeout設置太小時(實踐時嘗試過設置為1秒),也會導致第二次執行時無法獲取Nginx日志。

到此這篇關於Python 實時獲取任務請求對應的Nginx日志的文章就介紹到這瞭,更多相關Python獲取Nginx日志內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: