Go1.18新特性使用Generics泛型進行流式處理
前言
Stream 是一個基於 Go 1.18+ 泛型的流式處理庫, 它支持並行處理流中的數據. 並行流會將元素平均劃分多個的分區, 並創建相同數量的 goroutine 執行, 並且會保證處理完成後流中元素保持原始順序.
GitHub – xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)
安裝
需要安裝 Go 1.18+ 版本
$ go get github.com/xyctruth/stream
在代碼中導入它
import "github.com/xyctruth/stream"
基礎
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}). Filter(func(s string) bool { return s != "b" }). Map(func(s string) string { return "class_" + s }). Sort(). Distinct(). ToSlice() // 需要轉換切片元素的類型 s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}). Filter(func(v int) bool { return v >3 }). Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }). Reduce(func(r string, v string) string { return r + v })
類型約束
any
接受任何類型的元素, 所以不能使用 ==
!=
>
<
比較元素, 導致你不能使用 Sort(), Find()…等函數 ,但是你可以使用 SortFunc(fn), FindFunc(fn)… 代替
type SliceStream[E any] struct { slice []E } stream.NewSlice([]int{1, 2, 3, 7, 1})
comparable
接收的類型可以使用 ==
!=
比較元素, 但仍然不能使用 >
<
比較元素, 因此你不能使用 Sort(), Min()…等函數 ,但是你可以使用 SortFunc(fn), MinFunc()… 代替
type SliceComparableStream[E comparable] struct { SliceStream[E] } stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})
constraints.Ordered
接收的類型可以使用 ==
!=
>
<
, 所以可以使用所有的函數
type SliceOrderedStream[E constraints.Ordered] struct { SliceComparableStream[E] } stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})
類型轉換
有些時候我們需要使用 Map
,Reduce
轉換切片元素的類型,但是很遺憾目前 Golang 並不支持結構體的方法有額外的類型參數,所有類型參數必須在結構體中聲明。在 Golang 支持之前我們暫時使用臨時方案解決這個問題。
// SliceMappingStream Need to convert the type of slice elements. // - E elements type // - MapE map elements type // - ReduceE reduce elements type type SliceMappingStream[E any, MapE any, ReduceE any] struct { SliceStream[E] } s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}). Filter(func(v int) bool { return v >3 }). Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }). Reduce(func(r string, v string) string { return r + v })
並行
Parallel
函數接收一個 goroutines int
參數. 如果 goroutines>1 則開啟並行, 否則關閉並行, 默認流是關閉並行的。
並行會將流中的元素平均劃分多個的分區, 並創建相同數量的 goroutine 執行, 並且會保證處理完成後流中元素保持原始順序.
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}). Parallel(10). Filter(func(s string) bool { // 一些耗時操作 return s != "b" }). Map(func(s string) string { // 一些耗時操作 return "class_" + s }). ForEach( func(index int, s string) { // 一些耗時操作 }, ).ToSlice()
並行類型
First
: 一旦獲得第一個返回值,並行處理就結束. For: AllMatch, AnyMatch, FindFuncALL
: 所有元素都需要並行處理,得到所有返回值,然後並行結束. For: Map, FilterAction
: 所有元素需要並行處理,不需要返回值. For: ForEach, Action
並行 goroutines
開啟並行 goroutine 數量在面對 CPU 操作與 IO 操作有著不同的選擇。
一般面對 CPU 操作時 goroutine 數量不需要設置大於 CPU 核心數,而 IO 操作時 goroutine 數量可以設置遠遠大於 CPU 核心數.
CPU 操作
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) { sort.Ints(newArray(1000)) // 模擬 CPU 耗時操作 })
使用6個cpu核心進行基準測試
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByCPU goarch: amd64 pkg: github.com/xyctruth/stream cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz BenchmarkParallelByCPU/no_parallel(0)-6 717 9183119 ns/op BenchmarkParallelByCPU/goroutines(2)-6 1396 4303113 ns/op BenchmarkParallelByCPU/goroutines(4)-6 2539 2388197 ns/op BenchmarkParallelByCPU/goroutines(6)-6 2932 2159407 ns/op BenchmarkParallelByCPU/goroutines(8)-6 2334 2577405 ns/op BenchmarkParallelByCPU/goroutines(10)-6 2649 2352926 ns/op
IO 操作
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) { time.Sleep(time.Millisecond) // 模擬 IO 耗時操作 })
使用6個cpu核心進行基準測試
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByIO goos: darwin goarch: amd64 pkg: github.com/xyctruth/stream cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz BenchmarkParallelByIO/no_parallel(0)-6 52 102023558 ns/op BenchmarkParallelByIO/goroutines(2)-6 100 55807303 ns/op BenchmarkParallelByIO/goroutines(4)-6 214 27868725 ns/op BenchmarkParallelByIO/goroutines(6)-6 315 18925789 ns/op BenchmarkParallelByIO/goroutines(8)-6 411 14439700 ns/op BenchmarkParallelByIO/goroutines(10)-6 537 11164758 ns/op BenchmarkParallelByIO/goroutines(50)-6 2629 2310602 ns/op BenchmarkParallelByIO/goroutines(100)-6 5094 1221887 ns/op
項目地址 https://github.com/xyctruth/stream
以上就是Go1.18新特性使用Generics泛型進行流式處理的詳細內容,更多關於Go1.18 Generics泛型流式處理的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- Go語言如何輕松編寫高效可靠的並發程序
- Go語言sort包函數使用示例
- Swift泛型Generics淺析講解
- golang編程開發使用sort排序示例詳解
- golang中按照結構體的某個字段排序實例代碼