92 lines
2.7 KiB
Python
92 lines
2.7 KiB
Python
import re
|
|
import ipaddress
|
|
import time
|
|
|
|
# Colors for console output
|
|
class bcolors:
    """Namespace of ANSI escape sequences for styling console output.

    Each attribute is a plain string meant to be written to the terminal
    before the text it should affect; ``ENDC`` resets the styling again.
    """

    # Reset sequence: emit after styled text to restore the default look.
    ENDC = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Foreground colours (bright variants, codes 91-96).
    FAIL = "\033[91m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    OKBLUE = "\033[94m"
    HEADER = "\033[95m"
    OKCYAN = "\033[96m"
|
|
|
|
# Emoji mapping
|
|
# Status glyph shown for each firewall action keyword; actions that are not
# listed here have no glyph.
emoji_map = dict(
    BLOCK='🚫',
    ALLOW='✅',
    DENY='❌',
    LIMIT='⚠️',
)
|
|
|
|
# Function to shorten IPv6 addresses
|
|
def shorten_ipv6(ip):
    """Return *ip* in compressed IPv6 notation when it parses as IPv6.

    Args:
        ip: Address text taken from a log line.

    Returns:
        The compressed form produced by ``ipaddress.IPv6Address`` (for
        example ``2001:db8::1``).  Input that ``IPv6Address`` rejects with
        ``AddressValueError`` -- such as a dotted IPv4 address -- is returned
        unchanged.
    """
    try:
        parsed = ipaddress.IPv6Address(ip)
    except ipaddress.AddressValueError:
        # Not an IPv6 address: hand the caller back exactly what it gave us.
        return ip
    return parsed.compressed
|
|
|
|
# Function to format and print a log entry
|
|
# Pattern for the key=value payload of a UFW packet log entry, compiled once
# at import time.  IN= may be empty and the MAC= field may be missing or
# empty: entries for locally generated (outgoing) packets typically look like
# "IN= OUT=eth0 SRC=..." with no MAC field at all.
_UFW_LOG_RE = re.compile(
    r'\[UFW (\w+)\] IN=(\S*) OUT=(\S*) (?:MAC=([\w:]*) )?'
    r'SRC=([\da-fA-F:.]+) DST=([\da-fA-F:.]+) LEN=(\d+) '
    r'.*PROTO=(\w+) SPT=(\d+) DPT=(\d+)'
)


def format_and_print_log(log):
    """Print one UFW log line as a coloured, fixed-width summary row.

    A line that carries a UFW packet entry with source/destination ports is
    rendered as::

        <emoji> <src ip> <src port> <arrow> <dst ip> <dst port> <proto>

    wrapped in an ANSI colour from ``bcolors``.  IPv6 addresses are shown in
    compressed form via ``shorten_ipv6``.  Any other line (including entries
    without SPT/DPT fields) is printed as-is with surrounding whitespace
    stripped.

    Args:
        log: A single raw line read from the UFW log file.

    Returns:
        None.  The result is written to standard output.
    """
    match = _UFW_LOG_RE.search(log)
    if match is None:
        # Not a packet entry we understand: pass it through untouched apart
        # from leading/trailing whitespace (which includes the newline).
        print(log.strip())
        return

    # Groups 3, 4 and 7 (OUT interface, MAC, LEN) are captured by the pattern
    # but are not part of the printed row.
    (action, iface_in, _iface_out, _mac,
     src_ip, dst_ip, _length, proto, src_port, dst_port) = match.groups()

    src_ip = shorten_ipv6(src_ip)
    dst_ip = shorten_ipv6(dst_ip)
    emoji = emoji_map.get(action, '')
    # Only BLOCK is shown in the failure colour; every other action is green.
    color = bcolors.FAIL if action == 'BLOCK' else bcolors.OKGREEN

    # Fixed column widths; 39 is the length of a fully expanded IPv6 address.
    col_width_action = 2
    col_width_ip = 39
    col_width_port = 5
    col_width_proto = 5

    # An entry with no inbound interface describes outgoing traffic.  The
    # legacy '-' placeholder is still honoured for backward compatibility.
    direction_arrow = '⬅️' if iface_in in ('', '-') else '➡️'

    columns = [
        emoji.ljust(col_width_action),
        src_ip.ljust(col_width_ip),
        src_port.ljust(col_width_port),
        direction_arrow,
        dst_ip.ljust(col_width_ip),
        dst_port.ljust(col_width_port),
        proto.ljust(col_width_proto),
    ]
    print(f"{color}{' '.join(columns)}{bcolors.ENDC}")
|
|
|
|
# Read log file and follow it continuously
|
|
# Path of the UFW log file that this script opens and tails.
log_file = "/var/log/ufw.log"
|
|
|
|
def follow(file, poll_interval=0.1):
    """Yield complete lines appended to *file*, waiting for new data forever.

    This is a generator, so nothing happens until the first ``next()`` call;
    at that point the file position is moved to the end, meaning only data
    written afterwards is reported.  The generator never finishes on its own.

    A line is yielded only once its terminating newline has been read.  If the
    writer has flushed just part of a line, the fragment is kept and joined
    with the rest when it arrives, so callers never see a truncated entry.

    Args:
        file: An open, seekable file object (text or binary mode).
        poll_interval: Seconds to sleep when no new data is available before
            checking again.  Defaults to 0.1.

    Yields:
        Each newly appended line, including its trailing newline.
    """
    file.seek(0, 2)  # whence=2: position at end of file
    pending = None  # fragment of a line whose newline has not arrived yet
    while True:
        chunk = file.readline()
        if not chunk:
            # At EOF for now; wait briefly before polling again.
            time.sleep(poll_interval)
            continue
        pending = chunk if pending is None else pending + chunk
        # Compare against the right newline type so binary files still work.
        newline = "\n" if isinstance(pending, str) else b"\n"
        if pending.endswith(newline):
            yield pending
            pending = None
|
|
|
|
# Script driver: print a short preview of the log, then tail it until the
# user interrupts the program.
try:
    # errors='replace' keeps the monitor alive if the log contains bytes that
    # cannot be decoded, instead of aborting with UnicodeDecodeError.
    with open(log_file, 'r', errors='replace') as file:
        # Display the first 10 lines of the file (fewer if it is shorter).
        for _ in range(10):
            log = file.readline()
            if not log:
                break
            format_and_print_log(log)

        # Switch to continuous monitoring mode.  follow() jumps to the end of
        # the file first, so lines between the preview and the current end
        # are not shown.
        log_lines = follow(file)

        # Parse and format each newly appended log entry.
        for log in log_lines:
            format_and_print_log(log)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the endless tail loop; exit quietly
    # rather than dumping a traceback.
    pass
|