Python中經常使用的代碼片段
針對工作生活中基礎的功能和操作,梳理瞭下對應的幾個Python代碼片段,供參考:
日期生成
獲取過去 N 天的日期
import datetime def get_nday_list(n): before_n_days = [] # [::-1]控制日期排序 for i in range(1, n + 1)[::-1]: before_n_days.append(str(datetime.date.today() - datetime.timedelta(days=i))) return before_n_days a = get_nday_list(30) print(a)
輸出:
['2021-12-26', '2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30', '2021-12-31', '2022-01-01', '2022-01-02', '2022-01-03', '2022-01-04', '2022-01-05', '2022-01-06', '2022-01-07', '2022-01-08', '2022-01-09', '2022-01-10', '2022-01-11', '2022-01-12', '2022-01-13', '2022-01-14', '2022-01-15', '2022-01-16', '2022-01-17', '2022-01-18', '2022-01-19', '2022-01-20', '2022-01-21', '2022-01-22', '2022-01-23', '2022-01-24']
生成一段時間區間內的日期
import datetime def create_assist_date(datestart = None,dateend = None): # 創建日期輔助表 if datestart is None: datestart = '2016-01-01' if dateend is None: dateend = datetime.datetime.now().strftime('%Y-%m-%d') # 轉為日期格式 datestart=datetime.datetime.strptime(datestart,'%Y-%m-%d') dateend=datetime.datetime.strptime(dateend,'%Y-%m-%d') date_list = [] date_list.append(datestart.strftime('%Y-%m-%d')) while datestart<dateend: # 日期疊加一天 datestart+=datetime.timedelta(days=+1) # 日期轉字符串存入列表 date_list.append(datestart.strftime('%Y-%m-%d')) return date_list d_list = create_assist_date(datestart='2021-12-27', dateend='2021-12-30') print(d_list)
輸出:
['2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30']
保存數據到CSV
保存數據到 CSV 算是比較常見的操作瞭,下面代碼如果運行正確會生成"2022_data_2022-01-25.csv"文件。
import os def save_data(data, date): """ :param data: :param date: :return: """ if not os.path.exists(r'2022_data_%s.csv' % date): with open("2022_data_%s.csv" % date, "a+", encoding='utf-8') as f: f.write("標題,熱度,時間,url\n") for i in data: title = i["title"] extra = i["extra"] time = i['time'] url = i["url"] row = '{},{},{},{}'.format(title,extra,time,url) f.write(row) f.write('\n') else: with open("2022_data_%s.csv" % date, "a+", encoding='utf-8') as f: for i in data: title = i["title"] extra = i["extra"] time = i['time'] url = i["url"] row = '{},{},{},{}'.format(title,extra,time,url) f.write(row) f.write('\n') data = [{"title": "demo", "extra": "hello", "time": "1998-01-01", "url": "https://www.baidu.com/"}] date = "2022-01-25" save_data(data, date)
requests 庫調用
據統計,requests 庫是 Python 傢族裡被引用的最多的第三方庫,足見其江湖地位之高大!
發送 GET 請求
import requests headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36', 'cookie': 'some_cookie' } response = requests.request("GET", url, headers=headers)
發送 POST 請求
import requests payload={} files=[] headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36', 'cookie': 'some_cookie' } response = requests.request("POST", url, headers=headers, data=payload, files=files)
Python 操作各種數據庫
操作 Redis
連接 Redis
import redis def redis_conn_pool(): pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) rd = redis.Redis(connection_pool=pool) return rd
寫入 Redis
from redis_conn import redis_conn_pool rd = redis_conn_pool() rd.set('test_data', 'mytest')
操作 MongoDB
連接 MongoDB
from pymongo import MongoClient conn = MongoClient("mongodb://%s:%s@ipaddress:49974/mydb" % ('username', 'password')) db = conn.mydb mongo_collection = db.mydata
批量插入數據
res = requests.get(url, params=query).json() commentList = res['data']['commentList'] mongo_collection.insert_many(commentList)
操作 MySQL
連接 MySQL
import MySQLdb # 打開數據庫連接 db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' ) # 使用cursor()方法獲取操作遊標 cursor = db.cursor()
執行 SQL 語句
# 使用 execute 方法執行 SQL 語句 cursor.execute("SELECT VERSION()") # 使用 fetchone() 方法獲取一條數據 data = cursor.fetchone() print "Database version : %s " % data # 關閉數據庫連接 db.close()
本地文件整理
整理文件涉及需求的比較多,這裡分享的是將本地多個 CSV 文件整合成一個文件
import pandas as pd import os df_list = [] for i in os.listdir(): if "csv" in i: day = i.split('.')[0].split('_')[-1] df = pd.read_csv(i) df['day'] = day df_list.append(df) df = pd.concat(df_list, axis=0) df.to_csv("total.txt", index=0)
多線程代碼
多線程也有很多實現方式,我們選擇自己最為熟悉順手的方式即可
import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, delay): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.delay = delay def run(self): print ("開始線程:" + self.name) print_time(self.name, self.delay, 5) print ("退出線程:" + self.name) def print_time(threadName, delay, counter): while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 # 創建新線程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 開啟新線程 thread1.start() thread2.start() thread1.join() thread2.join() print ("退出主線程")
異步編程代碼
異步爬取網站代碼示例:
import asyncio import aiohttp import aiofiles async def get_html(session, url): try: async with session.get(url=url, timeout=8) as resp: if not resp.status // 100 == 2: print(resp.status) print("爬取", url, "出現錯誤") else: resp.encoding = 'utf-8' text = await resp.text() return text except Exception as e: print("出現錯誤", e) await get_html(session, url)
使用異步請求之後,對應的文件保存也需要使用異步,即是一處異步,處處異步
async def download(title_list, content_list): async with aiofiles.open('{}.txt'.format(title_list[0]), 'a', encoding='utf-8') as f: await f.write('{}'.format(str(content_list)))
總結
到此這篇關於Python中經常使用的代碼片段的文章就介紹到這瞭,更多相關Python代碼片段內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Python 時間操作datetime詳情
- python 多線程爬取壁紙網站的示例
- Python爬取股票交易數據並可視化展示
- 熱門問題python爬蟲的效率如何提高
- python 制作網站篩選工具(附源碼)