Initial commit: Natiris AI Agent Orchestration System
This commit is contained in:
75
admin/api_auth.py
Normal file
75
admin/api_auth.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Admin-Auth-Middleware für Natiris API
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
PATHS = {
|
||||
"config": os.path.expanduser("~/natiris/config/admin_config.json"),
|
||||
}
|
||||
|
||||
def load_admin_config():
|
||||
try:
|
||||
with open(PATHS["config"]) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {"admin_passphrase": ""}
|
||||
|
||||
def verify_admin(passphrase):
|
||||
config = load_admin_config()
|
||||
return passphrase == config.get("admin_passphrase", "")
|
||||
|
||||
def verify_admin_env():
|
||||
return os.environ.get("NATIRIS_ADMIN_PASS", "") == load_admin_config().get("admin_passphrase", "")
|
||||
|
||||
class NatirisAPI:
|
||||
def __init__(self, base_app):
|
||||
self.app = base_app
|
||||
self._register_auth_routes()
|
||||
|
||||
def _register_auth_routes(self):
|
||||
@self.app.post("/api/v1/admin/auth")
|
||||
async def admin_auth(request: Request):
|
||||
try:
|
||||
body = await request.json()
|
||||
passphrase = body.get("passphrase", "")
|
||||
if verify_admin(passphrase):
|
||||
return {
|
||||
"authenticated": True,
|
||||
"admin_user": "admin_user_primary",
|
||||
"max_trust": 10,
|
||||
"max_affection": 10,
|
||||
"can_override": load_admin_config().get("can_override", {}),
|
||||
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
else:
|
||||
raise HTTPException(status_code=401, detail="Invalid passphrase")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
@self.app.get("/api/v1/admin/status")
|
||||
async def admin_status(request: Request):
|
||||
# Auto-check via header or query
|
||||
auth_header = request.headers.get("x-admin-passphrase", "")
|
||||
if verify_admin(auth_header):
|
||||
return {
|
||||
"status": "admin",
|
||||
"trust_level": 10,
|
||||
"affection_level": 10,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
raise HTTPException(status_code=401, detail="Admin authentication required")
|
||||
|
||||
def main():
|
||||
# Quick test
|
||||
import subprocess
|
||||
result = subprocess.run(["python3", os.path.expanduser("~/natiris/admin/auth.py"), "NatirisSicherheit2026!Lübeck"], capture_output=True, text=True)
|
||||
print(result.stdout.strip())
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user