Python如何實現Paramiko的二次封裝
Paramiko是一個用於執行SSH命令的Python第三方庫,使用該庫可實現自動化運維的所有任務,如下是一些常用代碼的封裝方式,多數代碼為半成品,隻是敲代碼時的備份副本防止丟失,僅供參考。
目前本人巡檢百臺設備完全無壓力,如果要巡檢過千臺則需要多線程的支持,過萬臺則需要加入智能判斷等。
實現命令執行: 直接使用過程化封裝,執行CMD命令.
import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None
實現磁盤巡檢: 獲取磁盤空間並返回字典格式
def GetAllDiskSpace(address,username,password,port): ref_dict = {} cmd_dict = {"Linux\n" : "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'", "AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } # 首先檢測系統版本 os_version = BatchCMD(address,username,password,port,"uname") for version,run_cmd in cmd_dict.items(): if(version == os_version): # 根據不同版本選擇不同的命令 os_ref = BatchCMD(address,username,password,port,run_cmd) ref_list= os_ref.split("\n") # 循環將其轉換為字典 for each in ref_list: # 判斷最後是否為空,過濾最後一項 if each != "": ref_dict[str(each.split(":")[1])] = str(each.split(":")[0]) return ref_dict # 磁盤巡檢總函數 def DiskMain(): with open("db.json", "r", encoding="utf-8") as read_fp: load_json = read_fp.read() js = json.loads(load_json) base = js.get("base") count = len(base) for each in range(0,count): print("\033[37m-\033[0m" * 80) print("\033[35m 檢測地址: {0:10} \t 用戶名: {1:10} \t 密碼: {2:10} \t 端口: {3:4}\033[0m". format(base[each][1],base[each][2],base[each][3],base[each][4])) print("\033[37m-\033[0m" * 80) ref = GetAllDiskSpace(base[each][1],base[each][2],base[each][3],base[each][4]) for k,v in ref.items(): # 判斷是否存在空盤 if( v.split("%")[0] != "-"): # 將占用百分比轉換為整數 space_ret = int(v.split("%")[0]) if space_ret >= 70: print("\033[31m 磁盤分區: {0:30} \t 磁盤占用: {1:5} \033[0m".format(k,v)) continue if space_ret >= 50: print("\033[33m 磁盤分區: {0:30} \t 磁盤占用: {1:5} \033[0m".format(k, v)) continue else: print("\033[34m 磁盤分區: {0:30} \t 磁盤占用: {1:5} \033[0m".format(k, v)) continue print() # 組內傳遞用戶名密碼時調用此方法 def GroupDiskMain(address,username,password,port): ref = GetAllDiskSpace(address,username,password,port) for k, v in ref.items(): if (v.split("%")[0] != "-"): space_ret = int(v.split("%")[0]) if space_ret >= 70: print("磁盤分區: {0:30} \t 磁盤占用: {1:5} -> [警告]".format(k, v)) continue if space_ret >= 50: print("磁盤分區: {0:30} \t 磁盤占用: {1:5} -> [警惕]".format(k, v)) continue else: print("磁盤分區: {0:30} \t 磁盤占用: {1:5} -> [正常]".format(k, v)) continue print()
獲取系統內存利用率: 獲取系統內存利用率
def GetAllMemSpace(address,username,password,port): cmd_dict = {"Linux\n" : "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'", "AIX\n" : "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } # 首先檢測系統版本 os_version = BatchCMD(address,username,password,port,"uname") for version,run_cmd in cmd_dict.items(): if(version == os_version): # 根據不同版本選擇不同的命令 os_ref = BatchCMD(address,username,password,port,run_cmd) # 首先現將KB轉化為MB mem_total = math.ceil( int(os_ref.split(":")[0].replace("\n","")) / 1024) mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024) mem_used = str( int(mem_total) - int(mem_free)) # 計算占用空間百分比 percentage = 100 - int(mem_free / int(mem_total / 100)) print("內存總計空間: {}".format(str(mem_total) + " MB")) print("內存剩餘空間: {}".format(str(mem_free) + " MB")) print("內存已用空間: {}".format(str(mem_used) + " MB")) print("計算百分比: {}".format(str(percentage) + " %"))
獲取系統進程信息: 獲取系統進程信息,並返回字典格式
def GetAllProcessSpace(address,username,password,port): ref_dict = {} cmd_dict = {"Linux\n" : "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq", "AIX\n" : "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq" } os_version = BatchCMD(address,username,password,port,"uname") for version,run_cmd in cmd_dict.items(): if(version == os_version): os_ref = BatchCMD(address, username, password, port, run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": ref_dict[str(each.split(":")[0])] = str(each.split(":")[1]) return ref_dict # 巡檢進程是否存在 def ProcessMain(): with open("db.json", "r", encoding="utf-8") as read_fp: load_json = read_fp.read() js = json.loads(load_json) process = js.get("process") process_count = len(process) for x in range(0,process_count): # 根據process中的值查詢base中的賬號密碼 base = js.get("base") if( list(process[x].keys())[0] == base[x][0] ): # 拿到賬號密碼之後再提取出他們的進程ID於進程名 print("\033[37m-\033[0m" * 80) print("\033[35m 檢測地址: {0:10} \t 用戶名: {1:10} \t 密碼: {2:10} \t 端口: {3:4}\033[0m". format(base[x][1], base[x][2], base[x][3], base[x][4])) print("\033[37m-\033[0m" * 80) ref_dic = GetAllProcessSpace(base[x][1],base[x][2],base[x][3],base[x][4]) # ref_val = 全部進程列表 proc_val = 需要檢測的進程列表 ref_val = list(ref_dic.values()) proc_val = list(process[x].values())[0] # 循環比較是否在列表中 for each in proc_val: flag = each in ref_val if(flag == True): print("\033[34m 進程: {0:50} 狀態: √ \033[0m".format(each)) else: print("\033[31m 進程: {0:50} 狀態: × \033[0m".format(each))
實現劇本運行功能: 針對特定一臺主機運行劇本功能,隨便寫的一個版本,僅供參考
def RunRule(address,username,password,port,playbook): os_version = BatchCMD(address,username,password,port,"uname") if(os_version == list(playbook.keys())[0]): play = list(playbook.values())[0] print() print("\033[37m-\033[0m" * 130) print("\033[35m 系統類型: {0:4} \t 地址: {1:10} \t 用戶名: {2:10} \t 密碼: {3:15} \t 端口: {4:4}\033[0m" .format(os_version.replace("\n",""),address,username,password,port)) print("\033[37m-\033[0m" * 130) for each in range(0,len(play)): RunCmd = play[each] + " > /dev/null 2>&1 && echo $?" print("\033[30m [>] 派發命令: {0:100} \t 狀態: {1:5} \033[0m".format( RunCmd.replace(" > /dev/null 2>&1 && echo $?", ""),"正在派發")) os_ref = BatchCMD(address, username, password, port, RunCmd) if(os_ref == "0\n"): print("\033[34m [√] 運行命令: {0:100} \t 狀態: {1:5} \033[0m".format( RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派發完成")) else: print("\033[31m [×] 運行命令: {0:100} \t 狀態: {1:5} \033[0m".format( RunCmd.replace(" > /dev/null 2>&1 && echo $?",""),"派發失敗")) # 既然失敗瞭,就把剩下的也打出來吧,按照失敗處理 for x in range(each+1,len(play)): print("\033[31m [×] 運行命令: {0:100} \t 狀態: {1:5} \033[0m".format( play[x].replace(" > /dev/null 2>&1 && echo $?", ""), "終止執行")) break else: return 0 # 批量: 傳入主機組不同主機執行不同劇本 def RunPlayBook(HostList,PlayBook): count = len(HostList) error = [] success = [] for each in range(0,count): ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook) if ref == 0: error.append(HostList[each][0]) else: success.append(HostList[each][0]) print("\n\n") print("-" * 130) print("執行清單") print("-" * 130) for each in success: print("成功主機: {}".format(each)) for each in error: print("失敗主機: {}".format(each)) # 運行測試 def PlayBookRun(): playbook = \ { "Linux\n": [ "ifconfig", "vmstat", "ls", "netstat -an", "ifconfis", "cat /etc/passwd | grep 'root' | awk '{print $2}'" ] } addr_list = \ [ ["192.168.1.127", "root", "1233", "22"], ["192.168.1.126", "root", "1203", "22"] ] # 指定addr_list這幾臺機器執行playbook劇本 RunPlayBook(addr_list,playbook)
過程化實現文件上傳下載: 文件傳輸功能 PUT上傳 GET下載
def BatchSFTP(address,username,password,port,soruce,target,flag): transport = paramiko.Transport((address, int(port))) transport.connect(username=username, password=password) sftp = paramiko.SFTPClient.from_transport(transport) if flag == "PUT": try: ret = sftp.put(soruce, target) if ret !="": transport.close() return 1 else: transport.close() return 0 transport.close() except Exception: transport.close() return 0 elif flag == "GET": try: target = str(address + "_" + target) os.chdir("./recv_file") ret = sftp.get(soruce, target) if ret != "": transport.close() return 1 else: transport.close() return 0 transport.close() except Exception: transport.close() return 0 # 批量將本地文件 source 上傳到目標 target 中 def PutRemoteFile(source,target): with open("db.json", "r", encoding="utf-8") as fp: load_json = fp.read() js = json.loads(load_json) base = js.get("base") print("-" * 130) print("接收主機 \t\t 登錄用戶 \t 登錄密碼 \t 登錄端口 \t 本地文件 \t\t 傳輸到 \t\t\t 傳輸狀態") print("-" * 130) for each in range(0,len(base)): # 先判斷主機是否可通信 ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4],"uname") if ref == None: print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未連通\033[0m".format( base[each][1],base[each][2],base[each][3],base[each][4],source,target)) continue ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"PUT") if(ref == 1): print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸成功\033[0m".format( base[each][1],base[each][2],base[each][3],base[each][4],source,target)) else: print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸失敗\033[0m".format( base[each][1], base[each][2], base[each][3], base[each][4], source, target)) # 批量將目標文件拉取到本地特定目錄(存在缺陷) def GetRemoteFile(source,target): with open("db.json", "r", encoding="utf-8") as fp: load_json = fp.read() js = json.loads(load_json) base = js.get("base") print("-" * 130) print("發送主機 \t\t 登錄用戶 \t 登錄密碼 \t 登錄端口 \t\t 遠程文件 \t\t 拉取到 \t\t\t 傳輸狀態") print("-" * 130) for each in range(0,len(base)): ref = BatchCMD(base[each][1], base[each][2], base[each][3], base[each][4], "uname") if ref == None: print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未連通\033[0m".format( base[each][1], base[each][2], base[each][3], base[each][4], source, target)) continue ref = BatchSFTP(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"GET") if(ref == 1): print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸成功\033[0m".format( base[each][1],base[each][2],base[each][3],base[each][4],source,target)) else: print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 傳輸失敗\033[0m".format( base[each][1], base[each][2], base[each][3], base[each][4], source, target))
另一種命令執行方法:
import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: return result else: return -1 except Exception: return -1 # 通過獲取主機Ping狀態 def GetPing(): fp = open("unix_base.db", "r", encoding="utf-8") count = len(open("unix_base.db", "r", encoding="utf-8").readlines()) print("-" * 100) print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".format("IP地址","機器系統","設備SN","機房位置","存活狀態","主機作用")) print("-" * 100) for each in range(count): ref = eval(fp.readline()) ret = BatchCMD(ref[0],ref[5],ref[6],22,"pwd | echo $?") if(int(ret)==0): print("{0:20} \t {1:10} \t {2:11} \t {3:5} \t {4:9} \t {5:40}". format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4])) else: print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}". format(ref[0],ref[1],ref[2],ref[3],"異常",ref[4])) fp.close() # ps aux | grep "usbCfgDev" | grep -v "grep" | awk {'print $2'} def GetProcessStatus(): fp = open("unix_process.db", "r", encoding="utf-8") count = len(open("unix_process.db", "r", encoding="utf-8").readlines()) for each in range(count): proc = eval(fp.readline()) proc_len = len(proc) print("-" * 70) print("---> 巡檢地址: {0:10} \t 登錄用戶: {1:7} \t 登錄密碼: {2:10}".format(proc[0],proc[1],proc[2])) print("-" * 70) for process in range(3, proc_len): command = "ps aux | grep \'{}\' | grep -v \'grep\' | awk '{}' | head -1".format(proc[process],"{print $2}") try: ref = BatchCMD(proc[0],proc[1],proc[2],22,command) if(int(ref)!=-1): print("進程: {0:18} \t PID: {1:10} \t 狀態: {2}".format(proc[process], int(ref),"√")) else: print("進程: {0:18} \t PID:{1:10} \t 狀態: {2}".format(proc[process], 0,"×")) except Exception: print("進程: {0:18} \t PID:{1:10} \t 狀態: {2}".format(proc[process], 0,"×")) print() fp.close() def GetDiskStatus(): fp = open("unix_disk.db", "r", encoding="utf-8") count = len(open("unix_disk.db", "r", encoding="utf-8").readlines()) for each in range(count): proc = eval(fp.readline()) proc_len = len(proc) print("-" * 100) print("---> 巡檢地址: {0:10} \t 登錄系統: {1:7} \t 登錄賬號: {2:10} 登錄密碼: {3:10}". format(proc[0],proc[1],proc[2],proc[3])) print("-" * 100) try: ref = BatchCMD(proc[0], proc[2], proc[3], 22, "df | grep -v 'Filesystem'") st = str(ref).replace("\\n", "\n") print(st.replace("b'", "").replace("'", "")) except Exception: pass print() fp.close() # 運行命令 def RunCmd(command,system): fp = open("unix_disk.db", "r", encoding="utf-8") count = len(open("unix_disk.db", "r", encoding="utf-8").readlines()) for each in range(count): proc = eval(fp.readline()) proc_len = len(proc) if proc[1] == system: print("-" * 100) print("---> 巡檢地址: {0:10} \t 登錄系統: {1:7} \t 登錄賬號: {2:10} 登錄密碼: {3:10}". format(proc[0],proc[1],proc[2],proc[3])) print("-" * 100) try: ref = BatchCMD(proc[0], proc[2], proc[3], 22, command) st = str(ref).replace("\\n", "\n") print(st.replace("b'", "").replace("'", "")) except Exception: pass fp.close()
面向對象的封裝方法: 使用面向對象封裝,可極大的提高復用性。
import paramiko class MySSH: def __init__(self,address,username,password,default_port = 22): self.address = address self.default_port = default_port self.username = username self.password = password self.obj = paramiko.SSHClient() self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.obj.connect(self.address,self.default_port,self.username,self.password) self.objsftp = self.obj.open_sftp() def BatchCMD(self,command): stdin , stdout , stderr = self.obj.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None def GetRemoteFile(self,remotepath,localpath): self.objsftp.get(remotepath,localpath) def PutLocalFile(self,localpath,remotepath): self.objsftp.put(localpath,remotepath) def GetFileSize(self,file_path): ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'") return ref def CloseSSH(self): self.objsftp.close() self.obj.close() if __name__ == '__main__': ssh = MySSH('192.168.191.3','root','1233',22) ref = ssh.BatchCMD("ifconfig") print(ref) sz = ssh.GetFileSize("/etc/passwd") print(sz) ssh.CloseSSH() 第二次封裝完善。 import paramiko,os,json,re class MySSH: def __init__(self,address,username,password,default_port = 22): self.address = address self.default_port = default_port self.username = username self.password = password try: self.obj = paramiko.SSHClient() self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False) self.objsftp = self.obj.open_sftp() except Exception: pass def BatchCMD(self,command): try: stdin , stdout , stderr = self.obj.exec_command(command,timeout=3) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None def GetRemoteFile(self,remote_path,local_path): try: self.objsftp.get(remote_path,local_path) return True except Exception: return False def PutLocalFile(self,localpath,remotepath): try: self.objsftp.put(localpath,remotepath) return True except Exception: return False def CloseSSH(self): self.objsftp.close() self.obj.close() # 獲取文件大小 def GetFileSize(self,file_path): ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'") return ref.replace("\n","") # 判斷文件是否存在 def IsFile(self,file_path): return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))
通過Eval函數解析執行: 自定義語法規則與函數,通過Eval函數實現解析執行. 沒寫完,僅供參考。
import json,os,sys,math import argparse,time,re import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) def BatchCMD(address,username,password,port,command): try: ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2) stdin , stdout , stderr = ssh.exec_command(command) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None # ------------------------------------------------------------------------ # 內置解析方法 def GetDisk(x): return str(x) def GetCPULoad(): return str(10) # 句式解析器,解析句子並執行 def judge(string): # 如果匹配到IF則執行判斷條件解析 if re.findall(r'IF{ (.*?) }', string, re.M) != []: # 則繼續提取出他的表達式 ptr = re.compile(r'IF[{] (.*?) [}]',re.S) subject = re.findall(ptr,string)[0] subject_list = subject.split(" ") # 拼接語句並執行 sentence = eval(subject_list[0]) + subject_list[1] + subject_list[2] # 組合後執行,返回結果 if eval(sentence): return "IF" else: return False # 如果匹配到put則執行上傳解析 elif re.findall(r'PUT{ (.*?) }', string, re.M) != []: print("put") return False # 獲取特定目錄下所有的劇本 def GetAllRule(): rootdir = os.getcwd() + "\\rule\\" all_files = [f for f in os.listdir(rootdir)] print("-" * 90) print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("劇本名稱","應用平臺","應用端口","執行主機數","命令條數")) print("-" * 90) for switch in all_files: # 首先判斷文件結尾是否為Json if( switch.endswith(".json") == True): all_switch_dir = rootdir + switch try: # 判斷文件內部是否符合JSON規范 with open(all_switch_dir , "r" ,encoding="utf-8") as read_file: # 判斷是否存在指定字段來識別規范 load = json.loads(read_file.read()) if load.get("framework") != None and load.get("task_sequence") != None: run_addr_count = len(load.get("address_list")) command_count = len(load.get("task_sequence")) print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}". format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count)) except ValueError: pass # 指定一個劇本並運行 def RunPlayBook(rule_name): rootdir = os.getcwd() + "\\rule\\" all_files = [f for f in os.listdir(rootdir)] for switch in all_files: if( switch.endswith(".json") == True): all_switch_dir = rootdir + switch # 尋找到需要加載的劇本地址 if( switch == rule_name): with open(all_switch_dir , "r" ,encoding="utf-8") as read_file: data = json.loads(read_file.read()) address_list = data.get("address_list") # 循環每個主機任務 for each in address_list: # 得到劇本內容 task_sequence = data.get("task_sequence") default_port = data.get("default_port") print("-" * 90) print("地址: {0:15} 用戶名: {1:10} 密碼: {2:10}".format(each[0],each[1],each[2])) print("-" * 90) for task in task_sequence: flag = judge(task[0]) if flag == "IF": ref = BatchCMD(each[0],each[1],each[2],default_port,task[1]) print(ref) elif flag == False: ref = BatchCMD(each[0],each[1],each[2],default_port,task[0]) print(ref) if __name__ == "__main__": RunPlayBook("get_log.json")
定義劇本規范如下。
{ "framework": "Centos", "version": "7.0", "address_list": [ ["192.168.191.3","root","1233"] ], "default_port": "22", "task_sequence": [ ["ifconfig"], ["IF{ GetLastCmdFlag() == True }","uname"] ] }
詞法分析: 詞法分析解析劇本內容。
# 獲取特定目錄下所有的劇本 def GetAllRule(): rootdir = os.getcwd() + "\\rule\\" all_files = [f for f in os.listdir(rootdir)] print("-" * 90) print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("劇本名稱","應用平臺","應用端口","執行主機數","命令條數")) print("-" * 90) for switch in all_files: # 首先判斷文件結尾是否為Json if( switch.endswith(".json") == True): all_switch_dir = rootdir + switch try: # 判斷文件內部是否符合JSON規范 with open(all_switch_dir , "r" ,encoding="utf-8") as read_file: # 判斷是否存在指定字段來識別規范 load = json.loads(read_file.read()) if load.get("framework") != None and load.get("task_sequence") != None: run_addr_count = len(load.get("address_list")) command_count = len(load.get("task_sequence")) print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}". format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count)) except ValueError: pass # 句式解析器,解析句子並執行 def judge(string): # 如果匹配到IF則執行判斷條件解析 if re.findall(r'IF{ (.*?) }', string, re.M) != []: # 則繼續提取出他的表達式 ptr = re.compile(r'IF[{] (.*?) [}]',re.S) subject = re.findall(ptr,string)[0] subject_list = subject.split(" ") # 公開接口,執行命令 ssh = MySSH("192.168.191.3","root","1233","22") # 組合命令並執行 sentence = str(eval(subject_list[0]) + subject_list[1] + subject_list[2]) if eval(sentence): return "IF",ssh else: return False # 如果匹配到put則執行上傳解析 elif re.findall(r'PUT{ (.*?) }', string, re.M) != []: print("put") return False # 指定一個劇本並運行 def RunPlayBook(rule_name): rootdir = os.getcwd() + "\\rule\\" all_files = [f for f in os.listdir(rootdir)] for switch in all_files: if( switch.endswith(".json") == True): all_switch_dir = rootdir + switch # 尋找到需要加載的劇本地址 if( switch == rule_name): with open(all_switch_dir , "r" ,encoding="utf-8") as read_file: data = json.loads(read_file.read()) address_list = data.get("address_list") # 循環每個主機任務 for each in address_list: # 得到劇本內容 task_sequence = data.get("task_sequence") default_port = data.get("default_port") print("-" * 90) print("地址: {0:15} 用戶名: {1:10} 密碼: {2:10}".format(each[0],each[1],each[2])) print("-" * 90) for task in task_sequence: flag,obj = judge(task[0]) if flag == "IF": ret = obj.BatchCMD(task[1]) print(ret) if __name__ == '__main__': ret = judge("IF{ ssh.GetFileSize('/etc/passwd') >= 4 }") print(ret)
MySSH類最終封裝: 通過面向對象對其進行封裝,實現瞭查詢CPU,負載,內存利用率,磁盤容量,等通用數據的獲取。
import paramiko, math,json class MySSH: def __init__(self, address, username, password, default_port): self.address = address self.default_port = default_port self.username = username self.password = password # 初始化,遠程模塊 def Init(self): try: self.ssh_obj = paramiko.SSHClient() self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3, allow_agent=False, look_for_keys=False) self.sftp_obj = self.ssh_obj.open_sftp() except Exception: return False # 執行非交互命令 def BatchCMD(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None # 將遠程文件下載到本地 def GetRemoteFile(self, remote_path, local_path): try: self.sftp_obj.get(remote_path, local_path) return True except Exception: return False # 將本地文件上傳到遠程 def PutLocalFile(self, localpath, remotepath): try: self.sftp_obj.put(localpath, remotepath) return True except Exception: return False # 關閉接口 def CloseSSH(self): try: self.sftp_obj.close() self.ssh_obj.close() except Exception: pass # 獲取文件大小 def GetFileSize(self, file_path): ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'") return ref.replace("\n", "") # 判斷文件是否存在 def IsFile(self, file_path): return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path)) # 獲取系統型號 def GetSystemVersion(self): return self.BatchCMD("uname") # 檢測目標主機存活狀態 def GetPing(self): try: if self.GetSystemVersion() != None: return True else: return False except Exception: return False # 獲取文件列表,並得到大小 def GetFileList(self, path): try: ref_list = [] self.sftp_obj.chdir(path) file_list = self.sftp_obj.listdir("./") for sub_path in file_list: dict = {} file_size = self.GetFileSize(path + sub_path) dict[path + sub_path] = file_size ref_list.append(dict) return ref_list except Exception: return False # 將遠程文件全部打包後拉取到本地 def GetTarPackageAll(self, path): try: file_list = self.sftp_obj.listdir(path) self.sftp_obj.chdir(path) for packageName in file_list: self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName)) self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName)) self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName)) return True except Exception: return True # 獲取磁盤空間並返回字典 def GetAllDiskSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'", "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): # 根據不同版本選擇不同的命令 os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") # 循環將其轉換為字典 for each in ref_list: # 判斷最後是否為空,過濾最後一項 if each != "": ref_dict[str(each.split(":")[1])] = str(each.split(":")[0]) return ref_dict except Exception: return False # 獲取系統內存利用率 def GetAllMemSpace(self): cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'", "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): # 根據不同版本選擇不同的命令 os_ref = self.BatchCMD(run_cmd) # 首先現將KB轉化為MB mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024) mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024) # 計算占用空間百分比 percentage = 100 - int(mem_free / int(mem_total / 100)) # 拼接字典數據 return {"Total": str(mem_total), "Free": str(mem_free), "Percentage": str(percentage)} except Exception: return False # 獲取系統進程信息,並返回字典格式 def GetAllProcessSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq", "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": ref_dict[str(each.split(":")[0])] = str(each.split(":")[1]) return ref_dict except Exception: return False # 獲取CPU利用率 def GetCPUPercentage(self): ref_dict = {} cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'", "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.split(":") ref_dict = {"us": each[0],"sys":each[1],"idea":each[2]} return ref_dict except Exception: return False # 獲取機器的負載情況 def GetLoadAVG(self): ref_dict = {} cmd_dict = {"Linux\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'", "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.replace(",","").split(":") ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]} return ref_dict return False except Exception: return False # 修改當前用戶密碼 def SetPasswd(self,username,password): try: os_id = self.BatchCMD("id | awk {'print $1'}") print(os_id) if(os_id == "uid=0(root)\n"): self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username)) return True except Exception: return False # 定義超類,集成基類MySSH class SuperSSH(MySSH): def __init__(self,address, username, password, default_port): super(SuperSSH, self).__init__(address, username, password, default_port)
我們繼續為上面的代碼加上命令行,讓其可以直接使用,這裡需要遵循一定的格式規范,我們使用JSON解析數據,JSON格式如下.
{ "aix": [ ["192.168.1.1","root","123123"], ["192.168.1.1","root","2019"], ], "suse": [ ["192.168.1.1","root","123123"], ], "centos": [ ["192.168.1.1","root","123123"], ] }
接著是主程序代碼,如下所示.
# -*- coding: utf-8 -*- from MySSH import MySSH import json,os,sys,argparse class InitJson(): def __init__(self,db): self.db_name = db def GetPlatform(self,plat): with open(self.db_name, "r", encoding="utf-8") as Read_Pointer: load_json = json.loads(Read_Pointer.read()) for k,v in load_json.items(): try: if k == plat: return v except Exception: return None return None if __name__ == "__main__": ptr = InitJson("database.json") parser = argparse.ArgumentParser() parser.add_argument("-G","--group",dest="group",help="指定主機組") parser.add_argument("-C","--cmd",dest="cmd",help="指定CMD命令") parser.add_argument("--get",dest="get",help="指定獲取數據類型<ping>") parser.add_argument("--dst", dest="dst_file",help="目標位置") parser.add_argument("--src",dest="src_file",help="原文件路徑") args = parser.parse_args() # 批量CMD --group=aix --cmd=ls if args.group and args.cmd: platform = ptr.GetPlatform(args.group) success,error = [],[] for each in platform: ssh = MySSH(each[0], each[1], each[2], 22) if ssh.Init() != False: print("-" * 140) print("主機: {0:15} \t 賬號: {1:10} \t 密碼: {2:10} \t 命令: {3:30}". format(each[0], each[1], each[2], args.cmd)) print("-" * 140) print(ssh.BatchCMD(args.cmd)) ssh.CloseSSH() success.append(each[0]) else: error.append(each[0]) ssh.CloseSSH() print("\n\n","-" * 140, "\n 執行報告 \n", "-" * 140, "\n失敗主機: {}\n".format(error), "-" * 140) # 批量獲取主機其他數據 --group=centos --get=ping if args.group and args.get: platform = ptr.GetPlatform(args.group) success, error = [], [] for each in platform: ssh = MySSH(each[0], each[1], each[2], 22) # 判斷是否為ping if ssh.Init() != False: if args.get == "ping": ret = ssh.GetPing() if ret == True: print("[*] 主機: {} 存活中.".format(each[0])) # 收集磁盤數據 elif args.get == "disk": print("-" * 140) print("主機: {0:15} \t 賬號: {1:10} \t 密碼: {2:10}". format(each[0], each[1], each[2])) print("-" * 140) ret = ssh.GetAllDiskSpace() for k, v in ret.items(): if (v.split("%")[0] != "-"): space_ret = int(v.split("%")[0]) if space_ret >= 70: print("磁盤分區: {0:30} \t 磁盤占用: {1:5} -> [警告]".format(k, v)) continue if space_ret >= 50: print("磁盤分區: {0:30} \t 磁盤占用: {1:5} -> [警惕]".format(k, v)) continue else: print("磁盤分區: {0:30} \t 磁盤占用: {1:5}".format(k, v)) continue print() else: error.append(each[0]) ssh.CloseSSH() print("\n\n", "-" * 140, "\n 執行報告 \n", "-" * 140, "\n失敗主機: {}\n".format(error), "-" * 140) # 實現文件上傳過程 --group=centos --src=./a.txt --dst=/tmp/test.txt if args.group and args.src_file and args.dst_file: platform = ptr.GetPlatform(args.group) success, error = [], [] for each in platform: ssh = MySSH(each[0], each[1], each[2], 22) if ssh.Init() != False: ret = ssh.PutLocalFile(args.src_file,args.dst_file) if ret == True: print("主機: {} \t 本地文件: {} \t ---> 傳到: {}".format(each[0], args.src_file,args.dst_file)) else: error.append(each[0]) ssh.CloseSSH() print("\n\n", "-" * 140, "\n 執行報告 \n", "-" * 140, "\n失敗主機: {}\n".format(error), "-" * 140)
簡單的使用命令:
遠程CMD: python main.py –group=centos –cmd=”free -h | grep -v ‘total'”
判斷存活: python main.py –group=centos –get=”ping”
拉取磁盤:python main.py –group=suse –get=”disk”
批量上傳文件: python main.py –group=suse –src=”./aaa” –dst=”/tmp/bbb.txt”
由於我的設備少,所以沒開多線程,擔心開多線程對目標造成過大壓力,也沒啥必要。
番外: 另外我研究瞭一個主機分組的小工具,加上命令執行代碼量才800行,實現瞭一個分組數據庫,在這裡記下使用方法。
默認運行進入一個交互式shell環境。
Init = 初始化json文件,ShowHostList=顯示所有主機,ShowGroup=顯示所有組,ShowAllGroup=顯示所有主機包括組。
添加修改與刪除記錄,命令如下。
添加刪除主機組。
通過UUID向主機組中添加或刪除主機記錄。
測試主機組連通性。
以上就是Python如何實現Paramiko的二次封裝的詳細內容,更多關於Python實現Paramiko的二次封裝的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- Python封裝zabbix-get接口的代碼分享
- Python數據類型最全知識總結
- Python使用paramiko連接遠程服務器執行Shell命令的實現
- Python實現信息管理系統
- Python函數的作用域及內置函數詳解