# File: log-analyzer-backend/backend/services/llm_service.py
# Timestamp: 2026-05-07 10:14:57 +02:00
# Size: 53 lines, 2.6 KiB (Python)

import httpx
from typing import List, Dict, Any
from config import settings
class LLMService:
    """Client that asks an Ollama ``/api/generate`` endpoint to analyze logs.

    The service builds a single text prompt from aggregated statistics and a
    sample of raw log lines, sends it as a non-streaming request, and returns
    the model's answer as plain text.

    Attributes:
        base_url: Base URL of the LLM server, without a trailing slash.
        model: Name of the model sent in the request payload.
        timeout: Timeout passed to ``httpx.AsyncClient``.
        temperature: Sampling temperature sent in the request options.
        num_predict: ``num_predict`` value sent in the request options.
    """

    # Upper bounds on how much data is embedded in the prompt: ranked entries
    # per statistic, and raw log lines.
    TOP_N = 5
    MAX_SNIPPETS = 20

    # Returned when the server answers successfully but without usable text.
    NO_RESPONSE_MESSAGE = "No response from LLM."

    def __init__(
        self,
        base_url: str = settings.ollama_url,
        model: str = settings.ollama_model,
        *,
        timeout: float = 120.0,
        temperature: float = 0.3,
        num_predict: int = 1024,
    ):
        """Store connection and generation settings.

        Args:
            base_url: Base URL of the LLM server; trailing slashes are removed
                so the endpoint path can be appended safely.
            model: Model name to request.
            timeout: Request timeout handed to ``httpx.AsyncClient``.
            temperature: Sampling temperature for generation.
            num_predict: Value for the ``num_predict`` generation option.
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout
        self.temperature = temperature
        self.num_predict = num_predict

    async def analyze_logs(self, log_snippets: List[str], stats: Dict[str, Any]) -> str:
        """Request an analysis of the given log sample and statistics.

        Failures while talking to the server are reported as a returned
        message rather than raised, so the result is always a string.

        Args:
            log_snippets: Raw log lines; only the first ``MAX_SNIPPETS`` are
                included in the prompt.
            stats: Aggregated statistics used to build the prompt (see
                ``_build_analysis_prompt`` for the keys that are read).

        Returns:
            The stripped model answer, ``NO_RESPONSE_MESSAGE`` when the server
            returned no usable text, or a short error description.
        """
        prompt = self._build_analysis_prompt(log_snippets, stats)
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": self.temperature,
                "num_predict": self.num_predict,
            },
        }
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                resp = await client.post(f"{self.base_url}/api/generate", json=payload)
                resp.raise_for_status()
                data = resp.json()
        except httpx.HTTPStatusError as e:
            return f"LLM HTTP error: {e.response.status_code}"
        except httpx.ConnectError:
            return "LLM service unreachable. Ensure Ollama is running and accessible."
        except httpx.TimeoutException:
            # Timeout exceptions frequently carry an empty message, which made
            # the generic fallback below unhelpful for this common case.
            return "LLM request timed out."
        except Exception as e:
            return f"LLM analysis error: {type(e).__name__}: {str(e)}"

        # The payload may be a non-object, or "response" may be null, empty,
        # or whitespace only; all of those count as "no response".
        answer = data.get("response") if isinstance(data, dict) else None
        if not isinstance(answer, str):
            return self.NO_RESPONSE_MESSAGE
        answer = answer.strip()
        return answer or self.NO_RESPONSE_MESSAGE

    @staticmethod
    def _format_top(entries: Any, key: str, limit: int) -> str:
        """Render ranked entries as ``"<value> (<count>), ..."``.

        Entries that are not dicts are skipped and missing fields are shown as
        ``?`` so that incomplete statistics cannot abort prompt construction.

        Args:
            entries: Sequence of dicts, each expected to hold ``key`` and
                ``"count"``; any other value yields an empty string.
            key: Name of the field holding the ranked value.
            limit: Maximum number of entries to render.

        Returns:
            The comma-separated rendering, or ``""`` when nothing is usable.
        """
        if not isinstance(entries, (list, tuple)):
            return ""
        return ", ".join(
            f"{entry.get(key, '?')} ({entry.get('count', '?')})"
            for entry in entries[:limit]
            if isinstance(entry, dict)
        )

    def _build_analysis_prompt(self, snippets: List[str], stats: Dict[str, Any]) -> str:
        """Assemble the prompt text sent to the model.

        Reads ``top_sources``, ``top_destinations``, ``top_ports``,
        ``total_entries``, ``unique_sources`` and ``unique_destinations`` from
        ``stats``; absent keys fall back to empty lists or ``0``.

        Args:
            snippets: Raw log lines; at most ``MAX_SNIPPETS`` are embedded.
            stats: Aggregated statistics as described above.

        Returns:
            The complete prompt, ending with the ``=== Analysis ===`` marker.
        """
        stats = stats or {}
        top_sources = self._format_top(stats.get("top_sources"), "source_ip", self.TOP_N)
        top_dests = self._format_top(stats.get("top_destinations"), "destination_ip", self.TOP_N)
        top_ports = self._format_top(stats.get("top_ports"), "destination_port", self.TOP_N)
        # str() keeps the join from failing if a snippet is not already text.
        lines = "\n".join(str(line) for line in (snippets or [])[: self.MAX_SNIPPETS])
        return (
            "You are a network security analyst. Analyze the following firewall/proxy log snippets and statistics. "
            "Summarize the most important observations in 3-5 bullet points. Identify potential anomalies, scan patterns, "
            "or top talkers. Be concise and factual. Use German or English depending on the log content.\n\n"
            "=== Statistics ===\n"
            f"Top Sources: {top_sources}\n"
            f"Top Destinations: {top_dests}\n"
            f"Top Ports: {top_ports}\n"
            f"Total Entries: {stats.get('total_entries', 0)}\n"
            f"Unique Sources: {stats.get('unique_sources', 0)}\n"
            f"Unique Destinations: {stats.get('unique_destinations', 0)}\n\n"
            f"=== Sample Logs ===\n{lines}\n\n"
            "=== Analysis ==="
        )