added python script to read beautified ufw logs
This commit is contained in:
parent
00d9d2a8aa
commit
823548bab7
91
ufw_beautifier.py
Normal file
91
ufw_beautifier.py
Normal file
@ -0,0 +1,91 @@
|
||||
import re
|
||||
import ipaddress
|
||||
import time
|
||||
|
||||
# Colors for console output
|
||||
class bcolors:
|
||||
HEADER = '\033[95m'
|
||||
OKBLUE = '\033[94m'
|
||||
OKCYAN = '\033[96m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
ENDC = '\033[0m'
|
||||
BOLD = '\033[1m'
|
||||
UNDERLINE = '\033[4m'
|
||||
|
||||
# Emoji mapping
|
||||
emoji_map = {
|
||||
'BLOCK': '🚫',
|
||||
'ALLOW': '✅',
|
||||
'DENY': '❌',
|
||||
'LIMIT': '⚠️'
|
||||
}
|
||||
|
||||
# Function to shorten IPv6 addresses
|
||||
def shorten_ipv6(ip):
|
||||
try:
|
||||
return str(ipaddress.IPv6Address(ip).compressed)
|
||||
except ipaddress.AddressValueError:
|
||||
return ip
|
||||
|
||||
# Function to format and print a log entry
|
||||
def format_and_print_log(log):
|
||||
match = re.search(r'\[UFW (\w+)\] IN=(\S+) OUT=(\S*) MAC=([\w:]+) SRC=([\da-fA-F:.]+) DST=([\da-fA-F:.]+) LEN=(\d+) .*PROTO=(\w+) SPT=(\d+) DPT=(\d+)', log)
|
||||
if match:
|
||||
action, iface_in, iface_out, mac, src_ip, dst_ip, length, proto, src_port, dst_port = match.groups()
|
||||
src_ip = shorten_ipv6(src_ip)
|
||||
dst_ip = shorten_ipv6(dst_ip)
|
||||
emoji = emoji_map.get(action, '')
|
||||
color = bcolors.FAIL if action == 'BLOCK' else bcolors.OKGREEN
|
||||
|
||||
# Define fixed column widths
|
||||
col_width_action = 2
|
||||
col_width_ip = 39
|
||||
col_width_port = 5
|
||||
col_width_proto = 5
|
||||
|
||||
# Determine direction arrow
|
||||
direction_arrow = '➡️' if iface_in != '-' else '⬅️'
|
||||
|
||||
# Format strings to fixed width
|
||||
action_str = f"{emoji}".ljust(col_width_action)
|
||||
src_ip_str = f"{src_ip}".ljust(col_width_ip)
|
||||
src_port_str = f"{src_port}".ljust(col_width_port)
|
||||
dst_ip_str = f"{dst_ip}".ljust(col_width_ip)
|
||||
dst_port_str = f"{dst_port}".ljust(col_width_port)
|
||||
proto_str = f"{proto}".ljust(col_width_proto)
|
||||
|
||||
formatted_log = (
|
||||
f"{color}{action_str} {src_ip_str} {src_port_str} {direction_arrow} {dst_ip_str} {dst_port_str} {proto_str}{bcolors.ENDC}"
|
||||
)
|
||||
print(formatted_log)
|
||||
else:
|
||||
print(log.strip()) # Remove trailing newline
|
||||
|
||||
# Read log file and follow it continuously
|
||||
log_file = '/var/log/ufw.log'
|
||||
|
||||
def follow(file):
|
||||
file.seek(0, 2) # Move to the end of the file
|
||||
while True:
|
||||
line = file.readline()
|
||||
if not line:
|
||||
time.sleep(0.1) # Wait briefly before checking again
|
||||
continue
|
||||
yield line
|
||||
|
||||
with open(log_file, 'r') as file:
|
||||
# Display the first 10 entries
|
||||
for _ in range(10):
|
||||
log = file.readline()
|
||||
if not log:
|
||||
break
|
||||
format_and_print_log(log)
|
||||
|
||||
# Switch to continuous monitoring mode
|
||||
log_lines = follow(file)
|
||||
|
||||
# Parse and format log entries
|
||||
for log in log_lines:
|
||||
format_and_print_log(log)
|
Reference in New Issue
Block a user