python腳本使用阿裡雲slb對惡意攻擊進行封堵的實現
環境準備:
1.安裝python3.7和相關的依賴
並安裝redis緩存數據庫
pip install aliyun-python-sdk-core pip install aliyun-python-sdk-slb pip intall IPy pip intall redis pip intall paramiko
2.添加ram訪問控制的編程接口用戶
3.添加slb的訪問控制策略並和需要頻控的slb進行綁定
redis封堵ip的格式
腳本程序目錄
Aliyun_SLB_Manager ├── helpers │ ├── common.py │ ├── email.py │ ├── remote.py │ └── slb.py ├── logs │ └── run_20210204.log └── run.py
# 程序核心就是使用shell命令對nginx的日志中出現的ip地址 和 訪問的接口進行過濾,找出訪問頻繁的那些程序加入slb黑名單,同時加入redis緩存,因為slb有封堵ip個數限制,redis中存儲的ip需要設置過期時間,對比後刪除slb中封堵的Ip
# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200 2454 114.248.45.15 1576 47.115.122.23 1569 47.107.239.148 269 112.32.217.52 grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}' [root@alisz-edraw-api-server-web01:~]# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -3 2454 114.248.45.15 1576 47.115.122.23 1569 47.107.239.148
python腳本
主入口程序
run.py
import time from helpers.email import send_mail from helpers.remote import get_black_ips from helpers.common import is_white_ip,get_ban_ip_time,set_ban_ip_time,groups from helpers.slb import slb_add_host,slb_del_host,slb_get_host if __name__ == "__main__": # aliyun 訪問控制針對 slb 的管理用戶 # 用戶登錄名稱 [email protected] accessKeyId = 'id' accessSecret = 'pass' # slb 訪問控制策略id acl_id = 'acl-slb' # reginid 查詢地址:https://help.aliyun.com/document_detail/40654.html?spm=a2c6h.13066369.0.0.54a17471VmN3kA region_id = 'cn-shenzhen' # 黑名單限制個數 300 slb_limit = 200 # 每10分鐘訪問限制閾值 threshold = 50 # 接收郵箱 mails = ['[email protected]'] # 遠程ssh執行grep過濾出可疑ip res = get_black_ips(threshold) deny_host_list = res[0] hosts_with_count = res[1] hosts_with_count = sorted(hosts_with_count.items(), key=lambda x: x[1] , reverse=True) print(hosts_with_count) # exit() # 等待被ban的ip , 過濾掉ip白名單 deny_hosts = [] for host in deny_host_list: if (is_white_ip(host) == False): deny_hosts.append(host + '/32') # 獲取所有已經被ban的ip response = slb_get_host(accessKeyId , accessSecret , acl_id , region_id) denied_hosts = [] if('AclEntrys' in response.keys()): for item in response['AclEntrys']['AclEntry']: denied_hosts.append(item['AclEntryIP']) # 被ban超過2天,首先移除 must_del_hosts = [] denied_hosts_clone = denied_hosts.copy() for host in denied_hosts: if (get_ban_ip_time(host) == 0 or (get_ban_ip_time(host) < int(round(time.time())) - 2* 24 * 3600)): must_del_hosts.append(host) denied_hosts_clone.remove(host) # 排除相同的 deny_hosts_new = [] for item in deny_hosts: if(item not in denied_hosts_clone): deny_hosts_new.append(item) # 兩者和超過300的限制 if((len(denied_hosts_clone)+len(deny_hosts_new))>slb_limit): denied_hosts_detail = {} for host in denied_hosts_clone: denied_hosts_detail[host] = get_ban_ip_time(host) # 需要排除的數量 num = len(denied_hosts_clone) + len(deny_hosts_new) - slb_limit denied_hosts_detail = sorted(denied_hosts_detail.items(), key=lambda x: x[1]) denied_hosts_detail = denied_hosts_detail[:num] for item in denied_hosts_detail: must_del_hosts.append(item[0]) print("denied:",denied_hosts) print("delete:",must_del_hosts) print("add:",deny_hosts_new) # exit() # 先刪除一部分 must_del_hosts if(len(must_del_hosts)>0): if (len(must_del_hosts)>50): must_del_hosts_clone = groups(must_del_hosts,50) for item in must_del_hosts_clone: slb_del_host(item, accessKeyId, accessSecret, acl_id, region_id) time.sleep(1) else : slb_del_host(must_del_hosts, accessKeyId, accessSecret, acl_id, region_id) # 再新增 deny_hosts_new if(len(deny_hosts_new)>0): if(len(deny_hosts_new)>50): deny_hosts_new_clone = groups(deny_hosts_new,50) for item in deny_hosts_new_clone: slb_add_host(item, accessKeyId, accessSecret, acl_id, region_id) time.sleep(1) else: slb_add_host(deny_hosts_new, accessKeyId, accessSecret, acl_id, region_id) # 記錄ip被禁時間 for host in deny_hosts_new: set_ban_ip_time(host) if (len(deny_hosts_new) >= 1): mail_content = '' if(len(must_del_hosts) > 0): mail_content += "以下黑名單已被解禁("+str(len(must_del_hosts))+"):\n"+"\n".join(must_del_hosts) + "\n" mail_content += "\n新增以下ip黑名單("+str(len(deny_hosts_new))+"):\n"+"\n".join(deny_hosts_new) mail_content += "\n\n10分鐘訪問超過15次("+str(len(hosts_with_count))+"):\n" for item in hosts_with_count: mail_content += str(item[1]) + " " + str(item[0]) + "\n" mail_content += "\n\n黑名單("+str(len(denied_hosts))+"個):\n" for item in denied_hosts: mail_content += str(item) + "\n" send_mail(mail_content , mails)
slb操作相關的腳本
slb.py
import logging , json from aliyunsdkcore.client import AcsClient from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest from aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest # 阿裡雲slb訪問控制裡添加ip def slb_add_host(hosts, accessKeyId, accessSecret, acl_id, region_id): client = AcsClient(accessKeyId, accessSecret, region_id) request = AddAccessControlListEntryRequest() request.set_accept_format('json') logging.info("正在封印IP:%s" % ",".join(hosts)) try: add_hosts = [] for host in hosts: add_hosts.append({"entry": host, "comment": "deny"}) request.set_AclEntrys(add_hosts) request.set_AclId(acl_id) response = client.do_action_with_exception(request) print(response) except BaseException as e: logging.error("添加黑名單失敗,原因:%s" % e) # slb刪除ip def slb_del_host(hosts, accessKeyId, accessSecret, acl_id , region_id = 'us-west-1'): logging.info("正在解封IP:%s" % ",".join(hosts)) try: del_hosts = [] for host in hosts: del_hosts.append({"entry": host, "comment": "deny"}) client = AcsClient(accessKeyId, accessSecret, region_id) request = RemoveAccessControlListEntryRequest() request.set_accept_format('json') request.set_AclEntrys(del_hosts) request.set_AclId(acl_id) client.do_action_with_exception(request) logging.info("slb刪除IP:%s成功" % ",".join(hosts)) # 查看調用接口結果 logging.info("slb刪除IP:%s成功" % ",".join(hosts)) # 查看調用接口結果 except BaseException as e: logging.error("移出黑名單失敗,原因:%s" % e) # 阿裡雲slb獲取IP黑名單列表 def slb_get_host(accessKeyId, accessSecret, acl_id, region_id): client = AcsClient(accessKeyId, accessSecret, region_id) request = DescribeAccessControlListAttributeRequest() request.set_accept_format('json') try: request.set_AclId(acl_id) response = client.do_action_with_exception(request) data_sub = json.loads((response.decode("utf-8"))) return data_sub except BaseException as e: logging.error("獲取黑名單失敗,原因:%s" % e)
遠程操作日志的腳本
remote.py
#!/usr/bin/python # -*- coding: UTF-8 -*- import datetime import re import paramiko def get_black_ips(threshold = 100): # file = '/data/www/logs/nginx_log/access/*api*_access.log' file = '/data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log' # 可以 ssh 訪問服務器 nginx 日志的用戶信息 username = 'apache' passwd = 'pass' ten_min_time = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%d/%b/%Y:%H:%M") ten_min_time = ten_min_time[:-1] # 線上 需要對日志進行過濾的目標服務器,一般是內網ip,本地調試時可以直接使用外網ip方便調試 ssh_hosts = ['1.1.1.1'] deny_host_list = [] for host in ssh_hosts: ''' # 過濾日志文件,需要顯示如下效果,次數 ip地址,需要定位具體的api接口,否則誤傷率極高 # grep 04/Feb/2021:15:2 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -5 | awk '{if ($1 >15)print $1,$2}' 2998 116.248.89.2 2381 114.248.45.15 1639 47.107.239.148 1580 47.115.122.23 245 59.109.149.45 ''' shell = ( # "grep %s %s | grep '/index.php?submod=checkout&method=index&pid' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'") % ( # grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api/user' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}' "grep %s %s | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >2000)print $1,$2}'") % ( ten_min_time, file) print(shell) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host, port=2020, username=username, password=passwd) stdin, stdout, stderr = ssh.exec_command(shell) result = stdout.read().decode(encoding="utf-8") deny_host_re = re.compile(r'\d{1,99} \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') deny_host_re = deny_host_re.findall(result) deny_host_list = deny_host_list + deny_host_re uniq_host = {} for host_str in deny_host_list: tmp = host_str.split(' ') if tmp[1] in uniq_host: uniq_host[tmp[1]] += int(tmp[0]) else: uniq_host[tmp[1]] = int(tmp[0]) deny_host_list = [] for v in uniq_host: if (uniq_host[v] > threshold): deny_host_list.append(v) return [deny_host_list , uniq_host]
發送郵件的腳本
email.py
#!/usr/bin/python # -*- coding: UTF-8 -*- import smtplib from email.mime.text import MIMEText from email.header import Header import logging def send_mail(host , receivers): # 發送郵件的服務器,用戶信息 mail_host = "smtpdm-ap-southeast-1.aliyun.com" mail_user = "[email protected]" mail_pass = "pass" sender = '[email protected]' message = MIMEText('chinasoft國內接口被刷,單個IP最近10分鐘內訪問超過閾值100次會收到此郵件告警!!!!\n%s' % (host), 'plain', 'utf-8') message['From'] = Header("chinasoft國內接口被刷", 'utf-8') subject ='[DDOS]購買鏈接接口異常鏈接!!' message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP(mail_host, 80) smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) logging.info("郵件發送成功") except smtplib.SMTPException as e: logging.error("發送郵件失敗,原因:%s" % e)
配置文件
common.py
#!/usr/bin/python # -*- coding: UTF-8 -*- import IPy from functools import reduce import redis,time def groups(L1,len1): groups=zip(*(iter(L1),)*len1) L2=[list(i) for i in groups] n=len(L1) % len1 L2.append(L1[-n:]) if n !=0 else L2 return L2 def ip_into_int(ip): return reduce(lambda x, y: (x << 8) + y, map(int, ip.split('.'))) # 過濾掉內網ip def is_internal_ip(ip): ip = ip_into_int(ip) net_a = ip_into_int('10.255.255.255') >> 24 net_b = ip_into_int('172.31.255.255') >> 20 net_c = ip_into_int('192.168.255.255') >> 16 return ip >> 24 == net_a or ip >> 20 == net_b or ip >> 16 == net_c # 是否為白名單ip (公司內網+集群內網ip+slb和需要互訪的服務器ip避免誤殺) def is_white_ip(ip): if (is_internal_ip(ip)): return True white_hosts = [ # web-servers '1.1.1.1', '1.1.1.2', ]; for white in white_hosts: if (ip in IPy.IP(white)): return True return False def get_ban_ip_time(ip): pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1) client = redis.Redis(connection_pool=pool) key = 'slb_ban_'+ip val = client.get(key) if val == None: return 0 else : return int(val) def set_ban_ip_time(ip): pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1) client = redis.Redis(connection_pool=pool) key = 'slb_ban_'+ip timestamp = time.time() timestamp = int(round(timestamp)) return client.set(key , timestamp , 86400)
本地可以直接運行run.py進行調試
到此這篇關於python腳本使用阿裡雲slb對惡意攻擊進行封堵的實現的文章就介紹到這瞭,更多相關python腳本阿裡雲slb內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Shell常用服務器日志分析命令總結
- python 利用百度API進行淘寶評論關鍵詞提取
- python調用文字識別OCR輕松搞定驗證碼
- 教你如何精準統計出你的接口"QPS"
- Pipes實現LeetCode(192.單詞頻率)