From 823548bab78d40b796fc635cc617a46f8a2e17ef Mon Sep 17 00:00:00 2001 From: Enrico Ludwig Date: Thu, 4 Jul 2024 19:33:01 +0200 Subject: [PATCH] added python script to read beautified ufw logs --- ufw_beautifier.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 ufw_beautifier.py diff --git a/ufw_beautifier.py b/ufw_beautifier.py new file mode 100644 index 0000000..4357448 --- /dev/null +++ b/ufw_beautifier.py @@ -0,0 +1,91 @@ +import re +import ipaddress +import time + +# Colors for console output +class bcolors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + +# Emoji mapping +emoji_map = { + 'BLOCK': '🚫', + 'ALLOW': '✅', + 'DENY': '❌', + 'LIMIT': '⚠️' +} + +# Function to shorten IPv6 addresses +def shorten_ipv6(ip): + try: + return str(ipaddress.IPv6Address(ip).compressed) + except ipaddress.AddressValueError: + return ip + +# Function to format and print a log entry +def format_and_print_log(log): + match = re.search(r'\[UFW (\w+)\] IN=(\S+) OUT=(\S*) MAC=([\w:]+) SRC=([\da-fA-F:.]+) DST=([\da-fA-F:.]+) LEN=(\d+) .*PROTO=(\w+) SPT=(\d+) DPT=(\d+)', log) + if match: + action, iface_in, iface_out, mac, src_ip, dst_ip, length, proto, src_port, dst_port = match.groups() + src_ip = shorten_ipv6(src_ip) + dst_ip = shorten_ipv6(dst_ip) + emoji = emoji_map.get(action, '') + color = bcolors.FAIL if action == 'BLOCK' else bcolors.OKGREEN + + # Define fixed column widths + col_width_action = 2 + col_width_ip = 39 + col_width_port = 5 + col_width_proto = 5 + + # Determine direction arrow + direction_arrow = '➡️' if iface_in != '-' else '⬅️' + + # Format strings to fixed width + action_str = f"{emoji}".ljust(col_width_action) + src_ip_str = f"{src_ip}".ljust(col_width_ip) + src_port_str = f"{src_port}".ljust(col_width_port) + dst_ip_str = f"{dst_ip}".ljust(col_width_ip) + dst_port_str = f"{dst_port}".ljust(col_width_port) + proto_str = f"{proto}".ljust(col_width_proto) + + formatted_log = ( + f"{color}{action_str} {src_ip_str} {src_port_str} {direction_arrow} {dst_ip_str} {dst_port_str} {proto_str}{bcolors.ENDC}" + ) + print(formatted_log) + else: + print(log.strip()) # Remove trailing newline + +# Read log file and follow it continuously +log_file = '/var/log/ufw.log' + +def follow(file): + file.seek(0, 2) # Move to the end of the file + while True: + line = file.readline() + if not line: + time.sleep(0.1) # Wait briefly before checking again + continue + yield line + +with open(log_file, 'r') as file: + # Display the first 10 entries + for _ in range(10): + log = file.readline() + if not log: + break + format_and_print_log(log) + + # Switch to continuous monitoring mode + log_lines = follow(file) + + # Parse and format log entries + for log in log_lines: + format_and_print_log(log)