Go緩沖channel和非緩沖channel的區別說明
在看本篇文章前我們需要瞭解阻塞的概念
在執行過程中暫停,以等待某個條件的觸發 ,我們就稱之為阻塞
在Go中我們make一個channel有兩種方式,分別是有緩沖的和沒緩沖的
緩沖channel 即 buffer channel 創建方式為 make(chan TYPE,SIZE)
如 make(chan int,3) 就是創建一個int類型,緩沖大小為3的 channel
非緩沖channel 即 unbuffer channel 創建方式為 make(chan TYPE)
如 make(chan int) 就是創建一個int類型的非緩沖channel
非緩沖channel 和 緩沖channel 的區別
非緩沖 channel,channel 發送和接收動作是同時發生的
例如 ch := make(chan int) ,如果沒 goroutine 讀取接收者<-ch ,那麼發送者ch<- 就會一直阻塞
緩沖 channel 類似一個隊列,隻有隊列滿瞭才可能發送阻塞
代碼演示
非緩沖 channel
package main import ( "fmt" "time" ) func loop(ch chan int) { for { select { case i := <-ch: fmt.Println("this value of unbuffer channel", i) } } } func main() { ch := make(chan int) ch <- 1 go loop(ch) time.Sleep(1 * time.Millisecond) }
這裡會報錯 fatal error: all goroutines are asleep – deadlock! 就是因為 ch<-1 發送瞭,但是同時沒有接收者,所以就發生瞭阻塞
但如果我們把 ch <- 1 放到 go loop(ch) 下面,程序就會正常運行
緩沖 channel
的阻塞隻會發生在 channel 的緩沖使用完的情況下
package main import ( "fmt" "time" ) func loop(ch chan int) { for { select { case i := <-ch: fmt.Println("this value of unbuffer channel", i) } } } func main() { ch := make(chan int,3) ch <- 1 ch <- 2 ch <- 3 ch <- 4 go loop(ch) time.Sleep(1 * time.Millisecond) }
這裡也會報 fatal error: all goroutines are asleep – deadlock! ,這是因為 channel 的大小為 3 ,而我們要往裡面塞 4 個數據,所以就會阻塞住
解決的辦法有兩個
把 channel 開大一點,這是最簡單的方法,也是最暴力的
把 channel 的信息發送者 ch <- 1 這些代碼移動到 go loop(ch) 下面 ,讓 channel 實時消費就不會導致阻塞瞭
補充:3種優雅的Go channel用法
寫Go的人應該都聽過Rob Pike的這句話
Do not communicate by sharing memory; instead, share memory by communicating.
相信很多朋友和我一樣,在實際應用中總感覺不到好處,為瞭用channel而用。但以我的切身體會來說,這是寫代碼時碰到的場景不復雜、對channel不熟悉導致的,所以希望這篇文章能給大傢帶來點新思路,對Golang優雅的channel有更深的認識 :)
Fan In/Out
數據的輸出有時候需要做扇出/入(Fan In/Out),但是在函數中調用常常得修改接口,而且上下遊對於數據的依賴程度非常高,所以一般使用通過channel進行Fan In/Out,這樣就可以輕易實現類似於shell裡的管道。
func fanIn(input1, input2 <-chan string) <-chan string { c := make(chan string) go func() { for { select { case s := <-input1: c <- s case s := <-input2: c <- s } } }() return c }
同步Goroutine
兩個goroutine之間同步狀態,例如A goroutine需要讓B goroutine退出,一般做法如下:
func main() { g = make(chan int) quit = make(chan bool) go B() for i := 0; i < 3; i++ { g <- i } quit <- true // 沒辦法等待B的退出隻能Sleep fmt.Println("Main quit") } func B() { for { select { case i := <-g: fmt.Println(i + 1) case <-quit: fmt.Println("B quit") return } } } /* Output: 1 2 3 Main quit */
可是瞭main函數沒辦法等待B合適地退出,所以B quit 沒辦法打印,程序直接退出瞭。
然而,chan是Go裡的第一對象,所以可以把chan傳入chan中,所以上面的代碼可以把quit 定義為chan chan bool,以此控制兩個goroutine的同步
func main() { g = make(chan int) quit = make(chan chan bool) go B() for i := 0; i < 5; i++ { g <- i } wait := make(chan bool) quit <- wait <-wait //這樣就可以等待B的退出瞭 fmt.Println("Main Quit") } func B() { for { select { case i := <-g: fmt.Println(i + 1) case c := <-quit: c <- true fmt.Println("B Quit") return } } } /* Output 1 2 3 B Quit Main Quit */
分佈式遞歸調用
在現實生活中,如果你要找美國總統聊天,你會怎麼做?
第一步打電話給在美國的朋友,然後他們也會發動自己的關系網,再找可能認識美國總統的人,以此類推,直到找到為止。
這在Kadmelia分佈式系統中也是一樣的,如果需要獲取目標ID信息,那麼就不停地查詢,被查詢節點就算沒有相關信息,也會返回它覺得最近節點,直到找到ID或者等待超時。
好瞭,這個要用Go來實現怎麼做呢?
func recursiveCall(ctx context.Context, id []byte, initialNodes []*node){ seen := map[string]*node{} //已見過的節點記錄 request := make(chan *node, 3) //設置請求節點channel // 輸入初始節點 go func() { for _, n := range initialNodes { request <- n } }() OUT: for { //循環直到找到數據 if data != nil { return } // 在新的請求,超時和上層取消請求中select select { case n := <-request: go func() { // 發送新的請求 response := s.sendQuery(ctx, n, MethodFindValue, id) select { case <-ctx.Done(): case msg :=<-response: seen[responseToNode(response)] = n //更新已見過的節點信息 // 加載新的節點 for _, rn := range LoadNodeInfoFromByte(msg[PayLoadStart:]) { mu.Lock() _, ok := seen[rn.HexID()] mu.Unlock() // 見過瞭,跳過這個節點 if ok { continue } AddNode(rn) // 將新的節點送入channel request <- rn } } } }() case <-time.After(500 * time.Millisecond): break OUT // break至外層,否則僅僅是跳至loop外 case <-ctx.Done(): break OUT } } return }
這時的buffered channel類似於一個局部queue,對需要的節點進行處理,但這段代碼的精妙之處在於,這裡的block操作是select的,隨時可以取消,而不是要等待或者對queue的長度有認識。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。
推薦閱讀:
- go語言中如何使用select的實現示例
- go select的用法
- Golang的select多路復用及channel使用操作
- Go select使用與底層原理講解
- Go語言如何輕松編寫高效可靠的並發程序