Go1.18新特性使用Generics泛型進行流式處理

前言

Stream 是一個基於 Go 1.18+ 泛型的流式處理庫, 它支持並行處理流中的數據. 並行流會將元素平均劃分多個的分區, 並創建相同數量的 goroutine 執行, 並且會保證處理完成後流中元素保持原始順序.

GitHub – xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)

安裝

需要安裝 Go 1.18+ 版本

$ go get github.com/xyctruth/stream

在代碼中導入它

import "github.com/xyctruth/stream"

基礎

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Filter(func(s string) bool { return s != "b" }).
    Map(func(s string) string { return "class_" + s }).
    Sort().
    Distinct().
    ToSlice()
// 需要轉換切片元素的類型
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

類型約束

any 接受任何類型的元素, 所以不能使用 == != > < 比較元素, 導致你不能使用 Sort(), Find()…等函數 ,但是你可以使用 SortFunc(fn), FindFunc(fn)… 代替

type SliceStream[E any] struct {
    slice      []E
}
stream.NewSlice([]int{1, 2, 3, 7, 1})

comparable 接收的類型可以使用 == != 比較元素, 但仍然不能使用 > < 比較元素, 因此你不能使用 Sort(), Min()…等函數 ,但是你可以使用 SortFunc(fn), MinFunc()… 代替

type SliceComparableStream[E comparable] struct {
    SliceStream[E]
}
stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})

constraints.Ordered 接收的類型可以使用 == != > <, 所以可以使用所有的函數

type SliceOrderedStream[E constraints.Ordered] struct {
    SliceComparableStream[E]
}
stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})

類型轉換

有些時候我們需要使用 Map ,Reduce 轉換切片元素的類型,但是很遺憾目前 Golang 並不支持結構體的方法有額外的類型參數,所有類型參數必須在結構體中聲明。在 Golang 支持之前我們暫時使用臨時方案解決這個問題。

// SliceMappingStream  Need to convert the type of slice elements.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
    SliceStream[E]
}
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

並行

Parallel 函數接收一個 goroutines int 參數. 如果 goroutines>1 則開啟並行, 否則關閉並行, 默認流是關閉並行的。

並行會將流中的元素平均劃分多個的分區, 並創建相同數量的 goroutine 執行, 並且會保證處理完成後流中元素保持原始順序.

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Parallel(10).
    Filter(func(s string) bool {
    // 一些耗時操作
    return s != "b"
    }).
    Map(func(s string) string {
    // 一些耗時操作
    return "class_" + s
    }).
    ForEach(
    func(index int, s string) {
    // 一些耗時操作
    },
    ).ToSlice()

並行類型

  • First: 一旦獲得第一個返回值,並行處理就結束. For: AllMatch, AnyMatch, FindFunc
  • ALL: 所有元素都需要並行處理,得到所有返回值,然後並行結束. For: Map, Filter
  • Action: 所有元素需要並行處理,不需要返回值. For: ForEach, Action

並行 goroutines

開啟並行 goroutine 數量在面對 CPU 操作與 IO 操作有著不同的選擇。

一般面對 CPU 操作時 goroutine 數量不需要設置大於 CPU 核心數,而 IO 操作時 goroutine 數量可以設置遠遠大於 CPU 核心數.

CPU 操作

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    sort.Ints(newArray(1000)) //  模擬 CPU 耗時操作
})

使用6個cpu核心進行基準測試

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByCPU
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_parallel(0)-6         	     717	   9183119 ns/op
BenchmarkParallelByCPU/goroutines(2)-6          	    1396	   4303113 ns/op
BenchmarkParallelByCPU/goroutines(4)-6          	    2539	   2388197 ns/op
BenchmarkParallelByCPU/goroutines(6)-6          	    2932	   2159407 ns/op
BenchmarkParallelByCPU/goroutines(8)-6          	    2334	   2577405 ns/op
BenchmarkParallelByCPU/goroutines(10)-6         	    2649	   2352926 ns/op

IO 操作

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    time.Sleep(time.Millisecond) // 模擬 IO 耗時操作
})

使用6個cpu核心進行基準測試

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByIO
goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6          	      52	 102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6           	     100	  55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6           	     214	  27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6           	     315	  18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6           	     411	  14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6          	     537	  11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6          	    2629	   2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6         	    5094	   1221887 ns/op

項目地址 https://github.com/xyctruth/stream

以上就是Go1.18新特性使用Generics泛型進行流式處理的詳細內容,更多關於Go1.18 Generics泛型流式處理的資料請關註WalkonNet其它相關文章!

推薦閱讀: