golang高並發限流操作 ping / telnet
需求
當需要同時ping/telnet多個ip時,可以通過引入ping包/telnet包實現,也可以通過go調用cmd命令實現,不過後者調用效率較差,所以這裡選擇ping包和telnet包
還有就是高並發的問題,可以通過shell腳本或者go實現高並發,所以我選擇的用go自帶的協程實現,但是如果要同時處理1000+個ip,考慮到機器的性能,需要ratelimit控制開辟的go協程數量,這裡主要寫一下我的建議和淌過的坑
ping
參考鏈接: https://github.com/sparrc/go-ping
import "github.com/sparrc/go-ping" import "time" func (p *Ping) doPing(timeout time.Duration, count int, ip string) (err error) { pinger, cmdErr := ping.NewPinger(ip) if cmdErr != nil { glog.Error("Failed to ping " + p.ipAddr) err = cmdErr return } pinger.Count = count pinger.Interval = time.Second pinger.Timeout = timeout // true的話,代表是標準的icmp包,false代表可以丟包類似udp pinger.SetPrivileged(false) // 執行 pinger.Run() // 獲取ping後的返回信息 stats := pinger.Statistics() //延遲 latency = float64(stats.AvgRtt) // 標準的往返總時間 jitter = float64(stats.StdDevRtt) //丟包率 packetLoss = stats.PacketLoss return }
註意: pinger.Run() 這裡執行的時候是阻塞的,如果並發量大的時候,程序會卡死在這裡,所以當有高並發的需求時建議如下處理:
go pinger.Run()
time.Sleep(timeout)
telnet
package main import ( "github.com/reiver/go-telnet" ) func doTelnet(ip string, port int) { var caller telnet.Caller = telnet.StandardCaller address := ip + ":"+ strconv.Itoa(port) // DialToAndCall 檢查連通性並且調用 telnet.DialToAndCall(address, caller) } }
bug出現報錯:
lookup tcp/: nodename nor servname provided, or not known
解決:
修改string(port)為strconv.Itoa(port)
DialToAndCall這種方式telnet無法設置超時時間,默認的超時時間有1分鐘,所以使用DialTimeout這個方式實現telnet
import "net" func doTelnet(ip string, ports []string) map[string]string { // 檢查 emqx 1883, 8083, 8080, 18083 端口 results := make(map[string]string) for _, port := range ports { address := net.JoinHostPort(ip, port) // 3 秒超時 conn, err := net.DialTimeout("tcp", address, 3*time.Second) if err != nil { results[port] = "failed" } else { if conn != nil { results[port] = "success" _ = conn.Close() } else { results[port] = "failed" } } } return results }
shell高並發
本質就是讀取ip.txt文件裡的ip,然後調用ping方法,實現高並發也是借助&遍歷所有的ip然後同一交給操作系統去處理高並發
while read ip do { doPing(ip) } & done < ip.txt
go高並發限速
import ( "context" "fmt" "log" "time" "sync" "golang.org/x/time/rate" ) func Limit(ips []string)([]string, []string, error) { //第一個參數是每秒鐘最大的並發數,第二個參數是桶的容量,第一次的時候每秒可執行的數量就是桶的容量,建議這兩個值都寫成一樣的 r := rate.NewLimiter(10, 10) ctx := context.Background() wg := sync.WaitGroup{} wg.Add(len(ips)) lock := sync.Mutex{} var success []string var fail []string defer wg.Done() for _,ip:=range ips{ //每次消耗2個,放入一個,消耗完瞭還會放進去,如果初始是5個,所以這段代碼再執行到第4次的時候筒裡面就空瞭,如果當前不夠取兩個瞭,本次就不取,再放一個進去,然後返回false err := r.WaitN(ctx, 2) if err != nil { log.Fatal(err) } go func(ip string) { defer func() { wg.Done() }() err := doPing(time.Second, 2, ip) lock.Lock() defer lock.Unlock() if err != nil { fail = append(fail, ip) return } else { success = append(success, ip) } }(ip) } // wait等待所有go協程結束 wg.wait() return success,fail,nil } func main() { ips := [2]string{"192.168.1.1","192.168.1.2"} success,fail,err := Limit(ips) if err != nil { fmt.Printf("ping error") } }
這裡註意一個並發實現的坑,在for循環裡使用goroutine時要把遍歷的參數傳進去才能保證每個遍歷的參數都被執行,否則隻能執行一次
(拓展)管道、死鎖
先看個例子:
func main() { go print() // 啟動一個goroutine print() } func print() { fmt.Println("*******************") }
輸出結果:
*******************
沒錯,隻有一行,因為當go開辟一個協程想去執行print方法時,主函數已經執行完print並打印出來,所以goroutine還沒有機會執行程序就已經結束瞭,解決這個問題可是在主函數裡加time.sleep讓主函數等待goroutine執行完,也可以使用WaitGroup.wait等待goroutine執行完,還有一種就是信道
信道分無緩沖信道和緩沖信道
無緩沖信道
無緩沖信道也就是定義長度為0的信道,存入一個數據,從無緩沖信道取數據,若信道中無數據,就會阻塞,還可能引發死鎖,同樣數據進入無緩沖信道, 如果沒有其他goroutine來拿走這個數據,也會阻塞,記住無緩沖數據並不存儲數據
func main() { var channel chan string = make(chan string) go func(message string) { channel<- message // 存消息 }("Ping!") fmt.Println(<-messages) // 取消息 }
緩存信道
顧名思義,緩存信道可以存儲數據,goroutine之間不會發生阻塞,for循環讀取信道中的數據時,一定要判斷當管道中不存在數據時的情況,否則會發生死鎖,看個例子
channel := make(chan int, 3) channel <- 1 channel <- 2 channel <- 3 // 顯式關閉信道 close(channel) for v := range channel { fmt.Println(v) // 如果現有數據量為0,跳出循環,與顯式關閉隧道效果一樣,選一個即可 if len(ch) <= 0 { break } }
但是這裡有個問題,信道中數據是可以隨時存入的,所以我們遍歷的時候無法確定目前的個數就是信道的總個數,所以推薦使用select監聽信道
// 創建一個計時信道 timeout := time.After(1 * time.Second) // 監聽3個信道的數據 select { case v1 := <- c1: fmt.Printf("received %d from c1", v1) case v2 := <- c2: fmt.Printf("received %d from c2", v2) case v3 := <- c3: fmt.Printf("received %d from c3", v3) case <- timeout: is_timeout = true // 超時 break } }
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。
推薦閱讀:
- Go語言如何輕松編寫高效可靠的並發程序
- Golang 內存模型The Go Memory Model
- golang 定時任務方面time.Sleep和time.Tick的優劣對比分析
- Go語言開發必知的一個內存模型細節
- Golang開發中如何解決共享變量問題