"""Pretty-print UFW firewall log entries and follow the log file for new ones."""

import ipaddress
import re
import time
from typing import Iterator, Optional, TextIO


# Colors for console output (ANSI escape sequences)
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


# Emoji mapping: UFW action keyword -> symbol shown in the first column
emoji_map = {
    'BLOCK': '🚫',
    'ALLOW': '✅',
    'DENY': '❌',
    'LIMIT': '⚠️',
}

# Path of the log file that main() reads and follows
log_file = '/var/log/ufw.log'

# Fixed column widths used when laying out a formatted entry
_COL_WIDTH_ACTION = 2
_COL_WIDTH_IP = 39
_COL_WIDTH_PORT = 5
_COL_WIDTH_PROTO = 5

# Compiled once instead of on every log line.
# IN= may be empty and the MAC= field may be missing altogether: that is how
# locally generated (outbound) packets are logged, and the previous pattern
# (non-empty IN=, mandatory MAC=) never matched those entries.
_UFW_ENTRY_RE = re.compile(
    r'\[UFW (\w+)\] IN=(\S*) OUT=(\S*) (?:MAC=([\w:]*) )?'
    r'SRC=([\da-fA-F:.]+) DST=([\da-fA-F:.]+) LEN=(\d+) '
    r'.*PROTO=(\w+) SPT=(\d+) DPT=(\d+)'
)


def shorten_ipv6(ip: str) -> str:
    """Return the compressed form of an IPv6 address.

    Anything that does not parse as an IPv6 address (for example an IPv4
    address) is returned unchanged.
    """
    try:
        return str(ipaddress.IPv6Address(ip).compressed)
    except ipaddress.AddressValueError:
        return ip


def _format_log_entry(log: str) -> Optional[str]:
    """Build the colored, column-aligned line for one UFW log entry.

    Returns None when *log* does not contain a UFW entry with source and
    destination ports (the pattern requires SPT= and DPT=).
    """
    match = _UFW_ENTRY_RE.search(log)
    if match is None:
        return None

    # mac is None when the entry carries no MAC= field; the interface out,
    # MAC and length are captured but not displayed.
    (action, iface_in, _iface_out, _mac, src_ip, dst_ip,
     _length, proto, src_port, dst_port) = match.groups()

    src_ip = shorten_ipv6(src_ip)
    dst_ip = shorten_ipv6(dst_ip)

    emoji = emoji_map.get(action, '')
    # Only BLOCK is shown in the failure color; every other action is green.
    color = bcolors.FAIL if action == 'BLOCK' else bcolors.OKGREEN

    # An entry without an inbound interface is treated as outbound. The '-'
    # placeholder check is kept from the original implementation.
    is_inbound = bool(iface_in) and iface_in != '-'
    direction_arrow = '➡️' if is_inbound else '⬅️'

    columns = (
        emoji.ljust(_COL_WIDTH_ACTION),
        src_ip.ljust(_COL_WIDTH_IP),
        src_port.ljust(_COL_WIDTH_PORT),
        direction_arrow,
        dst_ip.ljust(_COL_WIDTH_IP),
        dst_port.ljust(_COL_WIDTH_PORT),
        proto.ljust(_COL_WIDTH_PROTO),
    )
    return f"{color}{' '.join(columns)}{bcolors.ENDC}"


def format_and_print_log(log):
    """Print one log line, formatted if it is a recognizable UFW entry.

    Recognized entries are printed as
    ``<emoji> <src ip> <src port> <arrow> <dst ip> <dst port> <proto>``
    wrapped in a color escape sequence. Any other line is printed as-is with
    surrounding whitespace (including the trailing newline) stripped.
    """
    formatted_log = _format_log_entry(log)
    if formatted_log is None:
        print(log.strip())
    else:
        print(formatted_log)


def follow(file: TextIO, poll_interval: float = 0.1) -> Iterator[str]:
    """Yield lines appended to *file*, polling forever.

    The file position is first moved to the end of the file, so only lines
    written after this generator starts are yielded. When no new line is
    available the generator sleeps for *poll_interval* seconds and retries;
    it never terminates on its own.
    """
    file.seek(0, 2)  # Move to the end of the file
    while True:
        line = file.readline()
        if not line:
            time.sleep(poll_interval)  # Wait briefly before checking again
            continue
        yield line


def main():
    """Print the first 10 lines of the log file, then follow it continuously.

    Note that follow() seeks to the end of the file, so lines between the
    tenth one and the current end of the file are not displayed.
    """
    # errors='replace' keeps the monitor running if the log contains a byte
    # sequence that cannot be decoded.
    with open(log_file, 'r', errors='replace') as file:
        # Display the first 10 entries
        for _ in range(10):
            log = file.readline()
            if not log:
                break
            format_and_print_log(log)

        # Switch to continuous monitoring mode
        for log in follow(file):
            format_and_print_log(log)


if __name__ == "__main__":
    main()