"""Regex-based parsers for firewall and proxy access-log lines.

``parse_line`` recognises iptables/UFW, pfSense filterlog, Cisco ASA, Squid
and Nginx lines and normalises each one into a flat dict whose keys are
listed in ``_RECORD_KEYS``.  Lines that match none of the formats yield
``None``.
"""
import ipaddress  # noqa: F401  -- not used in this module; import kept as-is
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from models import LogEntry  # noqa: F401  -- not used in this module; import kept as-is

# Verdict keywords recognised in iptables-style lines.
_ACTION_WORDS = r"ACCEPT|DROP|REJECT|DENY|ALLOW|PASS|BLOCK"
# Loose IPv4/IPv6 address token; the value is not validated.
_ADDR = r"[0-9a-fA-F.:]+"

# iptables: ... SRC=1.2.3.4 DST=5.6.7.8 PROTO=TCP SPT=123 DPT=80 ... DROP
# PROTO/SPT/DPT may be separated from DST by other KEY=VALUE fields
# (LEN=, TOS=, TTL=, ...), hence the lazy ``.*?`` in front of each of them.
IPTABLES_RE = re.compile(
    rf"SRC=(?P<src>{_ADDR})\s+"
    rf"DST=(?P<dst>{_ADDR})\s+"
    r"(?:.*?\bPROTO=(?P<proto>\w+)\s+)?"
    r"(?:.*?\bSPT=(?P<spt>\d+)\s+)?"
    r"(?:.*?\bDPT=(?P<dpt>\d+)\s+)?"
    rf".*?(?P<action>{_ACTION_WORDS})",
    re.IGNORECASE,
)

# Same fields, but with the verdict in the log prefix *before* SRC=, e.g.
#   kernel: [UFW BLOCK] IN=eth0 ... SRC=1.2.3.4 DST=5.6.7.8 ... PROTO=TCP SPT=1 DPT=2
# Only consulted when IPTABLES_RE finds no verdict after DST.  The verdict
# must not be embedded in a longer alphabetic word ("BYPASS", "Dropped").
_IPTABLES_PREFIX_RE = re.compile(
    rf"(?<![A-Za-z])(?P<action>{_ACTION_WORDS})(?![A-Za-z]).*?"
    rf"SRC=(?P<src>{_ADDR})\s+"
    rf"DST=(?P<dst>{_ADDR})"
    r"(?:.*?\bPROTO=(?P<proto>\w+))?"
    r"(?:.*?\bSPT=(?P<spt>\d+))?"
    r"(?:.*?\bDPT=(?P<dpt>\d+))?",
    re.IGNORECASE,
)

# pfSense filterlog (IPv4/TCP):
#   <134>1 2024-01-01T12:00:00+00:00 ... filterlog: 4,,,1000000103,em0,match,pass,in,4,0x0,,64,0,0,DF,6,tcp,60,192.168.1.1,10.0.0.1,12345,80,0,S,1234567890,,mss
# * ``(?:match,)?`` skips the "reason" column so that the following column
#   (pass/block/reject) is captured as the action; "match" is only captured
#   when nothing else follows it.
# * The lookahead on ``src`` demands a '.' or ':' in the token so that a
#   purely numeric column (e.g. the packet length) is never taken as the
#   source address.
# * Ports are the two numeric columns directly after the destination address
#   and are optional as a pair (ICMP lines have none).
PFSENSE_RE = re.compile(
    r"filterlog(?:\[\d+\])?:.*?,(?:match,)?(?P<action>pass|block|reject|match),"
    r".*?,(?P<proto>tcp|udp|icmp),.*?,"
    rf"(?P<src>(?=[0-9a-fA-F]*[.:]){_ADDR}),(?P<dst>{_ADDR})"
    r"(?:,(?P<spt>\d+),(?P<dpt>\d+))?",
    re.IGNORECASE,
)

# Cisco ASA:
#   %ASA-6-302013: Built outbound TCP connection 123 for outside:10.0.0.1/80 (10.0.0.1/80) to inside:192.168.1.1/12345 (192.168.1.1/12345)
# The parenthesised mapped address after each endpoint is optional.
CISCO_ASA_RE = re.compile(
    r"%ASA-\d+-\d+:\s+.*?(?P<action>Built|Teardown|Denied|Deny|Allowed|Permit).*?"
    r"(?P<proto>TCP|UDP|ICMP).*?"
    r"(?:for\s+(?P<src_iface>\w+):)?(?P<src>[0-9.]+)/(?P<spt>\d+)"
    r"(?:\s*\([^)]*\))*\s+"
    r"to\s+(?P<dst_iface>\w+):(?P<dst>[0-9.]+)/(?P<dpt>\d+)",
    re.IGNORECASE,
)

# Squid: 1704108600.123 200 192.168.1.1 TCP_MISS/200 1234 GET http://example.com/ - DIRECT/93.184.216.34 text/html
# The result code and the HTTP status may be joined by '/' (as in the example
# above) or separated by whitespace.
SQUID_RE = re.compile(
    r"^(?P<ts>[\d.]+)\s+"
    r"(?P<elapsed>-?\d+)\s+"
    rf"(?P<src>{_ADDR})\s+"
    r"(?P<result>\S+?)(?:/|\s+)"
    r"(?P<status>\d+)\s+"
    r"(?P<size>\d+)\s+"
    r"(?P<method>\w+)\s+"
    r"(?P<url>\S+)(?:\s+|$)",
)

# Nginx proxy: 192.168.1.1 - - [01/Jan/2024:12:00:00 +0000] "GET / HTTP/1.1" 200 1234 "-" "curl/7.68.0"
NGINX_RE = re.compile(
    rf"^(?P<src>{_ADDR})\s+.*?\s+"
    r"\[(?P<ts>[^\]]+)\]\s+"
    r'"(?P<method>\w+)\s+(?P<url>\S+)\s+HTTP/[\d.]+"\s+'
    r"(?P<status>\d+)\s+(?P<size>\d+)",
)

# strptime formats tried, in order, before falling back to a Unix epoch value.
TIMESTAMP_FORMATS = [
    "%d/%b/%Y:%H:%M:%S %z",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%d %H:%M:%S",
]

# Keys of every record returned by parse_line, in emission order.
_RECORD_KEYS = (
    "log_type",
    "source_ip",
    "destination_ip",
    "protocol",
    "source_port",
    "destination_port",
    "action",
    "url",
    "method",
    "status_code",
    "bytes_size",
    "timestamp",
    "raw_line",
)

# Cisco ASA keywords reported as "ALLOW"; every other matched keyword
# (Teardown, Denied, Deny) is reported as "DENY".
# NOTE(review): "Teardown" ends up as DENY under this rule -- confirm that
# this is the intended classification.
_CISCO_ALLOW_ACTIONS = frozenset({"built", "allowed", "permit"})


def _to_int(val: Optional[str]) -> Optional[int]:
    """Return ``int(val)``, or ``None`` when *val* is ``None`` or not an integer."""
    if val is None:
        return None
    try:
        return int(val)
    except ValueError:
        return None


def _parse_timestamp(ts_str: str) -> Optional[datetime]:
    """Parse *ts_str* into a ``datetime``; return ``None`` if nothing fits.

    Each entry of ``TIMESTAMP_FORMATS`` is tried in order.  If none matches,
    the string is interpreted as a Unix epoch value (seconds, fractional part
    allowed) and returned as a timezone-aware UTC ``datetime``.
    """
    for fmt in TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(ts_str, fmt)
        except ValueError:
            continue
    try:
        # An explicit tz replaces the deprecated, naive utcfromtimestamp().
        return datetime.fromtimestamp(float(ts_str), tz=timezone.utc)
    except (ValueError, OSError, OverflowError):
        return None


def _make_record(log_type: str, raw_line: str, **fields: Any) -> Dict[str, Any]:
    """Return a record with every key of ``_RECORD_KEYS``; unset keys are ``None``."""
    record: Dict[str, Any] = dict.fromkeys(_RECORD_KEYS)
    record.update(fields)
    record["log_type"] = log_type
    record["raw_line"] = raw_line
    return record


def _firewall_record(match: "re.Match[str]", raw_line: str, action: str) -> Dict[str, Any]:
    """Build a ``firewall`` record from a match exposing src/dst/proto/spt/dpt."""
    proto = match.group("proto")
    return _make_record(
        "firewall",
        raw_line,
        source_ip=match.group("src"),
        destination_ip=match.group("dst"),
        # Upper-cased for every firewall format so values compare equal.
        protocol=proto.upper() if proto else None,
        source_port=_to_int(match.group("spt")),
        destination_port=_to_int(match.group("dpt")),
        action=action,
    )


def _proxy_record(match: "re.Match[str]", raw_line: str) -> Dict[str, Any]:
    """Build a ``proxy`` record from a match exposing src/url/method/status/size/ts."""
    return _make_record(
        "proxy",
        raw_line,
        source_ip=match.group("src"),
        url=match.group("url"),
        method=match.group("method"),
        status_code=_to_int(match.group("status")),
        bytes_size=_to_int(match.group("size")),
        timestamp=_parse_timestamp(match.group("ts")),
    )


def parse_line(line: str) -> Optional[Dict[str, Any]]:
    """Parse one log line into a normalised record.

    The line is stripped and then tried against the supported formats in this
    order: iptables/UFW, pfSense filterlog, Cisco ASA, Squid, Nginx.  The
    first format that matches wins.

    Args:
        line: A single raw log line.

    Returns:
        A dict with the keys of ``_RECORD_KEYS``.  ``log_type`` is
        ``"firewall"`` or ``"proxy"``, ``raw_line`` is the stripped input and
        fields a format does not provide are ``None``.  Returns ``None`` for
        blank or unrecognised lines.
    """
    line = line.strip()
    if not line:
        return None

    # iptables / kernel / ufw -- verdict after the fields, else in the prefix.
    m = IPTABLES_RE.search(line) or _IPTABLES_PREFIX_RE.search(line)
    if m:
        return _firewall_record(m, line, m.group("action").upper())

    m = PFSENSE_RE.search(line)
    if m:
        return _firewall_record(m, line, m.group("action").upper())

    m = CISCO_ASA_RE.search(line)
    if m:
        allowed = m.group("action").lower() in _CISCO_ALLOW_ACTIONS
        return _firewall_record(m, line, "ALLOW" if allowed else "DENY")

    m = SQUID_RE.match(line) or NGINX_RE.match(line)
    if m:
        return _proxy_record(m, line)

    return None


def parse_lines(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse every line with ``parse_line`` and drop the ones that yield nothing.

    Args:
        lines: Raw log lines.

    Returns:
        The parsed records, in input order.
    """
    parsed = (parse_line(line) for line in lines)
    return [record for record in parsed if record]