Processing CSV file data with Python 3's ThreadPoolExecutor thread pool
Background
Different vendors understood the service provider's business API fields differently, which left tens of millions of rows of bad data in production. To repair that data, we fix it with a Python script.
Key points
Python 3, thread pools, pymysql, CSV file handling, requests
Going further
Before a program relies on threads, processes, or coroutines, it helps to have a basic grasp of three concepts:
CPU-bound workloads, I/O-bound workloads, and the GIL (Global Interpreter Lock); the short sketch below shows why the distinction matters here.
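As a quick illustration (a minimal sketch, not part of the original script): CPython's GIL lets only one thread execute Python bytecode at a time, but it is released while a thread blocks on I/O, so a thread pool still speeds up I/O-bound work, whereas pure-Python CPU-bound work gains little from threads.

import time
from concurrent.futures import ThreadPoolExecutor

def io_task(_):
    # Blocking I/O (simulated with sleep) releases the GIL, so several
    # of these can overlap inside a thread pool
    time.sleep(1)

def cpu_task(_):
    # Pure-Python computation holds the GIL, so threads run it one at a time
    return sum(i * i for i in range(10**7))

if __name__ == "__main__":
    for task, label in [(io_task, "I/O-bound"), (cpu_task, "CPU-bound")]:
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=4) as pool:
            list(pool.map(task, range(4)))
        print(f"{label}: {time.perf_counter() - start:.2f}s")
    # Typically the I/O-bound batch finishes in about 1 second (the four sleeps
    # overlap), while the CPU-bound batch takes roughly four tasks' worth of time.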
Libraries
pip3 install requests
pip3 install pymysql
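After installing, a quick import check confirms both packages are available (the printed versions depend on your environment):

# Verify the dependencies are importable
import requests
import pymysql

print(requests.__version__)
print(pymysql.__version__)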
Workflow
Read the CSV row by row, submit each row to the thread pool, call the third-party API for it, and update the matching database record with the result.
Implementation
# -*- coding:utf-8 -*-
# @FileName: grade_update.py
# @Desc    : awesome Python code that once ran on a supercomputer
import time
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

import requests
import pymysql

from projectPath import path

gradeId = [4303, 4304, 1000926, 1000927]


def writ_mysql():
    """Create a database connection."""
    return pymysql.connect(host="localhost",
                           port=3306,
                           user="admin",
                           password="admin",
                           database="test")


def oprationdb(grade_id, member_id):
    """Update the member's grade record in the database."""
    db = writ_mysql()
    try:
        cursor = db.cursor()
        sql = f"UPDATE `t_m_member_grade` SET `current_grade_id`={grade_id}, `modified`=now() WHERE `member_id`={member_id};"
        cursor.execute(sql)
        db.commit()
        print(f"Executed SQL -> {sql}")
    except pymysql.Error as e:
        db.rollback()
        print("Database error:", e)
    finally:
        db.close()
    return True


def interface(rows, thead):
    """Call the third-party API for one CSV row."""
    print(f"Processing row {thead} ---> data: {rows}")
    try:
        url = "http://xxxx/api/xxx-data/Tmall/bindQuery"
        body = {
            "nickname": str(rows[0]),
            "seller_name": "test",
            "mobile": "111"
        }
        heade = {"Content-Type": "application/x-www-form-urlencoded"}
        res = requests.post(url=url, data=body, headers=heade)
        result = res.json()
        if result["data"]["status"] in [1, 2]:
            grade = result["data"]["member"]["level"]
            grade_id = gradeId[grade]
            oprationdb(grade_id=grade_id, member_id=rows[1])
            return True
        return True
    except Exception as e:
        print(f"API call error: {e}")


def read_csv():
    import csv
    # number of worker threads
    MAX_WORKERS = 5
    with ThreadPoolExecutor(MAX_WORKERS) as pool:
        with open(path + '/file/result2_colu.csv', 'r', newline='', encoding='utf-8') as f:
            # set() creates an unordered collection of unique elements
            seq_notdone = set()
            seq_done = set()
            # create a reader object with csv.reader()
            reader = csv.reader(f)
            n = 0
            for row in reader:  # iterate over every row of the CSV
                n += 1
                try:
                    seq_notdone.add(pool.submit(interface, rows=row, thead=n))
                    if len(seq_notdone) >= MAX_WORKERS:
                        # FIRST_COMPLETED: return as soon as any future finishes or is cancelled
                        done, seq_notdone = wait(seq_notdone, return_when=FIRST_COMPLETED)
                        seq_done.update(done)
                except Exception as e:
                    print(f"Error while submitting/collecting tasks: {e}")
    return "done"


if __name__ == '__main__':
    read_csv()
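For reference, interface() uses rows[0] as the member nickname and passes rows[1] to oprationdb() as the member_id, so the input CSV is expected to have at least those two columns, roughly like this (the values below are made up):

nick_001,10001
nick_002,10002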
Explanation
Import the thread-pool classes
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait
pool.submit(interface, rows=row, thead=n)
Submits a task to the pool: interface is the function to run, and rows and thead are the arguments passed into interface().
Tasks are submitted continuously, and the pool consumes them with the number of worker threads defined by MAX_WORKERS; a stripped-down sketch of this pattern follows.
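Here is that submit-and-wait pattern in isolation, with a dummy fetch() standing in for interface() (names and timings are illustrative):

import time
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def fetch(row_no):
    # stand-in for interface(): pretend each row needs a little I/O time
    time.sleep(0.1)
    return row_no

MAX_WORKERS = 5
seq_notdone, seq_done = set(), set()
with ThreadPoolExecutor(MAX_WORKERS) as pool:
    for n in range(20):
        # submit() returns a Future immediately; the pool runs it in the background
        seq_notdone.add(pool.submit(fetch, row_no=n))
        if len(seq_notdone) >= MAX_WORKERS:
            # block until at least one in-flight future finishes,
            # keeping the number of pending tasks bounded
            done, seq_notdone = wait(seq_notdone, return_when=FIRST_COMPLETED)
            seq_done.update(done)
print(f"completed: {len(seq_done)}, still tracked: {len(seq_notdone)}")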
Note: an I/O-bound script like this one is well suited to multithreading. For CPU-bound work, use processes instead, sized to the number of CPU cores on the machine (see the sketch below).
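For CPU-bound work, the same structure can be switched to a process pool sized to the core count, for example (a sketch under that assumption, not part of the original script):

import os
from concurrent.futures import ProcessPoolExecutor

def crunch(n):
    # CPU-bound pure-Python work that threads could not parallelize because of the GIL
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    workers = os.cpu_count() or 4  # size the pool to the number of CPU cores
    with ProcessPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(crunch, [10**6] * workers))
    print(f"{len(results)} results from {workers} worker processes")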
That concludes this walkthrough of processing CSV file data with Python 3's ThreadPoolExecutor thread pool. For more on handling CSV files with ThreadPoolExecutor in Python 3, see the other related articles on WalkonNet!