add fail2ban scripts
This commit is contained in:
83
fail2ban/action.d/modifyBanList.py
Normal file
83
fail2ban/action.d/modifyBanList.py
Normal file
@ -0,0 +1,83 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
"""
|
||||
Called by entryPoint.py, performs the addition or deletion of an item in a Cloudflare custom list.
|
||||
Support for IPV6 is limited as it blocks the entire /64 subnet.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import requests
|
||||
from requests import Response
|
||||
import json
|
||||
import ipaddress
|
||||
|
||||
def getIPList(apiEndpoint : str, headers : dict) -> json:
|
||||
response = requests.get(apiEndpoint, headers=headers)
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
print(f"Failed to fetch existing IP list. Status code: {response.status_code}")
|
||||
print(response.text)
|
||||
sys.exit(1)
|
||||
|
||||
def addIPtoList(ipAddr : str, apiEndpoint : str, headers : dict) -> Response:
|
||||
payload = [{"ip": ipAddr}]
|
||||
response = requests.post(apiEndpoint, headers=headers, data=json.dumps(payload))
|
||||
return response
|
||||
|
||||
def removeIPFromList(ipId : str, apiEndpoint : str, headers : dict) -> Response:
|
||||
payload = {"items": [{"id": ipId}]}
|
||||
response = requests.delete(apiEndpoint, headers=headers, data=json.dumps(payload))
|
||||
return response
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: ./modifyBanList.py <ip> <add|del>")
|
||||
sys.exit(1)
|
||||
|
||||
ipAddr = sys.argv[1]
|
||||
|
||||
try:
|
||||
addr = ipaddress.IPv6Address(ipAddr)
|
||||
first_64_bits = str(addr.exploded).split(':')[:4]
|
||||
ipAddr = ':'.join(first_64_bits) + '::/64'
|
||||
except:
|
||||
pass
|
||||
|
||||
action = sys.argv[2]
|
||||
listId = ''
|
||||
accountId = ''
|
||||
email = ''
|
||||
apiKey = ''
|
||||
apiEndpoint = f'https://api.cloudflare.com/client/v4/accounts/{accountId}/rules/lists/{listId}/items'
|
||||
|
||||
headers = {
|
||||
'X-Auth-Email': f'{email}',
|
||||
'X-Auth-Key': f'{apiKey}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
existingIpList = getIPList(apiEndpoint,headers)
|
||||
print(existingIpList)
|
||||
response = None
|
||||
|
||||
if action == "del":
|
||||
ipId = None
|
||||
for item in existingIpList['result']:
|
||||
if item['ip'] == ipAddr:
|
||||
ipId = item['id']
|
||||
break
|
||||
payload = {"items": [{"id": ipId}]}
|
||||
|
||||
if ipId is not None:
|
||||
response = requests.delete(apiEndpoint,headers=headers,data=json.dumps(payload))
|
||||
elif not any(item['ip'] == ipAddr for item in existingIpList['result']):
|
||||
payload = [{
|
||||
"ip": ipAddr
|
||||
}]
|
||||
response = requests.post(apiEndpoint, headers=headers, data=json.dumps(payload))
|
||||
|
||||
if response is not None and response.status_code == 200:
|
||||
print(f"IP address {ipAddr} {action} to the custom IP list successfully.")
|
||||
else:
|
||||
print(f"Failed to {action} IP address {ipAddr} to the custom IP list.")
|
Reference in New Issue
Block a user