golang 並發編程之生產者消費者詳解
golang 最吸引人的地方可能就是並發瞭,無論代碼的編寫上,還是性能上面,golang 都有絕對的優勢
學習一個語言的並發特性,我喜歡實現一個生產者消費者模型,這個模型非常經典,適用於很多的並發場景,下面我通過這個模型,來簡單介紹一下 golang 的並發編程
go 並發語法
協程 go
協程是 golang 並發的最小單元,類似於其他語言的線程,隻不過線程的實現借助瞭操作系統的實現,每次線程的調度都是一次系統調用,需要從用戶態切換到內核態,這是一項非常耗時的操作,因此一般的程序裡面線程太多會導致大量的性能耗費在線程切換上。而在 golang 內部實現瞭這種調度,協程在這種調度下面的切換非常的輕量級,成百上千的協程跑在一個 golang 程序裡面是很正常的事情
golang 為並發而生,啟動一個協程的語法非常簡單,使用 go 關鍵字即可
go func () { // do something }
同步信號 sync.WaitGroup
多個協程之間可以通過 sync.WaitGroup 同步,這個類似於 Linux 裡面的信號量
var wg sync.WaitGroup // 申明一個信號量 wg.Add(1) // 信號量加一 wg.Done() // 信號量減一 wg.Wait() // 信號量為正時阻塞,直到信號量為0時被喚醒
通道 chan
通道可以理解為一個消息隊列,生產者往隊列裡面放,消費者從隊列裡面取。通道可以使用 close 關閉
ic := make(chan int, 10) // 申明一個通道 ic <- 10 // 往通道裡面放 i := <- ic // 從通道裡面取 close(ic) // 關閉通道
生產者消費者實現
定義產品類
這個產品類根據具體的業務需求定義
type Product struct { name int value int }
生產者
如果 stop 標志不為 false,不斷地往通道裡面放 product,完成之後信號量完成
func producer(wg *sync.WaitGroup, products chan<- Product, name int, stop *bool) { for !*stop { product := Product{name: name, value: rand.Int()} products <- product fmt.Printf("producer %v produce a product: %#v\n", name, product) time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond) } wg.Done() }
消費者
不斷地從通道裡面取 product,然後作對應的處理,直到通道被關閉,並且 products 裡面為空, for 循環才會終止,而這正是我們期望的
func consumer(wg *sync.WaitGroup, products <-chan Product, name int) { for product := range products { fmt.Printf("consumer %v consume a product: %#v\n", name, product) time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond) } wg.Done() }
主線程
var wgp sync.WaitGroup var wgc sync.WaitGroup stop := false products := make(chan Product, 10) // 創建 5 個生產者和 5 個消費者 for i := 0; i < 5; i++ { go producer(&wgp, products, i, &stop) go consumer(&wgc, products, i) wgp.Add(1) wgc.Add(1) } time.Sleep(time.Duration(1) * time.Second) stop = true // 設置生產者終止信號 wgp.Wait() // 等待生產者退出 close(products) // 關閉通道 wgc.Wait() // 等待消費者退出
補充:Go並發編程–通過channel實現生產者消費者模型
概述
生產者消費者模型是多線程設計的經典模型,該模型被廣泛的應用到各個系統的多線程/進程模型設計中。
本文介紹瞭Go語言中channel的特性,並通過Go語言實現瞭兩個生產者消費者模型。
channel的一些特性
在Go中channel是非常重要的協程通信的手段,channel是雙向的通道,通過channel可以實現協程間數據的傳遞,通過channel也可以實現協程間的同步(後面會有介紹)。
本文介紹的生產者消費者模型主要用到瞭channel的以下特性:任意時刻隻能有一個協程能夠對channel中某一個item進行訪問。
單生產者單消費者模型
把生產者和消費者都放到一個無線循環中,這個和我們的服務器端的任務處理非常相似。生產者不斷的向channel中放入數據,而消費者不斷的從channel中取出數據,並對數據進行處理(打印)。
由於生產者的協程不會退出,所以channel的寫入會永久存在,這樣當channel中沒有放入數據時,消費者端將會阻塞,等待生產者端放入數據。
代碼的實現如下:
package main import ( "fmt" "time" ) var ch1 chan int = make(chan int) var bufChan chan int = make(chan int, 1000) var msgChan chan int = make(chan int) func sum(a int, b int) { ch1 <- a + b } // write data to channel func writer(max int) { for { for i := 0; i < max; i++ { // 簡單的向channel中放入一個整數 bufChan <- i time.Sleep(1 * time.Millisecond) //控制放入的頻率 } } } // read data fro m channel func reader(max int) { for { r := <-bufChan fmt.Printf("read value: %d\n", r) } // 通知主線程,工作結束瞭,這一步可以省略 msgChan <- 1 } func testWriterAndReader(max int) { go writer(max) go reader(max) // writer 和reader的任務結束瞭,主線程會得到通知 res := <-msgChan fmt.Printf("task is done: value=%d\n", res) } func main() { testWriterAndReader(100) }
多生產者消費者模型
我們可以利用channel在某個時間點隻能有一個協程能夠訪問其中的某一個數據,的特性來實現生產者消費者模型。由於channel具有這樣的特性,我們在放數據和消費數據時可以不需要加鎖。
package main import ( "time" "fmt" "os" ) var ch1 chan int = make(chan int) var bufChan chan int = make(chan int, 1000) var msgChan chan string = make(chan string) func sum(a int, b int) { ch1 <- a + b } // write data to channel func writer(max int) { for { for i := 0; i < max; i++ { bufChan <- i fmt.Fprintf(os.Stderr, "%v write: %d\n", os.Getpid(), i) time.Sleep(10 * time.Millisecond) } } } // read data fro m channel func reader(name string) { for { r := <-bufChan fmt.Printf("%s read value: %d\n", name, r) } msgChan <- name } func testWriterAndReader(max int) { // 開啟多個writer的goroutine,不斷地向channel中寫入數據 go writer(max) go writer(max) // 開啟多個reader的goroutine,不斷的從channel中讀取數據,並處理數據 go reader("read1") go reader("read2") go reader("read3") // 獲取三個reader的任務完成狀態 name1 := <-msgChan name2 := <-msgChan name3 := <-msgChan fmt.Println("%s,%s,%s: All is done!!", name1, name2, name3) } func main() { testWriterAndReader(100) }
輸出如下:
read3 read value: 0
80731 write: 0
80731 write: 0
read1 read value: 0
80731 write: 1
read2 read value: 1
80731 write: 1
read3 read value: 1
80731 write: 2
read2 read value: 2
80731 write: 2
… …
總結
本文通過channel實現瞭經典的生產者和消費者模型,利用瞭channel的特性。但要註意,當消費者的速度小於生產者時,channel就有可能產生擁塞,導致占用內存增加,所以,在實際場景中需要考慮channel的緩沖區的大小。
設置瞭channel的大小,當生產的數據大於channel的容量時,生產者將會阻塞,這些問題都是要在實際場景中需要考慮的。
一個解決辦法就是使用一個固定的數組或切片作為環形緩沖區,而非channel,通過Sync包的機制來進行同步,實現生產者消費者模型,這樣可以避免由於channel滿而導致消費者端阻塞。
但,對於環形緩沖區而言,可能會覆蓋老的數據,同樣需要考慮具體的使用場景。關於環形緩沖區的原理和實現,在分析Sync包的使用時再進一步分析。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。