- NatirisMaster.py aktualisiert - NaturalLanguageEngine optimiert - PsychologyEngine & Arousal-Engine - WebUI (FastAPI) mit Chat-API - Bridges: ComfyUI, Ollama, Vision - Admin-Auth System - .gitignore hinzugefügt (checkpoints, logs, generated)
359 lines
13 KiB
Python
Executable File
359 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
ComfyBridge v3 – Mit IPAdapter für Gesichtskonsistenz
|
||
|
||
Features v3:
|
||
- IPAdapter Integration für Gesichtskonsistenz
|
||
- CLIP Vision für Bildverarbeitung
|
||
- Trust-basierte IPAdapter-Stärke (0.4 - 0.8)
|
||
- Fallback wenn IPAdapter nicht verfügbar
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import time
|
||
import uuid
|
||
import subprocess
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Dict, Optional, List
|
||
|
||
try:
|
||
import requests
|
||
REQUESTS_AVAILABLE = True
|
||
except ImportError:
|
||
REQUESTS_AVAILABLE = False
|
||
print("Warning: requests not available")
|
||
|
||
PATHS = {
|
||
"state": os.path.expanduser("~/natiris/core/natiris_full_state.json"),
|
||
"config": os.path.expanduser("~/natiris/config/character_genesis.json"),
|
||
"output_dir": os.path.expanduser("~/natiris/generated/"),
|
||
"output": os.path.expanduser("~/natiris/bridges/comfy_response.json"),
|
||
"base_images": os.path.expanduser("~/natiris/assets/base_images/"),
|
||
"vision_script": os.path.expanduser("~/natiris/bridges/VisionBridge_v2.py"),
|
||
}
|
||
|
||
COMFY_API = os.getenv("COMFY_API_URL", "http://localhost:8188")
|
||
|
||
TRUST_MAP = [
|
||
{"range": [0, 3], "style": "neutral_portrait", "prompt_add": "neutral expression, professional lighting", "ipadapter_weight": 0.4},
|
||
{"range": [4, 7], "style": "personal_context", "prompt_add": "relaxed expression, warm lighting", "ipadapter_weight": 0.6},
|
||
{"range": [8, 10], "style": "intimate", "prompt_add": "warm smile, intimate lighting", "ipadapter_weight": 0.8}
|
||
]
|
||
|
||
class ComfyBridgeV3:
|
||
def __init__(self):
|
||
self.client_id = f"natiris_{uuid.uuid4().hex[:8]}"
|
||
self.base_images_dir = Path(PATHS["base_images"])
|
||
self.output_dir = Path(PATHS["output_dir"])
|
||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||
self.has_ipadapter = False
|
||
self.check_comfy_nodes()
|
||
|
||
def check_comfy_nodes(self):
|
||
"""Prüft welche ComfyUI Nodes verfügbar"""
|
||
try:
|
||
response = requests.get(f"{COMFY_API}/object_info", timeout=5)
|
||
nodes = response.json()
|
||
self.has_ipadapter = "IPAdapterAdvanced" in nodes or "IPAdapter" in nodes
|
||
self.has_controlnet_openpose = "ControlNetLoader" in nodes
|
||
print(f"IPAdapter verfügbar: {self.has_ipadapter}")
|
||
print(f"ControlNet verfügbar: {self.has_controlnet_openpose}")
|
||
except Exception as e:
|
||
print(f"Node-Check fehlgeschlagen: {e}")
|
||
self.has_ipadapter = False
|
||
self.has_controlnet_openpose = False
|
||
|
||
def check_health(self):
|
||
try:
|
||
response = requests.get(f"{COMFY_API}/system_stats", timeout=5)
|
||
data = response.json()
|
||
return {"reachable": True, "version": data.get("system", {}).get("comfyui_version", "unknown")}
|
||
except Exception as e:
|
||
return {"reachable": False, "error": str(e)}
|
||
|
||
def get_style_config(self, trust):
|
||
for entry in TRUST_MAP:
|
||
if entry["range"][0] <= trust <= entry["range"][1]:
|
||
return entry
|
||
return TRUST_MAP[1]
|
||
|
||
def build_prompt(self, state):
|
||
core = state.get("core_state", {})
|
||
trust = core.get("trust", 7.0)
|
||
mood = core.get("mood", 5)
|
||
|
||
style = self.get_style_config(trust)
|
||
|
||
character = (
|
||
"young woman, natural beauty, warm eyes, "
|
||
"consistent facial features, same person, "
|
||
f"{style['prompt_add']}, "
|
||
f"mood: {'happy' if mood >=6 else 'neutral' if mood >=4 else 'melancholic'}, "
|
||
"high detail, cinematic"
|
||
)
|
||
|
||
negative = (
|
||
"blurry, distorted, deformed, extra limbs, "
|
||
"different person, inconsistent face, ugly"
|
||
)
|
||
|
||
return {
|
||
"positive": character,
|
||
"negative": negative,
|
||
"style": style["style"],
|
||
"trust": trust,
|
||
"width": 512,
|
||
"height": 768 if trust > 7 else 512,
|
||
"ipadapter_weight": style["ipadapter_weight"]
|
||
}
|
||
|
||
def build_workflow_basic(self, prompt_data: Dict) -> Dict:
|
||
"""Basis-Workflow ohne IPAdapter"""
|
||
seed = int(time.time()) % 2147483647
|
||
|
||
return {
|
||
"1": {
|
||
"inputs": {"text": prompt_data["positive"], "clip": ["12", 1]},
|
||
"class_type": "CLIPTextEncode"
|
||
},
|
||
"2": {
|
||
"inputs": {"text": prompt_data["negative"], "clip": ["12", 1]},
|
||
"class_type": "CLIPTextEncode"
|
||
},
|
||
"3": {
|
||
"inputs": {
|
||
"seed": seed,
|
||
"steps": 25,
|
||
"cfg": 7.0,
|
||
"sampler_name": "euler_ancestral",
|
||
"scheduler": "karras",
|
||
"denoise": 1.0,
|
||
"model": ["12", 0],
|
||
"positive": ["1", 0],
|
||
"negative": ["2", 0],
|
||
"latent_image": ["13", 0]
|
||
},
|
||
"class_type": "KSampler"
|
||
},
|
||
"4": {
|
||
"inputs": {"samples": ["3", 0], "vae": ["12", 2]},
|
||
"class_type": "VAEDecode"
|
||
},
|
||
"5": {
|
||
"inputs": {
|
||
"filename_prefix": f"natiris_{prompt_data['style']}",
|
||
"images": ["4", 0]
|
||
},
|
||
"class_type": "SaveImage"
|
||
},
|
||
"12": {
|
||
"inputs": {"ckpt_name": "realisticVisionV60B1_v51HyperVAE.safetensors"},
|
||
"class_type": "CheckpointLoaderSimple"
|
||
},
|
||
"13": {
|
||
"inputs": {
|
||
"width": prompt_data["width"],
|
||
"height": prompt_data["height"],
|
||
"batch_size": 1
|
||
},
|
||
"class_type": "EmptyLatentImage"
|
||
}
|
||
}
|
||
|
||
def build_workflow_ipadapter(self, prompt_data: Dict, face_path: str) -> Dict:
|
||
"""Workflow mit IPAdapter für Gesichtskonsistenz"""
|
||
workflow = self.build_workflow_basic(prompt_data)
|
||
|
||
weight = prompt_data.get("ipadapter_weight", 0.6)
|
||
|
||
# IPAdapter Nodes hinzufügen
|
||
ipadapter_nodes = {
|
||
# Load Face Image
|
||
"20": {
|
||
"inputs": {"image": face_path},
|
||
"class_type": "LoadImage"
|
||
},
|
||
# IPAdapter Model Loader
|
||
"21": {
|
||
"inputs": {"ipadapter_file": "ip-adapter_sd15_light.pth"},
|
||
"class_type": "IPAdapterModelLoader"
|
||
},
|
||
# CLIP Vision Loader
|
||
"22": {
|
||
"inputs": {"clip_name": "CLIP-ViT-H-14-laion2B-s32B-b79K.safetensors"},
|
||
"class_type": "CLIPVisionLoader"
|
||
},
|
||
# IPAdapter Advanced - applied to model before KSampler
|
||
"23": {
|
||
"inputs": {
|
||
"model": ["12", 0],
|
||
"ipadapter": ["21", 0],
|
||
"image": ["20", 0],
|
||
"weight": weight,
|
||
"start_at": 0.0,
|
||
"end_at": 1.0,
|
||
"weight_type": "original"
|
||
},
|
||
"class_type": "IPAdapter"
|
||
}
|
||
}
|
||
|
||
# IPAdapter-Output als Model für KSampler
|
||
workflow["3"]["inputs"]["model"] = ["23", 0]
|
||
|
||
workflow.update(ipadapter_nodes)
|
||
return workflow
|
||
|
||
def build_workflow(self, prompt_data: Dict, base_images: Dict) -> Dict:
|
||
"""Wählt Workflow basierend auf Verfügbarkeit"""
|
||
face_path = base_images.get("face_path", "")
|
||
|
||
if self.has_ipadapter and face_path and os.path.exists(face_path):
|
||
print(f"Using IPAdapter workflow (weight: {prompt_data.get('ipadapter_weight', 0.6)})")
|
||
return self.build_workflow_ipadapter(prompt_data, face_path)
|
||
else:
|
||
print("Using basic workflow (IPAdapter not available)")
|
||
return self.build_workflow_basic(prompt_data)
|
||
|
||
def submit_workflow(self, workflow):
|
||
try:
|
||
data = {"prompt": workflow, "client_id": self.client_id}
|
||
response = requests.post(f"{COMFY_API}/prompt", json=data, timeout=10)
|
||
result = response.json()
|
||
|
||
if "prompt_id" in result:
|
||
return {"success": True, "prompt_id": result["prompt_id"]}
|
||
return {"success": False, "error": result.get("error", "Unknown")}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def poll_result(self, prompt_id, max_wait=300):
|
||
start_time = time.time()
|
||
while time.time() - start_time < max_wait:
|
||
try:
|
||
queue = requests.get(f"{COMFY_API}/queue", timeout=5).json()
|
||
history = requests.get(f"{COMFY_API}/history", timeout=5).json()
|
||
|
||
if prompt_id in history:
|
||
return {"completed": True, "data": history[prompt_id]}
|
||
|
||
print(" ... processing")
|
||
time.sleep(1)
|
||
except Exception as e:
|
||
return {"completed": False, "error": str(e)}
|
||
return {"completed": False, "error": "Timeout"}
|
||
|
||
def download_image(self, filename, subfolder=""):
|
||
try:
|
||
params = {"filename": filename, "subfolder": subfolder, "type": "output"}
|
||
response = requests.get(f"{COMFY_API}/view", params=params, timeout=30)
|
||
return response.content if response.status_code == 200 else None
|
||
except:
|
||
return None
|
||
|
||
def save_image(self, image_data, metadata):
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
filename = f"natiris_{metadata['style']}_{timestamp}.png"
|
||
filepath = self.output_dir / filename
|
||
|
||
try:
|
||
with open(filepath, "wb") as f:
|
||
f.write(image_data)
|
||
|
||
meta_file = self.output_dir / f"{filename}.json"
|
||
with open(meta_file, "w") as f:
|
||
json.dump(metadata, f, indent=2)
|
||
|
||
return {"success": True, "path": str(filepath)}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def generate(self, state_path=None):
|
||
"""Hauptgenerierung mit IPAdapter-Unterstützung"""
|
||
state = {}
|
||
if state_path and os.path.exists(state_path):
|
||
with open(state_path) as f:
|
||
state = json.load(f)
|
||
elif os.path.exists(PATHS["state"]):
|
||
with open(PATHS["state"]) as f:
|
||
state = json.load(f)
|
||
|
||
# Health Check
|
||
health = self.check_health()
|
||
if not health["reachable"]:
|
||
return {"success": False, "error": "ComfyUI not reachable"}
|
||
|
||
# Basisbilder
|
||
face_path = PATHS["base_images"] + "face_base.png"
|
||
base_images = {"face_path": face_path, "face_exists": os.path.exists(face_path)}
|
||
|
||
# Prompt & Workflow
|
||
prompt_data = self.build_prompt(state)
|
||
workflow = self.build_workflow(prompt_data, base_images)
|
||
|
||
# Submit & Poll
|
||
submit = self.submit_workflow(workflow)
|
||
if not submit["success"]:
|
||
return {"success": False, "error": submit.get("error", "Submit failed")}
|
||
|
||
print(f"✓ Submitted: {submit['prompt_id']}")
|
||
poll = self.poll_result(submit["prompt_id"])
|
||
|
||
if not poll["completed"]:
|
||
return {"success": False, "error": "Generation failed"}
|
||
|
||
# Bild extrahieren & speichern
|
||
outputs = poll["data"].get("outputs", {})
|
||
for node_id, node_out in outputs.items():
|
||
if "images" in node_out:
|
||
for img in node_out["images"]:
|
||
image_bytes = self.download_image(img["filename"], img.get("subfolder", ""))
|
||
if image_bytes:
|
||
metadata = {
|
||
"prompt": prompt_data,
|
||
"trust": prompt_data["trust"],
|
||
"style": prompt_data["style"],
|
||
"ipadapter_used": self.has_ipadapter,
|
||
"timestamp": datetime.now(timezone.utc).isoformat()
|
||
}
|
||
save = self.save_image(image_bytes, metadata)
|
||
if save["success"]:
|
||
print(f"✓ Saved: {save['path']}")
|
||
return {"success": True, "image_path": save["path"], "metadata": metadata}
|
||
|
||
return {"success": False, "error": "Image save failed"}
|
||
|
||
|
||
def main():
|
||
import argparse
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument("--state", default=PATHS["state"])
|
||
parser.add_argument("--check", action="store_true")
|
||
parser.add_argument("--test", action="store_true")
|
||
args = parser.parse_args()
|
||
|
||
bridge = ComfyBridgeV3()
|
||
|
||
if args.check:
|
||
health = bridge.check_health()
|
||
print(f"ComfyUI: {'✓' if health['reachable'] else '✗'}")
|
||
print(f"IPAdapter: {'✓' if bridge.has_ipadapter else '✗'}")
|
||
return
|
||
|
||
if args.test:
|
||
print("ComfyBridge v3 Test")
|
||
print("-" * 40)
|
||
result = bridge.generate(args.state)
|
||
print(json.dumps(result, indent=2))
|
||
return
|
||
|
||
result = bridge.generate(args.state)
|
||
with open(PATHS["output"], "w") as f:
|
||
json.dump(result, f, indent=2)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|