詳解Go語言中Goroutine退出機制的原理及使用

goroutine是Go語言提供的語言級別的輕量級線程,在我們需要使用並發時,我們隻需要通過 go 關鍵字來開啟 goroutine 即可。作為Go語言中的最大特色之一,goroutine在日常的工作學習中被大量使用著,但是對於它的調度處理,尤其是goroutine的退出時機和方式,很多小夥伴都沒有搞的很清楚。因為最近的項目中遇到瞭問題—需要防止goroutine還沒執行完就直接退出,因此我仔細地調研瞭下goroutine的退出方式以及阻止goroutine退出的方法,希望能給到一些幫助。

goroutine的調度是由 Golang 運行時進行管理的。同一個程序中的所有 goroutine 共享同一個地址空間。goroutine設計的退出機制是由goroutine自己退出,不能在外部強制結束一個正在執行的goroutine(隻有一種情況正在運行的goroutine會因為其他goroutine的結束被終止,就是main函數退出或程序停止執行)。下面我先介紹下幾種退出方式:

退出方式

進程/main函數退出

kill進程/進程crash

當進程被強制退出,所有它占有的資源都會還給操作系統,而goroutine作為進程內的線程,資源被收回瞭,那麼還未結束的goroutine也會直接退出

main函數結束

同理,當主函數結束,goroutine的資源也會被收回,直接退出。具體可參考下下面的demo,其中go routine裡需要print出來的語句是永遠也不會出現的。

package main

import (

   "fmt"

   "time"

)

func routineTest() {

   time.Sleep(time.Second)

   fmt.Println("I'm alive")

}

func main(){

   fmt.Println("start test")

   go routineTest()

   fmt.Println("end test")

}

通過channel退出

Go實現瞭兩種並發形式。第一種是大傢普遍認知的:多線程共享內存。其實就是Java或者C++等語言中的多線程開發。另外一種是Go語言特有的,也是Go語言推薦的:CSP(communicating sequential processes)並發模型。CSP並發模型是在1970年左右提出的概念,屬於比較新的概念,不同於傳統的多線程通過共享內存來通信,CSP講究的是“以通信的方式來共享內存”。

其核心思想為:

DO NOT COMMUNICATE BY SHARING MEMORY; INSTEAD, SHARE MEMORY BY COMMUNICATING.

“不要以共享內存的方式來通信,相反,要通過通信來共享內存。”

普通的線程並發模型,就是像Java、C++、或者Python,他們線程間通信都是通過共享內存的方式來進行的。非常典型的方式就是,在訪問共享數據(例如數組、Map、或者某個結構體或對象)的時候,通過鎖來訪問,因此,在很多時候,衍生出一種方便操作的數據結構,叫做“線程安全的數據結構”。例如Java提供的包”java.util.concurrent”中的數據結構。Go中也實現瞭傳統的線程並發模型。

Go的CSP並發模型,就是通過goroutine和channel來實現的。

因為不是本文重點,在此對channel不做過多介紹,隻需要瞭解channel是goroutine之間的通信機制。 通俗的講,就是各個goroutine之間通信的”管道“,有點類似於Linux中的管道。channel是go最推薦的goroutine間的通信方式,同時通過channel來通知goroutine退出也是最主要的goroutine退出方式。goroutine雖然不能強制結束另外一個goroutine,但是它可以通過channel通知另外一個goroutine你的表演該結束瞭。

package main

import (

   "fmt"

   "time"

)

func cancelByChannel(quit <-chan time.Time) {

   for {

      select {

      case <-quit:

         fmt.Println("cancel goroutine by channel!")

         return

      default:

         fmt.Println("I'm alive")

         time.Sleep(1 * time.Second)

      }

   }

}

func main() {

   quit := time.After(time.Second * 10)

   go cancelByChannel(quit)

   time.Sleep(15*time.Second)

   fmt.Println("I'm done")

}

在上面的例子中,我們用時間定義瞭一個channel,當10秒後,會給到goroutine一個退出信號,然後go routine就會退出。這樣我們就實現瞭在其他線程中通知另一個線程退出的功能。

通過context退出

通過channel通知goroutine退出還有一個更好的方法就是使用context。沒錯,就是我們在日常開發中接口通用的第一個參數context。它本質還是接收一個channel數據,隻是是通過ctx.Done()獲取。將上面的示例稍作修改即可。

package main

import (

   "context"

   "fmt"

   "time"

)

func cancelByContext(ctx context.Context) {

   for {

      select {

      case <- ctx.Done():

         fmt.Println("cancel goroutine by context!")

         return

      default:

         fmt.Println("I'm alive")

         time.Sleep(1 * time.Second)

      }

   }

}

func main() {

   ctx, cancel := context.WithCancel(context.Background())

   go cancelByContext(ctx)

   time.Sleep(10*time.Second)

   cancel()

   time.Sleep(5*time.Second)

}

上面的case中,通過context自帶的WithCancel方法將cancel函數傳遞出來,然後手動調用cancel()函數給goroutine傳遞瞭ctx.Done()信號。context也提供瞭context.WithTimeout()和context.WithDeadline()方法來更方便的傳遞特定情況下的Done信號。

package main
import (

   "context"

   "fmt"

   "time"

)

func cancelByContext(ctx context.Context) {

   for {

      select {

      case <- ctx.Done():

         fmt.Println("cancel goroutine by context!")

         return

      default:

         fmt.Println("I'm alive")

         time.Sleep(1 * time.Second)

      }

   }

}

func main() {

   ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

   go cancelByContext(ctx)

   time.Sleep(15*time.Second)

}

上述case中使用瞭context.WithTimeout()來設置10秒後自動退出,使用context.WithDeadline()的功能基本一樣。區別是context.WithDeadline()可以指定一個固定的時間點,當然也可以使用time.Now().Add(time.Second*10)的方式來實現同context.WithTimeout()相同的功能。具體示例如下:

package main

import (

   "context"

   "fmt"

   "time"

)

func cancelByContext(ctx context.Context) {

   for {

      select {

      case <- ctx.Done():

         fmt.Println("cancel goroutine by context!")

         return

      default:

         fmt.Println("I'm alive")

         time.Sleep(1 * time.Second)

      }

   }

}

func main() {

   ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))

   go cancelByContext(ctx)

   time.Sleep(15*time.Second)

}

註:這裡需要註意的一點是上方兩個case中為瞭方便讀者理解,我將context傳回的cancel()函數拋棄掉瞭,實際使用中通常會加上defer cancel()來保證goroutine被殺死。

附:Context 使用原則和技巧

  • 不要把Context放在結構體中,要以參數的方式傳遞,parent Context一般為Background
  • 應該要把Context作為第一個參數傳遞給入口請求和出口請求鏈路上的每一個函數,放在第一位,變量名建議都統一,如ctx。
  • 給一個函數方法傳遞Context的時候,不要傳遞nil,否則在tarce追蹤的時候,就會斷瞭連接
  • Context的Value相關方法應該傳遞必須的數據,不要什麼數據都使用這個傳遞
  • Context是線程安全的,可以放心的在多個goroutine中傳遞
  • 可以把一個 Context 對象傳遞給任意個數的 gorotuine,對它執行 取消 操作時,所有 goroutine 都會接收到取消信號。

通過Panic退出

這是一種不推薦使用的方法!!!在此給出隻是提出這種操作的可能性。實際場景中尤其是生產環境請慎用!!

package main

import (

   "context"

   "fmt"

   "time"

)

func cancelByPanic(ctx context.Context) {

   defer func() {

      if err := recover(); err != nil {

         fmt.Println("cancel goroutine by panic!")

      }

   }()

   for i:=0 ; i< 5 ;i++{

      fmt.Println("hello cancelByPanic")

      time.Sleep(1 * time.Second)

   }

   panic("panic")

}

func main() {

   ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

   defer cancel()

   go cancelByPanic(ctx)

   time.Sleep(5*time.Second)

}

這裡我們通過在defer函數中使用recover來捕獲panic error並從panic中拿回控制權,確保程序不會再panic展開到goroutine調用棧頂部後崩潰。

等待自己退出

這是goroutine最常見的退出方式。我們通常都會等待goroutine執行完指定的任務之後自己退出。所以此處就不給示例瞭。

阻止goroutine退出的方法

瞭解到goroutine的退出方式後,我們已經可以解決一類問題。那就是當你需要手動控制某個goroutine結束的時候應該怎麼辦。但是在實際生產中關於goroutine還有一類問題需要解決,那就是當你的主進程結束時,應該如何等待goroutine全部執行完畢後再使主進程退出。

阻止程序退出的方法一種有兩種:

通過sync.WaitGroup

package main

import (

   "fmt"

)

func main() {

   arr := [3]string{"a", "b", "c"}

   for _, v := range arr {

      go func(s string) {

         fmt.Println(s)

      }(v)

   }

   fmt.Println("End")

}

以上方的case為例,可見我們在什麼都不加的時候,不會等待go func執行完主程序就會退出。因此下面給出使用WaitGroup的方法。

package main

import (

    "fmt"

    "sync"

)

func main() {

    var wg sync.WaitGroup // 定義 WaitGroup

    arr := [3]string{"a", "b", "c"}



    for _, v := range arr {

        wg.Add(1) // 增加一個 wait 任務

        go func(s string) {

            defer wg.Done() // 函數結束時,通知此 wait 任務已經完成

            fmt.Println(s)

        }(v)

    }

    // 等待所有任務完成

    wg.Wait()

}

WaitGroup可以理解為一個goroutine管理者。他需要知道有多少個goroutine在給他幹活,並且在幹完的時候需要通知他幹完瞭,否則他就會一直等,直到所有的小弟的活都幹完為止。我們加上WaitGroup之後,程序會進行等待,直到它收到足夠數量的Done()信號為止。

WaitGroup可被調用的方法隻有三個:Add() 、Done()、Wait()。通過這三個方法即可實現上述的功能,下面我們把源碼貼出。

func (wg *WaitGroup) Add(delta int) {

        statep := wg.state()

        state := atomic.AddUint64(statep, uint64(delta)<<32)

        v := int32(state >> 32) // 計數器

        w := uint32(state)      // 等待者個數。這裡用uint32,會直接截斷瞭高位32位,留下低32位

        if v < 0 {

                // Done的執行次數超出Add的數量

                panic("sync: negative WaitGroup counter")

        }

        if w != 0 && delta > 0 && v == int32(delta) {

                // 最開始時,Wait不能在Add之前被執行

                panic("sync: WaitGroup misuse: Add called concurrently with Wait")

        }

        if v > 0 || w == 0 {

                // 計數器不為零,還有沒Done的。return

    // 沒有等待者。return

                return

        }

        // 所有goroutine都完成任務瞭,但有goroutine執行瞭Wait後被阻塞,需要喚醒它

        if *statep != state {

                // 已經到瞭喚醒階段瞭,就不能同時並發Add瞭

                panic("sync: WaitGroup misuse: Add called concurrently with Wait")

        }

  // 清零之後,就可以繼續Add和Done瞭

        *statep = 0

        for ; w != 0; w-- {

    // 喚醒

                runtime_Semrelease(&wg.sema, false)

        }

}

func (wg *WaitGroup) Done() {

        wg.Add(-1)

}

func (wg *WaitGroup) Wait() {

        statep := wg.state()

        for {

                state := atomic.LoadUint64(statep)

                v := int32(state >> 32) // 計數器

                w := uint32(state)      // 等待者個數

                if v == 0 {

                        // 如果聲明變量後,直接執行Wait也不會有問題

                        // 下面CAS操作失敗,重試,但剛好發現計數器變成零瞭,安全退出

                        return

                }

                if atomic.CompareAndSwapUint64(statep, state, state+1) {

                        if race.Enabled && w == 0 {

                                race.Write(unsafe.Pointer(&wg.sema))

                        }

                        // 掛起當前的g

                        runtime_Semacquire(&wg.sema)

                        // 被喚醒後,計數器不應該大於0

                        // 大於0意味著Add的數量被Done完後,又開始瞭新一波Add

                        if *statep != 0 {

                                panic("sync: WaitGroup is reused before previous Wait has returned")

                        }

                        return

                }

        }

}

通過看源碼,我們可以知道,有些使用細節是需要註意的:

1.wg.Done()函數實際上實現的是wg.Add(-1),因此直接使用wg.Add(-1)是會造成同樣的結果的。在實際使用中要註意避免誤操作,使得監聽的goroutine數量出現誤差。

2.wg.Add()函數可以一次性加n。但是實際使用時通常都設為1。但是wg本身的counter不能設為負數。假設你在沒有Add到10以前,一次性wg.Add(-10),會出現panic !

package main

import (

   "fmt"

   "sync"
)

func main() {

   var wg sync.WaitGroup // 定義 WaitGroup

   arr := [3]string{"a", "b", "c"}

   for _, v := range arr {

      wg.Add(1) // 增加一個 wait 任務

      go func(s string) {

         defer wg.Done() // 函數結束時,通知此 wait 任務已經完成

         fmt.Println(s)

      }(v)

   }

   wg.Add(-10)

   // 等待所有任務完成

   wg.Wait()

}

panic: sync: negative WaitGroup counter

3.如果你的程序寫的有問題,出現瞭始終等待的waitgroup會造成死鎖。

package main

import (

   "fmt"

   "sync"

)

func main() {

   var wg sync.WaitGroup // 定義 WaitGroup

   arr := [3]string{"a", "b", "c"}

   for _, v := range arr {

      wg.Add(1) // 增加一個 wait 任務

      go func(s string) {

         defer wg.Done() // 函數結束時,通知此 wait 任務已經完成

         fmt.Println(s)

      }(v)

   }

   wg.Add(1)

   // 等待所有任務完成

   wg.Wait()

}

fatal error: all goroutines are asleep - deadlock!

通過channel

第二種方法即是通過channel。具體寫法如下:

package main
import "fmt"

func main() {

    arr := [3]string{"a", "b", "c"}

    ch := make(chan struct{}, len(arr))

    for _, v := range arr {

        go func(s string) {

            fmt.Println(s)

            ch <- struct{}{}

        }(v)

    }

    for i := 0; i < len(arr); i ++ {

        <-ch

    }

}

需要註意的是,channel同樣會導致死鎖。如下方示例:

package main

import "fmt"

func main() {

   arr := [3]string{"a", "b", "c"}

   ch := make(chan struct{}, len(arr))

   for _, v := range arr {

      go func(s string) {

         fmt.Println(s)

         ch <- struct{}{}

      }(v)

   }

   for i := 0; i < len(arr); i++ {

      <-ch

   }

   <-ch

}

fatal error: all goroutines are asleep - deadlock!

封裝

利用go routine的這一特性,我們可以將waitGroup等方式封裝起來,保證go routine在主進程結束時會繼續執行完。封裝demo:

package main

import (

   "fmt"

   "sync"

)

type WaitGroupWrapper struct {

   sync.WaitGroup

}

func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {

   wg.Add(1)

   go func() {

      f(args...)

      wg.Done()

   }()

}

func printArray(args ...interface{}){

   fmt.Println(args)

}

func main() {

   var w WaitGroupWrapper // 定義 WaitGroup

   arr := [3]string{"a", "b", "c"}



   for _, v := range arr {

      w.Wrap(printArray,v)

   }

   w.Wait()

}

還可以加上更高端一點的功能,增加時間、事件雙控制的wrapper。

package main

import (

   "fmt"

   "sync"

   "time"

)

type WaitGroupWrapper struct {

   sync.WaitGroup

}

func (wg *WaitGroupWrapper) Wrap(f func(args ...interface{}), args ...interface{}) {

   wg.Add(1)

   go func() {

      f(args...)

      wg.Done()

   }()

}

func (w *WaitGroupWrapper) WaitWithTimeout(d time.Duration) bool {

   ch := make(chan struct{})

   t := time.NewTimer(d)

   defer t.Stop()



   go func() {

      w.Wait()

      ch <- struct{}{}

   }()

   select {

   case <-ch:

      fmt.Println("job is done!")

      return true

   case <-t.C:

      fmt.Println("time is out!")

      return false

   }

}
func printArray(args ...interface{}){

   time.Sleep(3*time.Second) //3秒後會觸發time is out分支

   //如果改為time.Sleep(time.Second)即會觸發job is done分支

   fmt.Println(args)

}

func main() {

   var w WaitGroupWrapper // 定義 WaitGroup

   arr := [3]string{"a", "b", "c"}

   for _, v := range arr {

      w.Wrap(printArray,v)

   }

   w.WaitWithTimeout(2*time.Second)

}

總結

在本篇文章中,先介紹瞭goroutine的所有的退出方式,包括:

1)進程/main函數退出;

2)通過channel退出;

3)通過context退出;

4)通過panic退出;

5)等待自己退出。

又總結瞭阻止goroutine退出的方法:

1)通過sync.WaitGroup ;

2)通過channel。

最後給出瞭封裝好帶有阻止goroutine退出功能的wrapper demo。

以上就是詳解Go語言中Goroutine退出機制的原理及使用的詳細內容,更多關於Go語言 Goroutine退出機制的資料請關註WalkonNet其它相關文章!

推薦閱讀: