Go語言kylin任務自動化實例詳解

前言

kylin是一個開源的OLAP分析引擎,具有亞秒級查詢大表的能力

通過kylin提供的cube預構建功能,省去瞭不斷寫sql查詢hive的麻煩,強化瞭任務統一管理和結果快速呈現的效果

kylin官網: https://kylin.apache.org/cn/

任務

當kylin集群比較大,和有多個kylin集群時,說明cube也越來越多,幾百上千個cube便是常用便飯瞭

這些任務的運行就成瞭難題,人工去界面上點點點完全不實現瞭。此時就需要做成自動化周期性的任務

因為官方沒有提供Go的客戶端,隻提供瞭http的api請求。下列例子使用Go中的http包來實現自動化任務

自動化實現

初始化

使用第三方http包(HttpRequest)來做http相關的請求,該包支持GET,POST,DELETE,PUT等四種請求方法,正好完全滿足請求kylin的要求

var (
   url = "http://ip:7070/kylin/"
   username = "ADMIN"
   password = "Password"
   req *HttpRequest.Request
)
func init() {
   req = HttpRequest.NewRequest().Debug(false).SetTimeout(time.Second*5).
      SetHeaders(map[string]string{
         "Content-Type": "application/json;charset=utf-8",
      }).SetBasicAuth(username, password)
}

cube提交build

該方法接收三個參數,需要構建的cube名稱,以及開始時間戳和結束時間戳

調用示例:

cubeBuild("dwd_jd_order","1637193600000","1637280000000")

時間戳獲取方法,在第6小節

func cubeBuild(cube,startTime,endTime string) {
   m := map[string]string{
      "startTime": startTime,
      "endTime":   endTime,
      "buildType": "BUILD",
   }
   resp, err := req.JSON().Put(url+"api/cubes/"+cube+"/build", m)
   if err != nil {
      fmt.Println("cube構建請求錯誤: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("cube構建狀態碼不符期望: ",resp.StatusCode())
   }
}

cube運行結果檢查

檢查cube運行結果,是成功還是失敗瞭,還提供一個重新構建開關,如果cube失敗,調用重構

kylin job檢查接口屬性說明

jobSearchMode 搜索模式(檢查點和cubeing兩種) ALL所有模式的數據

limit 限制返回條數

offset 位置(0是從第一條開始)

status 狀態類型(8是錯誤類型,0是new,1是pending,2是running,32是stopped,4是finished,16是discarded)

timeFilter 時間范圍過濾(1是一天,2是一周,3是一月,4是一年,5是全部)

調用示例: jobCheck(false)

為什麼要在檢查裡面調重構方法,是因為重構cube需要拿到uuid,但uuid隻能在這個接口中獲取到,且uuid不是固定的,需要運cube運行後才可得到

func jobCheck(resumeSwitch bool) {
   resp, err := req.Get(url+"api/jobs?jobSearchMode=ALL&limit=15&offset=0&status=8&timeFilter=1")
   if err != nil {
      fmt.Println("job檢查請求錯誤: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("job檢查狀態碼不符期望: ",resp.StatusCode())
   }
   body, _ := resp.Body()
   var i interface{}
   json.Unmarshal(body,&i)
   uuid, err := jmespath.Search("[0].uuid", i)
   if err != nil {
      fmt.Println("search err: ",err)
   }
   fmt.Println(uuid)
   if resumeSwitch {
      cubeResume("uuid")
   }
}

重構cube

重構cube在job失敗後,自動構建非常有用,避免人工頻繁介入到這些工作中,是自動化中關鍵一步

調用示例: cubeResume("uuid")

func cubeResume(uuid string)  {
   resp, err := req.Put(url+"api/jobs/"+uuid+"/resume")
   if err != nil {
      fmt.Println("cube重新build請求錯誤: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("cube重新build狀態碼不符期望: ",resp.StatusCode())
   }
}

歷史job清理

kylin在運行一段時間後,就會產生很多冗餘,且時需要周期性的清理這些歷史job

調用示例: jobHistoryDelete("uuid")

需要先檢查job,獲取uuid,然後再刪除歷史job

func jobHistoryDelete(uuid string) {
   resp, err := req.Delete(url+"api/jobs/"+uuid+"/drop")
   if err != nil {
      fmt.Println("歷史job清理請求錯誤: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("歷史job清理狀態碼不符期望: ",resp.StatusCode())
   }
}

時間戳

kylin要求的時間毫秒,這裡使用納秒時間戳方法除一下就得到瞭毫秒

func timestamp()  {
   year := time.Now().Year()
   month := time.Now().Month()
   day := time.Now().Day()
   //今天的時間戳
   today := time.Date(year, month, day, 8, 0, 0, 0, time.Local).UnixNano() / 1e6
   fmt.Println(today)
   //昨天的時間戳
   iDay := time.Now().AddDate(0, 0, -1).Day()
   yesterday := time.Date(year, month, iDay, 8, 0, 0, 0, time.Local).UnixNano() / 1e6
   fmt.Println(yesterday)
}

小結

以上方法配合定時任務,就可以實現kylin自動化運維工作瞭,當然kylin官網還提供瞭更多接口,有需求的同學可以看看

傳送門: https://kylin.apache.org/cn/docs31/howto/howto_use_restapi.html

更多關於Go語言kylin任務自動化的資料請關註WalkonNet其它相關文章!

推薦閱讀: