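#!/usr/bin/env python3
"""Admin authentication middleware for the Natiris API.

Registers two FastAPI routes that gate admin access on a shared passphrase
read from a JSON config file:

* ``POST /api/v1/admin/auth``  - passphrase in the JSON body.
* ``GET  /api/v1/admin/status`` - passphrase in the ``x-admin-passphrase``
  header.

Security notes:

* Authentication fails closed: if the config file is missing, unreadable, or
  has no non-empty ``admin_passphrase``, every check is rejected.
* Passphrases are compared in constant time.
* NOTE(review): the passphrase is stored and compared as plaintext, and no
  attempt throttling or lockout is visible in this file - confirm that rate
  limiting and TLS are enforced in front of these routes.
"""

import hmac
import json
import os
from datetime import datetime, timezone

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

PATHS = {
    "config": os.path.expanduser("~/natiris/config/admin_config.json"),
}


def load_admin_config():
    """Load the admin config as a dict.

    Returns:
        The parsed JSON object, or ``{"admin_passphrase": ""}`` when the file
        cannot be read, is not valid JSON, or does not contain a JSON object.
        The empty passphrase in the fallback is treated as "not configured"
        by the verify functions and never authenticates anyone.
    """
    fallback = {"admin_passphrase": ""}
    try:
        # JSON is UTF-8 by specification; do not depend on the process locale
        # (the passphrase may contain non-ASCII characters).
        with open(PATHS["config"], encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON: deliberate best-effort.
        return fallback
    # Callers use .get(); a top-level list/string would crash them otherwise.
    return config if isinstance(config, dict) else fallback


def _passphrase_matches(candidate, expected):
    """Return True only if *candidate* equals a non-empty *expected* passphrase.

    Non-string values and an empty/unset expected passphrase are rejected, so
    a missing config can never be matched by an empty or absent credential.
    The comparison uses ``hmac.compare_digest`` on UTF-8 bytes so that the
    time taken does not depend on how many leading characters match.
    """
    if not isinstance(candidate, str) or not isinstance(expected, str):
        return False
    if not expected:
        return False
    # "surrogatepass" keeps encode() from raising on lone surrogates that a
    # JSON body can legally carry via \uXXXX escapes.
    return hmac.compare_digest(
        candidate.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


def verify_admin(passphrase):
    """Check *passphrase* against the configured admin passphrase.

    Args:
        passphrase: The credential supplied by the caller.

    Returns:
        True if a non-empty admin passphrase is configured and matches,
        otherwise False.
    """
    configured = load_admin_config().get("admin_passphrase", "")
    return _passphrase_matches(passphrase, configured)


def verify_admin_env():
    """Check the ``NATIRIS_ADMIN_PASS`` environment variable against the config.

    Returns:
        True if a non-empty admin passphrase is configured and the environment
        variable matches it, otherwise False (including when both are unset).
    """
    configured = load_admin_config().get("admin_passphrase", "")
    return _passphrase_matches(os.environ.get("NATIRIS_ADMIN_PASS", ""), configured)


class NatirisAPI:
    """Attach the admin authentication routes to an existing FastAPI app."""

    def __init__(self, base_app):
        """Store *base_app* and register the admin routes on it."""
        self.app = base_app
        self._register_auth_routes()

    def _register_auth_routes(self):
        """Register ``/api/v1/admin/auth`` and ``/api/v1/admin/status``."""

        @self.app.post("/api/v1/admin/auth")
        async def admin_auth(request: Request):
            """Authenticate with ``{"passphrase": ...}`` in the JSON body.

            Responds 400 for a malformed body and 401 for a wrong passphrase.
            """
            # Only the body parsing is guarded: wrapping the whole handler
            # would also catch the 401 below and turn it into a 400.
            try:
                body = await request.json()
            except ValueError:
                # Fixed message: do not echo parser internals to the client.
                raise HTTPException(status_code=400, detail="Invalid JSON body") from None
            if not isinstance(body, dict):
                raise HTTPException(
                    status_code=400, detail="Request body must be a JSON object"
                )

            if not verify_admin(body.get("passphrase", "")):
                raise HTTPException(status_code=401, detail="Invalid passphrase")

            return {
                "authenticated": True,
                "admin_user": "admin_user_primary",
                "max_trust": 10,
                "max_affection": 10,
                "can_override": load_admin_config().get("can_override", {}),
                "timestamp": datetime.now(timezone.utc).isoformat()
            }

        @self.app.get("/api/v1/admin/status")
        async def admin_status(request: Request):
            """Report admin status when ``x-admin-passphrase`` is valid."""
            # Only the header is checked; a missing header yields "" and is
            # rejected by verify_admin.
            auth_header = request.headers.get("x-admin-passphrase", "")
            if verify_admin(auth_header):
                return {
                    "status": "admin",
                    "trust_level": 10,
                    "affection_level": 10,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }
            raise HTTPException(status_code=401, detail="Admin authentication required")


def main():
    """Quick manual test: run the admin auth script and print its output.

    The passphrase is taken from ``NATIRIS_ADMIN_PASS`` instead of a literal
    in this file. A passphrase literal was previously embedded here; if that
    value was ever a real credential it should be rotated, because it remains
    in the file's history.
    """
    import subprocess

    passphrase = os.environ.get("NATIRIS_ADMIN_PASS", "")
    if not passphrase:
        print("NATIRIS_ADMIN_PASS is not set; skipping admin auth quick test")
        return

    # NOTE(review): the script's interface is not visible here, so the
    # passphrase is still passed as an argument; command-line arguments can be
    # read by other local users through the process list. Prefer stdin or the
    # environment if auth.py supports it.
    result = subprocess.run(
        ["python3", "/home/arch_agent_system/natiris/admin/auth.py", passphrase],
        capture_output=True,
        text=True,
    )
    print(result.stdout.strip())


if __name__ == "__main__":
    main()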