"""HTTP routes under ``/api`` for uploading logs, reading statistics and LLM analysis."""

import io
from typing import Iterator, List

from fastapi import APIRouter, UploadFile, File, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from models import LogEntry
from services.log_parser import parse_lines
from services.analyzer import StatsAnalyzer
from services.llm_service import LLMService
from database import get_db

router = APIRouter(prefix="/api", tags=["logs"])

# Number of parsed rows added to the session and committed per round-trip
# during an upload.
_UPLOAD_BATCH_SIZE = 500


class StatsResponse(BaseModel):
    """Combined statistics payload returned by ``GET /api/stats``.

    Each field carries the result of the matching ``StatsAnalyzer`` call made
    in ``get_stats``; the inner dict layouts are defined by that service.
    """

    overview: dict
    top_sources: List[dict]
    top_destinations: List[dict]
    top_ports: List[dict]
    top_urls: List[dict]
    actions: List[dict]
    timeline: List[dict]
    unique_counts: dict


class AnalysisResponse(BaseModel):
    """Payload returned by ``POST /api/analyze``: the text produced by the LLM service."""

    analysis: str


class UploadResponse(BaseModel):
    """Payload returned by ``POST /api/upload``: a status message and the parsed row count."""

    message: str
    parsed_lines: int


@router.post("/upload", response_model=UploadResponse)
async def upload_logs(
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
):
    """Read an uploaded log file, parse it, and store the parsed rows.

    Args:
        file: The uploaded file; it is read fully into memory and decoded as
            UTF-8, with undecodable bytes dropped.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        An ``UploadResponse`` whose ``parsed_lines`` is ``len()`` of what
        ``parse_lines`` returned.

    Raises:
        HTTPException: 400 when the upload has no filename or cannot be read.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    try:
        raw = await file.read()
        content = raw.decode("utf-8", errors="ignore")
    except Exception as e:
        # Endpoint boundary: any read failure becomes a client-facing 400,
        # with the original exception kept as the cause for debugging.
        raise HTTPException(status_code=400, detail=f"Cannot read file: {e}") from e

    parsed = parse_lines(content.splitlines())

    # Commit per batch so a single transaction never holds more than
    # _UPLOAD_BATCH_SIZE pending rows.
    # NOTE(review): a failure part-way through leaves the earlier batches
    # committed - confirm that a partial upload is acceptable.
    for batch in _batches(parsed, _UPLOAD_BATCH_SIZE):
        db.add_all([LogEntry(**row) for row in batch])
        await db.commit()

    return UploadResponse(
        message=f"Uploaded and parsed {len(parsed)} log lines.",
        parsed_lines=len(parsed),
    )


@router.get("/stats", response_model=StatsResponse)
async def get_stats(
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Return every statistic computed by ``StatsAnalyzer`` in one response.

    Args:
        limit: Passed to each ``top_*`` query (1-100, default 20).
        db: Database session supplied by the ``get_db`` dependency.
    """
    # The queries are awaited one after another on the shared session; an
    # AsyncSession must not be used from several tasks concurrently.
    overview = await StatsAnalyzer.overview(db)
    sources = await StatsAnalyzer.top_sources(db, limit)
    dests = await StatsAnalyzer.top_destinations(db, limit)
    ports = await StatsAnalyzer.top_ports(db, limit)
    urls = await StatsAnalyzer.top_urls(db, limit)
    actions = await StatsAnalyzer.action_distribution(db)
    # "hour" is presumably the timeline bucket size - confirm in StatsAnalyzer.
    timeline = await StatsAnalyzer.timeline(db, "hour")
    uniq = await StatsAnalyzer.unique_counts(db)

    return StatsResponse(
        overview=overview,
        top_sources=sources,
        top_destinations=dests,
        top_ports=ports,
        top_urls=urls,
        actions=actions,
        timeline=timeline,
        unique_counts=uniq,
    )


@router.get("/stats/sources", response_model=List[dict])
async def get_sources(
    limit: int = Query(20, ge=1, le=500),
    db: AsyncSession = Depends(get_db),
):
    """Return the result of ``StatsAnalyzer.top_sources`` for ``limit`` (1-500, default 20)."""
    return await StatsAnalyzer.top_sources(db, limit)


@router.get("/stats/destinations", response_model=List[dict])
async def get_destinations(
    limit: int = Query(20, ge=1, le=500),
    db: AsyncSession = Depends(get_db),
):
    """Return the result of ``StatsAnalyzer.top_destinations`` for ``limit`` (1-500, default 20)."""
    return await StatsAnalyzer.top_destinations(db, limit)


@router.get("/stats/ports", response_model=List[dict])
async def get_ports(
    limit: int = Query(20, ge=1, le=500),
    db: AsyncSession = Depends(get_db),
):
    """Return the result of ``StatsAnalyzer.top_ports`` for ``limit`` (1-500, default 20)."""
    return await StatsAnalyzer.top_ports(db, limit)


@router.post("/analyze", response_model=AnalysisResponse)
async def analyze_logs(
    log_type: str = Query("firewall", enum=["firewall", "proxy", "all"]),
    limit: int = Query(100, ge=1, le=500),
    db: AsyncSession = Depends(get_db),
):
    """Send stored log lines plus summary statistics to the LLM service.

    Args:
        log_type: ``"all"`` selects every entry; any other value is matched
            against ``LogEntry.log_type``.
        limit: Maximum number of log entries to load (1-500, default 100).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        An ``AnalysisResponse`` wrapping whatever ``LLMService.analyze_logs``
        returned.

    Raises:
        HTTPException: 404 when the query selects no rows.
    """
    # NOTE(review): `enum=` on Query looks like schema metadata only - confirm
    # whether it rejects other values; as written an unknown log_type simply
    # matches no rows and ends in the 404 below.
    stmt = select(LogEntry)
    if log_type != "all":
        stmt = stmt.where(LogEntry.log_type == log_type)
    # NOTE(review): no ORDER BY, so which `limit` rows come back is up to the
    # database - confirm that an arbitrary sample is intended.
    stmt = stmt.limit(limit)

    result = await db.execute(stmt)
    rows = result.scalars().all()
    if not rows:
        raise HTTPException(status_code=404, detail="No logs found for analysis")

    # Fetch each aggregate once; both unique_* values come from the same
    # unique_counts() result rather than from two separate queries.
    # The StatsAnalyzer calls receive no log_type, so presumably these
    # statistics cover all stored entries - confirm.
    overview = await StatsAnalyzer.overview(db)
    uniq = await StatsAnalyzer.unique_counts(db)
    stats = {
        "total_entries": overview["total_entries"],
        "unique_sources": uniq["unique_sources"],
        "unique_destinations": uniq["unique_destinations"],
        "top_sources": await StatsAnalyzer.top_sources(db, 10),
        "top_destinations": await StatsAnalyzer.top_destinations(db, 10),
        "top_ports": await StatsAnalyzer.top_ports(db, 10),
    }

    snippets = [r.raw_line for r in rows]
    llm = LLMService()
    analysis = await llm.analyze_logs(snippets, stats)
    return AnalysisResponse(analysis=analysis)


def _batches(data: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of ``data`` holding at most ``size`` items each.

    The last slice may be shorter; an empty ``data`` yields nothing.
    """
    for i in range(0, len(data), size):
        yield data[i : i + size]