python基礎之並發編程(一)

一、進程(Process)

是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動

二、線程(Thread)

是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進 程中的實際運作單位。

三、並發編程解決方案:

1、多任務的實現有 3 種方式:

  • 多進程模式;
  • 多線程模式;
  • 多進程+多線程模式

四、多線程實現 (兩種)

1、第一種 函數方法

# 方法包裝-啟動多線程
from threading import Thread 
from time import sleep, time 
def func1(name): 
    print("Threading:{} start".format(name)) 
    sleep(3) 
    print("Threading:{} end".format(name)) 
if __name__ == '__main__': 
    # 開始時間 
    start = time() 
    # 創建線程列表 
    t_list = [] 
    # 循環創建線程 
    for i in range(10): 
        t = Thread(target=func1, args=('t{}'.format(i),)) 
        t.start() 
        t_list.append(t) 
    # 等待線程結束 
    for t in t_list: 
        t.join() 
    # 計算使用時間 
    end = time() - start 
    print(end)

2、第二種 類方法包裝

# 類包裝-啟動多線程 
from threading import Thread 
from time import sleep, time 
class MyThread(Thread): 
    def __init__(self,name): 
        Thread.__init__(self) 
        self.name =name 
    def run(self): 
        print("Threading:{} start".format(self.name)) 
        sleep(3) 
        print("Threading:{} end".format(self.name)) 
if __name__ == '__main__': 
    # 開始時間 
    start = time() 
    # 創建線程列表 
    t_list = [] 
    # 循環創建線程 
    for i in range(10): 
        t = MyThread('t{}'.format(i)) 
        t.start() 
        t_list.append(t) 
    # 等待線程結束 
    for t in t_list: 
        t.join() 
    # 計算使用時間 
    end = time() - start 
    print(end)

註意:

主線程不會等待子線程運行結束,如果需要等待可使用 join()方法不要啟動線程後立即 join(),很容易造成串行運行,導致並發失效

五、守護線程與子線程

1、線程在分法有:

主線程:程序的本身

子線程:在程序另開起的線程

2、守護線程

主要的特征是它的生命周期。主線程死亡,它也就隨之 死亡

# 類包裝-啟動多線程 
from threading import Thread 
from time import sleep, time 
class MyThread(Thread): 
    def __init__(self,name): 
        Thread.__init__(self) 
        self.name =name 
    def run(self): 
        print("Threading:{} start".format(self.name)) 
        sleep(3) 
        print("Threading:{} end".format(self.name)) 
if __name__ == '__main__': 
    # 開始時間 
    start = time() 
    # 循環創建線程 
    for i in range(10): 
        t = MyThread('t{}'.format(i)) 
        t.setDaemon(True)
        t.start() 
    # 計算使用時間 
    end = time() - start 
    print(end)

六、鎖

from threading import Thread 
def func1(name): 
    print('Threading:{} start'.format(name)) 
    global num 
    for i in range(50000000): # 有問題 
    #for i in range(5000): # 無問題 
        num += 1 
    print('Threading:{} end num={}'.format(name, num))
if __name__ == '__main__': 
    num =0 
    # 創建線程列表 
    t_list = [] 
    # 循環創建線程 
    for i in range(5): 
        t = Thread(target=func1, args=('t{}'.format(i),)) 
        t.start() 
        t_list.append(t) 
    # 等待線程結束 
    for t in t_list: 
        t.join()

Python 使用線程的時候,會定時釋放 GIL 鎖,這時會 sleep,所以才會出現上面的問題。 面對這個問題,如果要解決此問題,我們可以使用 Lock 鎖解決此問題( 加鎖的目的是:保證數據安全)

from threading import Thread,Lock 
def func1(name):
    # 獲取鎖
    lock.acquire()
    with lock:
        global count
        for i in range(100000):
            count += 1
    # 釋放鎖 
    lock.release()
if __name__ == "__main__":
    count = 0
    t_list = []
    # 創建鎖對象
    lock = Lock()
    for i in range(10):
        t = Thread(target=func1,args=(f't{i+1}',))
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(count)

七、死鎖

from threading import Thread, Lock #Lock 鎖 同步鎖 互斥鎖
from time import sleep 
def fun1(): 
    lock1.acquire() 
    print('fun1 拿到鍵盤') 
    sleep(2) 
    lock2.acquire() 
    print('fun1 拿到鼠標') 
    lock2.release() 
    print('fun1 釋放鼠標') 
    lock1.release() 
    print('fun1 釋放鍵盤') 
def fun2(): 
    lock2.acquire() 
    print('fun2 拿到鼠標') 
    lock1.acquire() 
    print('fun2 拿到鍵盤') 
    lock1.release() 
    print('fun2 釋放鍵盤') 
    lock2.release() 
    print('fun2 釋放鼠標') 
if __name__ == '__main__':
    lock1 = Lock() 
    lock2 = Lock() 
    t1 = Thread(target=fun1) 
    t2 = Thread(target=fun2) 
    t1.start() 
    t2.start()
from threading import RLock
'''
Lock 鎖 同步鎖 互斥鎖
RLock 遞歸鎖
'''
def func1():
    lock.acquire()
    print('func1獲取鎖')
    func2()
    lock.release()
    print('func1釋放鎖')
def func2():
    lock.acquire()
    print('func2獲取鎖')
    lock.release()
    print('func2釋放鎖')
def func3():
    func1()
    func2()
if __name__ == "__main__":
    #lock = Lock()  會產生錯誤 
    lock = RLock()
    func3()

八、信號量(Semaphore)

我們都知道在加鎖的情況下,程序就變成瞭串行,也就是單線程,而有時,我們在不用考 慮數據安全時,為瞭避免業務開啟過多的線程時。我們就可以通過信號量(Semaphore)來 設置指定個數的線程。(比如:電梯每次隻能承載三個人,那麼同時隻能有三個人乘坐,其他人隻能等別人做完才能乘坐)

from time import sleep
from threading import Thread
from threading import BoundedSemaphore
def index(num):
    lock.acquire()
    print(f'第{num}個人乘坐!!')
    sleep(2)
    lock.release()
if __name__ == "__main__":
    lock = BoundedSemaphore(3)
    for i in range(10):
        t = Thread(target=index,args=(f'{i+1}',))
        t.start()

九、事件(Event)

Event()可以創建一個事件管理標志,該標志(event)默認為 False,event 對象主要有 四種方法可以調用:

1、 event.wait(timeout=None):調用該方法的線程會被阻塞,如果設置瞭 timeout 參數,超時後,線程會停止阻塞繼續執行;

2、event.set():將 event 的標志設置為 True,調用 wait 方法的所有線程將被喚 醒;

3、event.clear():將 event 的標志設置為 False,調用 wait 方法的所有線程將被 阻塞;

4、event.is_set():判斷 event 的標志是否為 True。

十、線程通信-隊列

線程安全是多線程編程時的計算機程序代碼中的一個概念。在擁有共享數據的多條線程並 行執行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執行,不 會出現數據污染等意外情況

1使用的隊列的好處:

1. 安全

2. 解耦

3. 提高效率

2Queue模塊中的常用方法:

Python的Queue模塊中提供瞭同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現瞭鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步

  • Queue.qsize() 返回隊列的大小
  • Queue.empty() 如果隊列為空,返回True,反之False
  • Queue.full() 如果隊列滿瞭,返回True,反之False
  • Queue.full maxsize 大小對應
  • Queue.get([block[, timeout]])獲取隊列,timeout等待時間
  • Queue.get_nowait() 相當Queue.get(False)
  • Queue.put(item) 寫入隊列,timeout等待時間
  • Queue.put_nowait(item) 相當Queue.put(item, False)
  • Queue.task_done() 在完成一項工作之後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
  • Queue.join() 實際上意味著等到隊列為空,再執行別的操作

十一、生產者和消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者 彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費 者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裡取,阻塞隊列 就相當於一個緩沖區,平衡瞭生產者和消費者的處理能力。

from threading import Thread
from queue import Queue
from time import sleep
def producer():
    num = 1
    while True:
        print(f'生產瞭{num}號皮卡丘')
        qe.put(f'{num}號皮卡丘')
        num += 1 
        sleep(1)
def consumer():
    print('購買瞭{}'.format(qe.get()))
    sleep(2)
if __name__ == "__main__":
    # 共享數據的容器
    qe= Queue(maxsize=5)
    # 創建生產者線程
    t1 = Thread(target = producer)
    # 創建消費者線程
    t2 = Thread(target = consumer)
    # 創建消費者線程
    t3 = Thread(target = consumer)
    # 開始工作
    t1.start()
    t2.start()
    t3.start()

總結

本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!

推薦閱讀: