深入理解golang chan的使用

前言

之前在看golang多線程通信的時候, 看到瞭go 的管道. 當時就覺得這玩意很神奇, 因為之前接觸過的不管是php, java, Python, js, c等等, 都沒有這玩意, 第一次見面, 難免勾起我的好奇心. 所以就想著看一看它具體是什麼東西. 很明顯, 管道是go實現在語言層面的功能, 所以我以為需要去翻他的源碼瞭. 雖然最終沒有翻到C的層次, 不過還是受益匪淺.

見真身

結構體

要想知道他是什麼東西, 沒什麼比直接看他的定義更加直接的瞭. 但是其定義在哪裡麼? 去哪裡找呢? 還記得我們是如何創建chan的麼? make方法. 但是當我找過去的時候, 發現make方法隻是一個函數的聲明.

這, 還是沒有函數的具體實現啊. 匯編看一下. 編寫以下內容:

package main

func main() {
	_ = make(chan int)
}

執行命令:

go tool compile -N -l -S main.go

雖然匯編咱看不懂, 但是其中有一行還是引起瞭我的註意.

make調用瞭runtime.makechan. 漂亮, 就找他.

找到他瞭, 是hchan指針對象. 整理瞭一下對象的字段(不過人傢自己也有註釋的):

// 其內部維護瞭一個循環隊列(數組), 用於管理發送與接收的緩存數據. 
type hchan struct {
  // 隊列中元素個數
	qcount   uint
  // 隊列的大小(數組長度)
	dataqsiz uint
  // 指向底層的緩存隊列, 是一個可以指向任意類型的指針. 
	buf      unsafe.Pointer
  // 管道每個元素的大小
	elemsize uint16
  // 是否被關閉瞭
	closed   uint32
  // 管道的元素類型
	elemtype *_type
  // 當前可以發送的元素索引(隊尾)
	sendx    uint  
  // 當前可以接收的元素索引(隊首)
	recvx    uint  
  // 當前等待接收數據的 goroutine 隊列
	recvq    waitq
  // 當前等待發送數據的 goroutine 隊列
	sendq    waitq 
	// 鎖, 用來保證管道的每個操作都是原子性的. 
	lock mutex
}

可以看的出來, 管道簡單說就是一個隊列加一把鎖.

發送數據

依舊使用剛才的方法分析, 發送數據時調用瞭runtime.chansend1 函數. 其實現簡單易懂:

然後查看真正實現, 函數步驟如下(個人理解, 有一些 test 使用的代碼被我刪掉瞭. ):

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 異常處理, 若管道指針為空
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	// 常量判斷, 恒為 false, 應該是開發時調試用的. 
	if debugChan {
		print("chansend: chan=", c, "\n")
	}
	// 常量, 恒為 false, 沒看懂這個判斷
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}
  // 若當前操作不阻塞, 且管道還沒有關閉時判斷
  // 當前隊列容量為0且沒有等待接收數據的 或 當前隊列容量不為0且隊列已滿
  // 那麼問題來瞭, 什麼時候不加鎖呢? select 的時候. 可以在不阻塞的時候快速返回
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}
	// 上鎖, 保證操作的原子性
	lock(&c.lock)
	// 若管道已經關閉, 報錯
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// 從接受者隊列獲取一個接受者, 若存在, 數據直接發送, 不走緩存, 提高效率
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// 若緩存為滿, 則將數據放到緩存中排隊
	if c.qcount < c.dataqsiz {
    // 取出對尾的地址
		qp := chanbuf(c, c.sendx)
    // 將ep 的內容拷貝到 ap 地址
		typedmemmove(c.elemtype, qp, ep)
    // 更新隊尾索引
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	// 若當前不阻塞, 直接返回
	if !block {
		unlock(&c.lock)
		return false
	}
	// 當走到這裡, 說明數據沒有成功發送, 且需要阻塞等待. 
  // 以下代碼沒看懂, 不過可以肯定的是, 其操作為阻塞當前協程, 等待發送數據
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

雖然最終阻塞的地方沒看太明白, 不過發送數據的大體流程很清楚:

  • 若無需阻塞且不能發送數據, 返回失敗
  • 若存在接收者, 直接發送數據
  • 若存在緩存, 將數據放到緩存中
  • 若無需阻塞, 返回失敗
  • 阻塞等待發送數據

其中不加鎖的操作, 在看到selectnbsend函數的註釋時如下:

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

看這意思, select關鍵字有點類似於語法糖, 其內部會轉換成調用selectnbsend函數的簡單if判斷.

接收數據

至於接收數據的方法, 其內部實現與發送大同小異. runtime.chanrecv 方法.

源碼簡單看瞭一下, 雖理解不深, 但對channel也有瞭大體的認識.

上手

簡單對channel的使用總結一下.

定義

// 創建普通的管道類型, 非緩沖
a := make(chan int)
// 創建緩沖區大小為10的管道
b := make(chan int, 10)
// 創建隻用來發送的管道
c := make(chan<- int)
// 創建隻用來接收的管道
d := make(<-chan int)
// eg: 隻用來接收的管道, 每秒一個
e := time.After(time.Second)

發送與接收

// 接收數據
a := <- ch
b, ok := <- ch
// 發送數據
ch <- 2

最後, 看瞭一圈, 感覺channel並不是很復雜, 就是一個隊列, 一端接受, 一端發送. 不過其對多協程處理做瞭很多優化. 與協程配合, 靈活使用的話, 應該會有不錯的效果.

到此這篇關於深入理解golang chan的使用的文章就介紹到這瞭,更多相關golang chan內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: