python如何獲取Prometheus監控數據

獲取Prometheus監控數據

獲取Prometheus target數據

調用http://<prometheus.address>/api/v1/targets並解析。

def getTargetsStatus(address):
    url = address + '/api/v1/targets'
    response = requests.request('GET', url)
    if response.status_code == 200:
        targets = response.json()['data']['activeTargets']
        aliveNum, totalNum = 0, 0
        downList = []
        for target in targets:
            totalNum += 1
            if target['health'] == 'up':
                aliveNum += 1
            else:
                downList.append(target['labels']['instance'])
        print('-----------------------TargetsStatus--------------------------')
        print(str(aliveNum) + ' in ' + str(totalNum) + ' Targets are alive !!!')
        print('--------------------------------------------------------------')
        for down in downList:
            print('\033[31m\033[1m' + down + '\033[0m' + ' down !!!')
        print('-----------------------TargetsStatus--------------------------')
    else:
        print('\033[31m\033[1m' + 'Get targets status failed!' + '\033[0m')

獲取Prometheus 監控信息(cpu、mem、disks)

調用http://<prometheus.address>/api/v1/query?query=<expr>並解析,其中expr為prometheus的查詢語句。

### 定義cpu、mem、disks使用率的空字典
diskUsageDict = {}
cpuUsageDict = {}
memUsageDict = {}
### 定義采集時間間隔 s
monitorInterval = 5
### 定義超時告警時間 s
diskAlertTime = 5
cpuAlertTime = 300
memAlertTime = 300
### 定義告警閾值 %
diskThreshold = 80
cpuThreshold = 60
memThreshold = 70
def queryUsage(address, expr):
    url = address + '/api/v1/query?query=' + expr
    try:
        return json.loads(requests.get(url=url).content.decode('utf8', 'ignore'))
    except Exception as e:
        print(e)
        return {}
def orderUsageDict(usageDict, currentTime, monitorInterval):
    '''
    :param usageDict: 資源使用率字典
    :param usageDict: 資源使用率字典
    :param currentTime: 當前獲取監控數據的時間節點
    :return:
    :description: 剔除字典中不滿足連續超出閾值的數據
    '''
    for key in list(usageDict.keys()):
        if currentTime - usageDict[key][1] >= monitorInterval:
            usageDict.pop(key)
def getCurrentUsageGreater(address, record, threshold, usageDict, monitorInterval):
    '''
    :param address: Prometheus address
    :param record: Prometheus rules record
    :param threshold: 閾值
    :param usageDict: 資源使用率字典
    :param monitorInterval: 監控時間間隔
    :return:
    :description: 獲取資源使用率大於閾值的數據
    '''
    expr = record + '>=' + str(threshold)
    usage = queryUsage(address=address, expr=expr)
    currentTime = 0
    if 'data' in usage and usage['data']['result']:
        for metric in usage['data']['result']:
            instance = metric['metric']['instance']
            if record == 'node:fs_usage:ratio' or record == 'node:fs_root_usage:ratio':
                metricLabel = instance + ':' + metric['metric']['mountpoint']
            else:
                metricLabel = instance
            utctime = metric['value'][0]
            value = metric['value'][1]
            describe = record.split(':')[1]
            if not metricLabel in usageDict.keys():
                usageDict[metricLabel] = (utctime, utctime, describe, value)
            else:
                startTime = usageDict.get(metricLabel)[0]
                usageDict[metricLabel] = (startTime, utctime, describe, value)
            currentTime = utctime
    orderUsageDict(usageDict=usageDict, currentTime=currentTime, monitorInterval=monitorInterval)
def printUsageDict(usageDict, alertTime):
    '''
    :param usageDict: 資源使用率字典
    :param alertTime: 監控告警時間
    :return:
    :description: 打印出超過監控告警時間的數據
    '''
    for key, value in usageDict.items():
        deltaT = value[1] - value[0]
        if deltaT >= alertTime:
            print(key + ' ----- ' + value[2] + '\033[31m\033[1m ' + str(value[3]) + '\033[0m ----- lasted for\033[31m\033[1m %.2f \033[0mseconds' % deltaT)
def monitorUsageGreater(address):
    '''
    :param address: Prometheus address
    :return:
    :description: 持續監控並輸出數據
    '''
    while True:
        getCurrentUsageGreater(address, 'node:fs_usage:ratio', diskThreshold, diskUsageDict, monitorInterval)
        printUsageDict(diskUsageDict, alertTime=diskAlertTime)
        getCurrentUsageGreater(address, 'node:memory_usage:ratio', cpuThreshold, memUsageDict, monitorInterval)
        printUsageDict(memUsageDict, alertTime=memAlertTime)
        getCurrentUsageGreater(address, 'node:cpu_usage:ratio', memThreshold, cpuUsageDict, monitorInterval)
        printUsageDict(cpuUsageDict, alertTime=cpuAlertTime)
        time.sleep(monitorInterval)

其中有一些使用細節,比如統一資源標識符URI的構建,將HttpEntity用UTF-8編碼方式轉換為字符串再解析為JSON對象,我都寫在註釋裡瞭。

		String paramValue="http_requests_total";
//HTTP客戶端連接工具
        CloseableHttpClient httpClient=HttpClients.createDefault();
        //參數裡有特殊字符,不能直接寫成String(會報Illegal Character錯誤),用URIBuilder構造。
        URIBuilder uri=null;
        HttpGet get =null;
        try {
            //一對參數,使用addParameter(param: ,value:)這個方法添加參數。
            //若多對參數,使用第二種方法(但其實在這裡沒有這種情況):uri.addParameters(List<NameValuePair>);
            //這裡的ip,port換成你的Prometheus的ip+port。paramValue要自己定義,比如http_request_total
            uri=new URIBuilder("http://ip:port/api/v1/query");
            uri.addParameter("query",paramValue);
            //uri此時是http://ip:port/api/v1/query?query=http_requests_total
            get=new HttpGet(uri.build());
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject=null;
        CloseableHttpResponse response=null;
        try {
            // 執行請求並接收+轉換  得到jsonObject就可以解析瞭。
            response = httpClient.execute(get);
            String resStr= EntityUtils.toString(response.getEntity(),"UTF-8");
            jsonObject=JSONObject.parseObject(resStr);

通過promsql讀取prometheus內的數據

需求是python讀取prometheus內的數據,做數據處理後入庫到mysql。這裡主要說一下,python如何使用官方api通過promsql查詢prom內的數據。

官方提供的api為:

http://ip:port/api/v1/query?query=

樣例如下:

html = urllib.request.urlopen('http://ip:port/api/v1/query?query=count(node_cpu_seconds_total{job="%s",mode="idle"})' %(s))
data = html.read().decode("utf-8")
json = json.loads(data)

返回值為json類型,如下圖:

在這裡插入圖片描述

具體的json各位自己分析,瞬時值為value,值內數據,第一位是時間戳,第二位為查詢的結果值

區間向量返回值為values,也比較好理解。

還有個需求需要查詢之前的數據,比如前一天,月初一周之類的,可以使用如下api:

http://ip:port/api/v1/query_range?query=avg(1-avg(rate(node_cpu_seconds_total{job="%s",mode="idle"}[5m]))by(instance))&start='+start+'&end='+end+'&step=15s

其中start為采集開始時間,end為采集結束時間,step為步長,即多久設個采集點。

start和end的格式如下:

2021-11-01T00:00:00Z

獲取方式可以采取以下方式:

獲取每月的第一周數據,所以從每月一號零點開始到八號的零點

now = datetime.datetime.now()
start = datetime.datetime(now.year, now.month, 1)
end = datetime.datetime(now.year, now.month, 8)
# 格式轉換:yyyy-mm-ddThh:MM:ssZ
start_trans = "T".join(str(start).split(" "))+"Z"
end_trans = "T".join(str(end).split(" "))+"Z"

獲取前一周的時間

now_time = datetime.datetime.now()
one_week_ago_time = now_time + datetime.timedelta(days=-7)
# 精確到毫秒
now = now_time.strftime("%Y-%m-%dT%H:%M:%S.%f")
one_week_ago = one_week_ago_time.strftime("%Y-%m-%dT%H:%M:%S.%f")
n = now[0:len(now)-7]+"Z"
one_week = one_week_ago[0:len(one_week_ago)-7]+"Z"

如果獲取時間周期太長,返回數據太多會導致報錯,這時候可調整step大小,或者將時間段分成幾天獲取。

主要還是瞭解兩個api,其他的都是小問題

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: