python連接clickhouse數據庫的兩種方式小結

python連接clickhouse數據庫

在Python中獲取系統信息的一個好辦法是使用psutil這個第三方模塊。

顧名思義,psutil = process and system utilities,它不僅可以通過一兩行代碼實現系統監控,還可以跨平臺使用。

主要針對clickhouse_driver的使用進行簡要介紹

第一步:

  • 通過pip install clickhouse_driver 安裝 clickhouse_driver

第二步:

  • 方法一:使用clickhouse_driver 包中的Client類,通過實例化一個客戶端進行對數據庫的增刪改查操作
from clickhouse_driver import Client
from datetime import datetime
import psutil
host_name = '192.168.50.94'
client = Client(host=host_name,database='default',user='default',password='自己設的密碼',send_receive_timeout=20,port=55666)
now = datetime.now()
time_stamp = now.strftime('%a %b %d %H:%M:%S CST %Y')# Tue Apr 06 15:32:55 CST 2021  <class 'str'>
create_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
disk_io = psutil.disk_io_counters()
net_io = psutil.net_io_counters()
chart_name = ["磁盤IO","網絡IO"]
metric_name1 = ["讀(數量)","寫(數量)", "讀(字節)", "寫(字節)", "讀(時間)", "寫(時間)"]
metric_name2 = ["發送字節數","接收字節數","發送包數","接收包"]
metric_value1 = [disk_io.read_count,disk_io.write_count,disk_io.read_bytes,disk_io.write_bytes,disk_io.read_time,disk_io.write_time]
metric_value2 = [net_io.bytes_sent,net_io.bytes_recv,net_io.packets_sent,net_io.packets_recv]
try:
    for i in chart_name:
        if i is "磁盤IO":
            for j in metric_name1:
                sql = "insert into clickhouse_host_metrics777(time_stamp,host_name, chart_name, metric_name,metric_value,create_at) " \
                      "values('%s','%s','%s','%s','%s','%s')" % \
                         (time_stamp, host_name, i, j, metric_value1[metric_name1.index(j)], create_at)
                res = client.execute(sql)
        elif i is "網絡IO":
            for j in metric_name2:
                sql = "insert into clickhouse_host_metrics777(time_stamp,host_name, chart_name, metric_name,metric_value,create_at) " \
                      "values('%s','%s','%s','%s','%s','%s')" % \
                      (time_stamp, host_name, i, j, metric_value2[metric_name2.index(j)], create_at)
                res = client.execute(sql)
    print("成功寫入數據")
except Exception as e:
    print(str(e))
  • 方法二:使用clickhouse_driver 包中的connect函數,通過實例化一個客戶端進行對數據庫的增刪改查操作
from datetime import datetime
import psutil
from clickhouse_driver import connect
host_name = '192.168.50.94'
#賬號:密碼@主機名:端口號/數據庫
conn = connect('clickhouse://default:自己設的密碼@'+host_name+':55666/default')
cursor = conn.cursor()
now = datetime.now()
time_stamp = now.strftime('%a %b %d %H:%M:%S CST %Y')# Tue Apr 06 15:32:55 CST 2021  <class 'str'>
create_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
disk_io = psutil.disk_io_counters()
net_io = psutil.net_io_counters()
chart_name = ["磁盤IO","網絡IO"]
metric_name1 = ["讀(數量)","寫(數量)", "讀(字節)", "寫(字節)", "讀(時間)", "寫(時間)"]
metric_name2 = ["發送字節數","接收字節數","發送包數","接收包"]
metric_value1 = [disk_io.read_count,disk_io.write_count,disk_io.read_bytes,disk_io.write_bytes,disk_io.read_time,disk_io.write_time]
metric_value2 = [net_io.bytes_sent,net_io.bytes_recv,net_io.packets_sent,net_io.packets_recv]
try:
    for i in chart_name:
        if i is "磁盤IO":
            for j in metric_name1:
                sql = "insert into clickhouse_host_metrics777(time_stamp,host_name, chart_name, metric_name,metric_value,create_at) values('%s','%s','%s','%s','%s','%s')" % \
                         (time_stamp, host_name, i, j, metric_value1[metric_name1.index(j)], create_at)
                # res = client.execute(sql)
                res = cursor.execute(sql)
        elif i is "網絡IO":
            for j in metric_name2:
                sql = "insert into clickhouse_host_metrics777(time_stamp,host_name, chart_name, metric_name,metric_value,create_at) values('%s','%s','%s','%s','%s','%s')" % \
                      (time_stamp, host_name, i, j, metric_value2[metric_name2.index(j)], create_at)
                res = cursor.execute(sql)
    cursor.close()
    print("成功寫入數據")
except Exception as e:
    print(str(e))

python將數據寫入clickhouse

from clickhouse_driver import Client 
 
# connect ClickHouse
client = Client(host= ,port= ,user= ,database= , password=)
 
 
# 得到table1中查詢的數據導入table2中(database2中應該事先建立對應的table2表)
query_ck_sql = """ SELECT *
    FROM database1.table1
    WHERE date = today() """
# 導入數據到臨時表
try:
    # 導入數據
    client.execute("insert into {official_table_db}.{official_all_table_name}  \
        {query_ck_sql}".format(
            official_table_db = database2, 
            official_table_name = table2, 
            query_ck_sql = query_ck_sql) 
        ,types_check = True)
except Exception as e:
    print str(e)

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: