Python 多線程處理任務實例

美餐每天發一個用Excel匯總的就餐數據,我們把它導入到數據庫後,行政辦公服務用它和公司內的就餐數據進行比對查重。

初始實現是單線程,和import_records去掉多線程後的部分差不多。

讀取Excel數據 —> 發送到行政服務接口

安全起見線上操作放在瞭晚上進行。運行時發現每條數據導入消耗1s多,晚上十點開始跑這幾千條數據想想都讓人崩潰。

等著也是幹等,下樓轉兩圈透透氣,屋裡齷齪的空氣讓人昏昏沉沉,寒冷讓人清醒不少,突然想到為什麼不用多線程呢?

第一版多線程和處理業務的程序糅合在瞭一起,跟屎一樣難讀。後面兩天又抽瞭點時間重構瞭幾個版本,分離出來一個線程池、迭代器和import_records

清晰不少,但是迭代器被暴露瞭出來,需要import_records調用一下判斷當前任務是否給當前線程處理,類似協程的思路。

暴露有好有壞,但已基本滿足日常使用,可以往一邊先放放瞭。讀讀書、看看電影,不亦樂乎 :)。

import threading

def task_pool(thread_num, task_fn):

  if thread_num <= 0 :
      raise ValueError

  threads = []

  def gen_thread_checker(thread_id, step):

      base = 1
      i = 0

      def thread_checker():
          nonlocal i

          i += 1
          # print((thread_id,i,step, i < base or (i - base) % step != thread_id))

          if i < base or (i - base) % step != thread_id:
              return False

          return True

      return thread_checker


  for x in range(0, thread_num):
    threads.append(threading.Thread(target=task_fn, args=(x,thread_num, gen_thread_checker(x, thread_num))))

  # 啟動所有線程
  for t in threads:
    t.start()
  # 主線程中等待所有子線程退出
  for t in threads:
    t.join()

import argparse
import re

import requests
from openpyxl import load_workbook
from requests import RequestException

import myThread

parser = argparse.ArgumentParser(description='美餐到店交易數據導入')
parser.add_argument('--filename', '-f', help='美餐到店交易數據 .xlsx 文件路徑', required=True)
parser.add_argument('--thread_num', '-t', help='線程數量', default= 100, required=False)
parser.add_argument('--debug', '-d', help='調試模式', default= 0, required=False)
args = parser.parse_args()

filename = args.filename
thread_num = int(args.thread_num)
debug = args.debug

if debug:
    print((filename,thread_num,debug))


def add_meican_meal_record(data):
   pass

def import_records(thread_id, thread_number, thread_checker):
    wb = load_workbook(filename=filename)
    ws = wb.active

    for row in ws:
        #------------------------------------------
        if row[0].value is None:
            break

        if not thread_checker():
            continue
        #------------------------------------------

        if row[0].value == '日期' or row[0].value == '總計' or not re.findall('^\d{4}-\d{1,2}-\d{1,2}$', row[0].value):
            continue
        else:

            date = str.replace(row[0].value,'-', '')

            order_id = row[3].value
            restaurant_name = row[5].value
            meal_plan_name = row[6].value
            meal_staffid = row[10].value
            identify = row[11].value
    
            add_meican_meal_record({
                'orderId':order_id,
                'date': date,
                'meal_plan_name':meal_plan_name,
                'meal_staffid':meal_staffid,
                'identify':identify,
                'restaurant_name':restaurant_name
            })

myThread.task_pool(thread_num,import_records)

到此這篇關於Python 多線程處理任務實例的文章就介紹到這瞭,更多相關Python 多線程處理任務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: