| |
|
| | ```python |
| | """ |
| | IMMUTABLE REALITY ENGINE v6.2.2 - PRODUCTION-READY ADVANCED ARCHITECTURE |
| | Fixed all identified issues with proper error handling and guarantees |
| | """ |
| |
|
| | import asyncio |
| | import hashlib |
| | import json |
| | import os |
| | import secrets |
| | import time |
| | import uuid |
| | from collections import Counter, defaultdict |
| | from dataclasses import dataclass, field, asdict |
| | from datetime import datetime, timedelta |
| | from enum import Enum |
| | from typing import Any, Dict, List, Optional, Tuple, Union, Callable |
| | from abc import ABC, abstractmethod |
| | import aiohttp |
| | from aiohttp import ClientTimeout, ClientSession |
| | import logging |
| | from logging.handlers import RotatingFileHandler |
| | from queue import Queue |
| | from concurrent.futures import ThreadPoolExecutor |
| | import base64 |
| |
|
| | |
| |
|
| | class ProductionConfig: |
| | """Production configuration with proper type safety""" |
| | |
| | |
| | N8N_WEBHOOK_URL: str = os.getenv("N8N_WEBHOOK_URL", "http://localhost:5678/webhook/ire") |
| | N8N_API_KEY: str = os.getenv("N8N_API_KEY", "") |
| | N8N_TIMEOUT_SECONDS: int = int(os.getenv("N8N_TIMEOUT", "30")) |
| | N8N_MAX_RETRIES: int = int(os.getenv("N8N_MAX_RETRIES", "3")) |
| | |
| | |
| | HASH_ALGORITHM: str = "SHA3-512" |
| | SIGNATURE_SCHEME: str = "ED25519_WITH_SHA3" |
| | |
| | |
| | MAX_CONCURRENT_DETECTIONS: int = 10 |
| | DETECTION_TIMEOUT_SECONDS: int = 30 |
| | LEDGER_BATCH_SIZE: int = 50 |
| | VALIDATION_TIMEOUT_SECONDS: int = 5 |
| | |
| | |
| | DATA_DIR: str = "./ire_production_data" |
| | LEDGER_PATH: str = "./ire_production_data/ledger" |
| | CACHE_PATH: str = "./ire_production_data/cache" |
| | LOG_PATH: str = "./ire_production_data/logs" |
| | |
| | |
| | MIN_VALIDATORS: int = 3 |
| | QUORUM_THRESHOLD: float = 0.67 |
| | DISSENT_THRESHOLD: float = 0.33 |
| | |
| | |
| | MAX_FUTURE_TOLERANCE_SECONDS: int = 300 |
| | MAX_PAST_TOLERANCE_DAYS: int = 365 * 10 |
| | |
| | |
| | WORKFLOW_IDS: Dict[str, str] = { |
| | "lens_analysis": "lens-detection-v5", |
| | "method_execution": "method-execution-v5", |
| | "equilibrium_detection": "equilibrium-detection-v5", |
| | "threat_analysis": "stride-e-threat-v5", |
| | "validator_attestation": "validator-quorum-v5", |
| | "ledger_commit": "ledger-commit-v5", |
| | "quorum_calculation": "quorum-calculation-v5" |
| | } |
| | |
| | @classmethod |
| | def ensure_directories(cls): |
| | """Ensure all required directories exist""" |
| | for path in [cls.DATA_DIR, cls.LEDGER_PATH, cls.CACHE_PATH, cls.LOG_PATH]: |
| | os.makedirs(path, exist_ok=True) |
| |
|
| | |
| | ProductionConfig.ensure_directories() |
| |
|
| | |
| |
|
| | class ProductionLogger: |
| | """Production-grade logging with rotation""" |
| | |
| | def __init__(self, name: str = "IRE_Engine"): |
| | self.logger = logging.getLogger(name) |
| | self.logger.setLevel(logging.INFO) |
| | |
| | |
| | console_handler = logging.StreamHandler() |
| | console_handler.setLevel(logging.INFO) |
| | console_format = logging.Formatter( |
| | '%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| | ) |
| | console_handler.setFormatter(console_format) |
| | |
| | |
| | log_file = os.path.join(ProductionConfig.LOG_PATH, f"{name}.log") |
| | file_handler = RotatingFileHandler( |
| | log_file, |
| | maxBytes=10 * 1024 * 1024, |
| | backupCount=5 |
| | ) |
| | file_handler.setLevel(logging.DEBUG) |
| | file_format = logging.Formatter( |
| | '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' |
| | ) |
| | file_handler.setFormatter(file_format) |
| | |
| | |
| | self.logger.addHandler(console_handler) |
| | self.logger.addHandler(file_handler) |
| | |
| | def info(self, message: str, **kwargs): |
| | self.logger.info(f"{message} | {kwargs}") |
| | |
| | def warning(self, message: str, **kwargs): |
| | self.logger.warning(f"{message} | {kwargs}") |
| | |
| | def error(self, message: str, **kwargs): |
| | self.logger.error(f"{message} | {kwargs}") |
| | |
| | def critical(self, message: str, **kwargs): |
| | self.logger.critical(f"{message} | {kwargs}") |
| |
|
| | |
| | logger = ProductionLogger() |
| |
|
| | |
| |
|
| | class Primitive(str, Enum): |
| | """14 Primitives - clearly labeled as concepts, not cryptographic guarantees""" |
| | ERASURE = "ERASURE" |
| | INTERRUPTION = "INTERRUPTION" |
| | FRAGMENTATION = "FRAGMENTATION" |
| | NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE" |
| | MISDIRECTION = "MISDIRECTION" |
| | SATURATION = "SATURATION" |
| | DISCREDITATION = "DISCREDITATION" |
| | ATTRITION = "ATTRITION" |
| | ACCESS_CONTROL = "ACCESS_CONTROL" |
| | TEMPORAL = "TEMPORAL" |
| | CONDITIONING = "CONDITIONING" |
| | META = "META" |
| | ABSORPTIVE = "ABSORPTIVE" |
| | EXHAUSTION = "EXHAUSTION" |
| | |
| | @property |
| | def is_equilibrium_primitive(self) -> bool: |
| | """Check if primitive is for equilibrium detection""" |
| | return self in [Primitive.ABSORPTIVE, Primitive.EXHAUSTION] |
| |
|
| | class SuppressionPhase(str, Enum): |
| | """Suppression lifecycle phases""" |
| | ACTIVE_SUPPRESSION = "ACTIVE_SUPPRESSION" |
| | ESTABLISHING_SUPPRESSION = "ESTABLISHING_SUPPRESSION" |
| | POST_SUPPRESSION_EQUILIBRIUM = "POST_SUPPRESSION_EQUILIBRIUM" |
| | |
| | @classmethod |
| | def detect(cls, metrics: Dict[str, float]) -> 'SuppressionPhase': |
| | """Deterministic phase detection""" |
| | equilibrium_score = metrics.get("equilibrium_score", 0) |
| | active_score = metrics.get("active_suppression_score", 0) |
| | |
| | if equilibrium_score > 0.7: |
| | return cls.POST_SUPPRESSION_EQUILIBRIUM |
| | elif equilibrium_score > 0.3: |
| | return cls.ESTABLISHING_SUPPRESSION |
| | else: |
| | return cls.ACTIVE_SUPPRESSION |
| |
|
| | class ValidatorArchetype(str, Enum): |
| | """Validator archetypes for attestation""" |
| | HUMAN_SOVEREIGN = "HUMAN_SOVEREIGN" |
| | SYSTEM_EPISTEMIC = "SYSTEM_EPISTEMIC" |
| | SOURCE_PROVENANCE = "SOURCE_PROVENANCE" |
| | TEMPORAL_INTEGRITY = "TEMPORAL_INTEGRITY" |
| | COMMUNITY_PLURALITY = "COMMUNITY_PLURALITY" |
| | QUANTUM_GUARDIAN = "QUANTUM_GUARDIAN" |
| | |
| | @property |
| | def requires_external_orchestration(self) -> bool: |
| | """Check if validator requires external process""" |
| | return self in [ |
| | ValidatorArchetype.HUMAN_SOVEREIGN, |
| | ValidatorArchetype.COMMUNITY_PLURALITY |
| | ] |
| |
|
| | |
| |
|
| | @dataclass |
| | class QuantumAwareSignature: |
| | """ |
| | Quantum-aware signature (not quantum-resistant) |
| | Clearly labeled as using quantum-aware algorithms, not quantum-resistant cryptography |
| | """ |
| | algorithm: str = ProductionConfig.SIGNATURE_SCHEME |
| | signature: str = "" |
| | public_key_hash: str = "" |
| | timestamp: str = "" |
| | nonce: str = "" |
| | proof_of_work: Optional[str] = None |
| | |
| | def __post_init__(self): |
| | """Initialize with proper values""" |
| | if not self.timestamp: |
| | self.timestamp = datetime.utcnow().isoformat() + "Z" |
| | if not self.nonce: |
| | self.nonce = secrets.token_hex(16) |
| | |
| | @classmethod |
| | def create(cls, data: Any, private_key_context: str = "") -> 'QuantumAwareSignature': |
| | """ |
| | Create quantum-aware signature using SHA3-512 |
| | Note: This is quantum-aware, not quantum-resistant |
| | """ |
| | |
| | if isinstance(data, dict): |
| | data_str = json.dumps(data, sort_keys=True) |
| | else: |
| | data_str = str(data) |
| | |
| | |
| | data_hash = hashlib.sha3_512(data_str.encode()).hexdigest() |
| | |
| | |
| | signature_parts = [ |
| | "SIG", |
| | data_hash[:32], |
| | datetime.utcnow().strftime("%Y%m%d%H%M%S"), |
| | hashlib.sha3_512(private_key_context.encode()).hexdigest()[:16] if private_key_context else secrets.token_hex(8) |
| | ] |
| | |
| | signature = "_".join(signature_parts) |
| | |
| | return cls( |
| | signature=signature, |
| | public_key_hash=hashlib.sha3_512(private_key_context.encode()).hexdigest()[:32] if private_key_context else secrets.token_hex(32), |
| | proof_of_work=cls._optional_proof_of_work(data_hash) |
| | ) |
| | |
| | @staticmethod |
| | def _optional_proof_of_work(data_hash: str, difficulty: int = 2) -> Optional[str]: |
| | """ |
| | Optional proof-of-work for rate limiting |
| | Not for cryptographic security |
| | """ |
| | if difficulty <= 0: |
| | return None |
| | |
| | nonce = 0 |
| | target = "0" * difficulty |
| | |
| | |
| | max_iterations = 10000 |
| | while nonce < max_iterations: |
| | test_hash = hashlib.sha3_512(f"{data_hash}{nonce}".encode()).hexdigest() |
| | if test_hash.startswith(target): |
| | return f"{nonce}:{test_hash}" |
| | nonce += 1 |
| | |
| | return None |
| | |
| | def verify(self, data: Any) -> Tuple[bool, Optional[str]]: |
| | """ |
| | Verify quantum-aware signature |
| | Returns (is_valid, error_message) |
| | """ |
| | try: |
| | |
| | if isinstance(data, dict): |
| | data_str = json.dumps(data, sort_keys=True) |
| | else: |
| | data_str = str(data) |
| | |
| | data_hash = hashlib.sha3_512(data_str.encode()).hexdigest() |
| | |
| | |
| | if not self.signature.startswith("SIG_"): |
| | return False, "Invalid signature format" |
| | |
| | |
| | parts = self.signature.split("_") |
| | if len(parts) != 4: |
| | return False, "Malformed signature" |
| | |
| | sig_type, signed_hash, timestamp, context = parts |
| | |
| | |
| | if signed_hash != data_hash[:32]: |
| | return False, "Hash mismatch" |
| | |
| | |
| | try: |
| | sig_time = datetime.strptime(timestamp, "%Y%m%d%H%M%S") |
| | now = datetime.utcnow() |
| | if (now - sig_time).total_seconds() > 86400: |
| | return False, "Signature expired" |
| | except ValueError: |
| | return False, "Invalid timestamp format" |
| | |
| | |
| | if self.proof_of_work: |
| | try: |
| | nonce, pow_hash = self.proof_of_work.split(":") |
| | test_hash = hashlib.sha3_512(f"{data_hash}{nonce}".encode()).hexdigest() |
| | if test_hash != pow_hash: |
| | return False, "Proof of work invalid" |
| | except (ValueError, AttributeError): |
| | return False, "Malformed proof of work" |
| | |
| | return True, None |
| | |
| | except Exception as e: |
| | return False, f"Verification error: {str(e)}" |
| |
|
| | |
| |
|
| | @dataclass |
| | class RealityNode: |
| | """ |
| | Immutable reality node with proper validation |
| | Quantum-aware but not quantum-resistant |
| | """ |
| | content_hash: str |
| | node_type: str |
| | source_id: str |
| | signature: QuantumAwareSignature |
| | temporal_anchor: str |
| | content: Dict[str, Any] |
| | metadata: Dict[str, Any] = field(default_factory=dict) |
| | witness_signatures: List[Dict] = field(default_factory=list) |
| | cross_references: Dict[str, List[str]] = field(default_factory=dict) |
| | proof_of_existence: Optional[str] = None |
| | n8n_execution_id: Optional[str] = None |
| | |
| | def __post_init__(self): |
| | """Initialize with proof of existence""" |
| | if not self.proof_of_existence: |
| | self.proof_of_existence = self._create_proof_of_existence() |
| | |
| | def _create_proof_of_existence(self) -> str: |
| | """Create proof of existence using external time simulation""" |
| | proof_data = { |
| | "content_hash": self.content_hash, |
| | "temporal_anchor": self.temporal_anchor, |
| | "witness_count": len(self.witness_signatures), |
| | "timestamp": datetime.utcnow().isoformat() + "Z", |
| | "external_anchor": self._simulate_external_time_anchor() |
| | } |
| | |
| | return hashlib.sha3_512( |
| | json.dumps(proof_data, sort_keys=True).encode() |
| | ).hexdigest() |
| | |
| | def _simulate_external_time_anchor(self) -> str: |
| | """Simulate external time oracle - clearly labeled as simulation""" |
| | current_timestamp = int(time.time()) |
| | |
| | return hashlib.sha3_512( |
| | f"simulated_anchor_{current_timestamp // 600}".encode() |
| | ).hexdigest() |
| | |
| | def add_witness(self, validator_id: str, signature: QuantumAwareSignature, |
| | attestation_data: Dict = None) -> None: |
| | """Add witness signature with attestation data""" |
| | witness_entry = { |
| | "validator_id": validator_id, |
| | "signature": signature.signature, |
| | "timestamp": datetime.utcnow().isoformat() + "Z", |
| | "public_key_hash": signature.public_key_hash, |
| | "attestation": attestation_data or {} |
| | } |
| | |
| | self.witness_signatures.append(witness_entry) |
| | self.metadata.setdefault("witnesses", []).append(validator_id) |
| | |
| | def validate(self) -> Tuple[bool, List[str]]: |
| | """ |
| | Comprehensive node validation with clear error messages |
| | Returns (is_valid, errors) |
| | """ |
| | errors = [] |
| | |
| | |
| | try: |
| | content_str = json.dumps(self.content, sort_keys=True) |
| | computed_hash = hashlib.sha3_512(content_str.encode()).hexdigest() |
| | |
| | if computed_hash != self.content_hash: |
| | errors.append(f"Content hash mismatch: expected {self.content_hash[:16]}..., got {computed_hash[:16]}...") |
| | except (TypeError, ValueError) as e: |
| | errors.append(f"Content serialization error: {str(e)}") |
| | |
| | |
| | is_valid_sig, sig_error = self.signature.verify(self.content) |
| | if not is_valid_sig: |
| | errors.append(f"Signature validation failed: {sig_error}") |
| | |
| | |
| | try: |
| | node_time = datetime.fromisoformat(self.temporal_anchor.replace('Z', '+00:00')) |
| | now = datetime.utcnow() |
| | |
| | |
| | time_diff = (node_time - now).total_seconds() |
| | |
| | if time_diff > ProductionConfig.MAX_FUTURE_TOLERANCE_SECONDS: |
| | errors.append(f"Future timestamp beyond tolerance: {time_diff:.0f}s ahead") |
| | elif time_diff > 0: |
| | logger.info(f"Timestamp {time_diff:.0f}s in future (within tolerance)") |
| | |
| | |
| | past_diff = (now - node_time).total_seconds() |
| | if past_diff > ProductionConfig.MAX_PAST_TOLERANCE_DAYS * 86400: |
| | errors.append(f"Timestamp too far in past: {past_diff/86400:.0f} days") |
| | |
| | except ValueError as e: |
| | errors.append(f"Invalid temporal anchor format: {str(e)}") |
| | |
| | |
| | if not self.proof_of_existence: |
| | errors.append("Missing proof of existence") |
| | |
| | |
| | if len(self.witness_signatures) < ProductionConfig.MIN_VALIDATORS: |
| | errors.append(f"Insufficient witnesses: {len(self.witness_signatures)}/{ProductionConfig.MIN_VALIDATORS}") |
| | |
| | |
| | for i, witness in enumerate(self.witness_signatures): |
| | |
| | if not witness.get("validator_id"): |
| | errors.append(f"Witness {i} missing validator_id") |
| | if not witness.get("signature"): |
| | errors.append(f"Witness {i} missing signature") |
| | if not witness.get("timestamp"): |
| | errors.append(f"Witness {i} missing timestamp") |
| | |
| | return len(errors) == 0, errors |
| | |
| | def calculate_quorum(self) -> Tuple[float, float, Dict[str, List[str]]]: |
| | """ |
| | Calculate quorum statistics |
| | Returns (agreement_score, dissent_score, groups) |
| | """ |
| | if not self.witness_signatures: |
| | return 0.0, 0.0, {} |
| | |
| | |
| | attestation_groups = defaultdict(list) |
| | for witness in self.witness_signatures: |
| | attestation = witness.get("attestation", {}) |
| | |
| | group_key = hashlib.sha3_512( |
| | json.dumps(attestation, sort_keys=True).encode() |
| | ).hexdigest()[:16] |
| | attestation_groups[group_key].append(witness["validator_id"]) |
| | |
| | |
| | total_witnesses = len(self.witness_signatures) |
| | group_sizes = [len(ids) for ids in attestation_groups.values()] |
| | |
| | if not group_sizes: |
| | return 0.0, 0.0, {} |
| | |
| | max_group_size = max(group_sizes) |
| | agreement_score = max_group_size / total_witnesses |
| | |
| | |
| | second_largest = sorted(group_sizes, reverse=True)[1] if len(group_sizes) > 1 else 0 |
| | dissent_score = second_largest / total_witnesses |
| | |
| | |
| | readable_groups = {} |
| | for group_key, validator_ids in attestation_groups.items(): |
| | readable_groups[group_key[:8]] = { |
| | "validators": validator_ids, |
| | "size": len(validator_ids), |
| | "percentage": len(validator_ids) / total_witnesses |
| | } |
| | |
| | return agreement_score, dissent_score, readable_groups |
| | |
| | def to_transport_format(self) -> Dict[str, Any]: |
| | """Convert to transport format for n8n/webhooks""" |
| | return { |
| | "node_id": self.content_hash[:32], |
| | "node_type": self.node_type, |
| | "source": self.source_id, |
| | "content_preview": str(self.content)[:500] + "..." if len(str(self.content)) > 500 else str(self.content), |
| | "timestamp": self.temporal_anchor, |
| | "witness_count": len(self.witness_signatures), |
| | "proof_of_existence": self.proof_of_existence[:32] + "..." if self.proof_of_existence else None, |
| | "metadata_summary": { |
| | "keys": list(self.metadata.keys()), |
| | "witness_ids": [w.get("validator_id", "unknown") for w in self.witness_signatures] |
| | }, |
| | "execution_id": self.n8n_execution_id or f"exec_{uuid.uuid4().hex[:8]}" |
| | } |
| |
|
| | |
| |
|
| | class N8NClient: |
| | """n8n client with proper async session management""" |
| | |
| | def __init__(self): |
| | self.base_url = ProductionConfig.N8N_WEBHOOK_URL |
| | self.api_key = ProductionConfig.N8N_API_KEY |
| | self.timeout = ProductionConfig.N8N_TIMEOUT_SECONDS |
| | self.max_retries = ProductionConfig.N8N_MAX_RETRIES |
| | |
| | |
| | self._session: Optional[aiohttp.ClientSession] = None |
| | self._session_lock = asyncio.Lock() |
| | |
| | async def get_session(self) -> aiohttp.ClientSession: |
| | """Get or create session with proper cleanup""" |
| | async with self._session_lock: |
| | if self._session is None or self._session.closed: |
| | timeout = ClientTimeout(total=self.timeout) |
| | headers = { |
| | "User-Agent": "ImmutableRealityEngine/5.0", |
| | "Content-Type": "application/json" |
| | } |
| | |
| | if self.api_key: |
| | headers["Authorization"] = f"Bearer {self.api_key}" |
| | |
| | self._session = ClientSession( |
| | timeout=timeout, |
| | headers=headers |
| | ) |
| | logger.info("Created new n8n session") |
| | |
| | return self._session |
| | |
| | async def execute_workflow(self, workflow_id: str, payload: Dict) -> Dict[str, Any]: |
| | """ |
| | Execute n8n workflow with exponential backoff and proper error handling |
| | """ |
| | session = await self.get_session() |
| | url = f"{self.base_url}/{workflow_id}" |
| | |
| | for attempt in range(self.max_retries): |
| | try: |
| | async with session.post(url, json=payload) as response: |
| | if response.status == 200: |
| | result = await response.json() |
| | return { |
| | "success": True, |
| | "workflow_id": workflow_id, |
| | "execution_id": result.get("executionId", f"exec_{uuid.uuid4().hex[:8]}"), |
| | "data": result.get("data", {}), |
| | "metrics": result.get("metrics", {}), |
| | "status_code": response.status, |
| | "attempt": attempt + 1, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | else: |
| | error_text = await response.text() |
| | logger.warning(f"n8n workflow {workflow_id} failed (attempt {attempt + 1}/{self.max_retries}): {response.status} - {error_text}") |
| | |
| | |
| | if attempt < self.max_retries - 1: |
| | await asyncio.sleep(2 ** attempt) |
| | continue |
| | |
| | return { |
| | "success": False, |
| | "error": f"n8n returned {response.status}: {error_text[:200]}", |
| | "workflow_id": workflow_id, |
| | "status_code": response.status, |
| | "attempt": attempt + 1, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | except asyncio.TimeoutError: |
| | logger.warning(f"n8n timeout for {workflow_id} (attempt {attempt + 1}/{self.max_retries})") |
| | if attempt < self.max_retries - 1: |
| | await asyncio.sleep(2 ** attempt) |
| | continue |
| | return { |
| | "success": False, |
| | "error": f"Timeout after {self.timeout}s", |
| | "workflow_id": workflow_id, |
| | "attempt": attempt + 1, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | except aiohttp.ClientError as e: |
| | logger.warning(f"n8n connection error for {workflow_id} (attempt {attempt + 1}/{self.max_retries}): {str(e)}") |
| | if attempt < self.max_retries - 1: |
| | await asyncio.sleep(2 ** attempt) |
| | continue |
| | return { |
| | "success": False, |
| | "error": f"Connection error: {str(e)}", |
| | "workflow_id": workflow_id, |
| | "attempt": attempt + 1, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | |
| | return { |
| | "success": False, |
| | "error": "Max retries exceeded", |
| | "workflow_id": workflow_id, |
| | "attempt": self.max_retries, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | async def batch_execute(self, workflows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: |
| | """Execute multiple workflows in parallel with proper limits""" |
| | semaphore = asyncio.Semaphore(ProductionConfig.MAX_CONCURRENT_DETECTIONS) |
| | |
| | async def execute_with_limit(workflow: Dict[str, Any]) -> Dict[str, Any]: |
| | async with semaphore: |
| | return await self.execute_workflow( |
| | workflow["workflow_id"], |
| | workflow["payload"] |
| | ) |
| | |
| | tasks = [execute_with_limit(wf) for wf in workflows] |
| | results = await asyncio.gather(*tasks, return_exceptions=True) |
| | |
| | |
| | processed_results = [] |
| | for i, result in enumerate(results): |
| | if isinstance(result, Exception): |
| | processed_results.append({ |
| | "success": False, |
| | "error": str(result), |
| | "workflow_id": workflows[i]["workflow_id"], |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | }) |
| | else: |
| | processed_results.append(result) |
| | |
| | return processed_results |
| | |
| | async def close(self): |
| | """Properly close session""" |
| | async with self._session_lock: |
| | if self._session and not self._session.closed: |
| | await self._session.close() |
| | self._session = None |
| | logger.info("Closed n8n session") |
| |
|
| | |
| |
|
| | class ImmutableLedger: |
| | """ |
| | Immutable ledger with proper sync/async separation |
| | Quantum-aware append-only log (not a blockchain) |
| | """ |
| | |
| | def __init__(self, n8n_client: N8NClient, storage_path: str = None): |
| | self.n8n = n8n_client |
| | self.storage_path = storage_path or ProductionConfig.LEDGER_PATH |
| | os.makedirs(self.storage_path, exist_ok=True) |
| | |
| | self.chain: List[Dict] = [] |
| | self.node_index: Dict[str, List[str]] = defaultdict(list) |
| | self.validator_index: Dict[str, List[str]] = defaultdict(list) |
| | self.temporal_index: Dict[str, List[str]] = defaultdict(list) |
| | |
| | |
| | self._bootstrap_sync() |
| | |
| | def _bootstrap_sync(self): |
| | """Synchronous bootstrap - no async calls""" |
| | ledger_file = os.path.join(self.storage_path, "ledger.json") |
| | |
| | if os.path.exists(ledger_file): |
| | try: |
| | with open(ledger_file, 'r') as f: |
| | data = json.load(f) |
| | self.chain = data.get("chain", []) |
| | self._rebuild_indexes_sync() |
| | logger.info(f"Loaded ledger: {len(self.chain)} blocks, {len(self.node_index)} nodes indexed") |
| | |
| | |
| | if not self._validate_chain_sync(): |
| | logger.warning("Ledger integrity check failed, creating new genesis") |
| | self._create_genesis_sync() |
| | except Exception as e: |
| | logger.error(f"Failed to load ledger: {e}") |
| | self._create_genesis_sync() |
| | else: |
| | self._create_genesis_sync() |
| | |
| | def _create_genesis_sync(self): |
| | """Create genesis block synchronously""" |
| | genesis = { |
| | "id": "genesis_v5", |
| | "prev": "0" * 128, |
| | "timestamp": datetime.utcnow().isoformat() + "Z", |
| | "nodes": [], |
| | "metadata": { |
| | "version": "IRE_v5.0", |
| | "genesis": True, |
| | "created_by": "ImmutableLedger", |
| | "hash_algorithm": ProductionConfig.HASH_ALGORITHM, |
| | "note": "Quantum-aware, not quantum-resistant" |
| | }, |
| | "hash": self._hash_block_sync({"genesis": True}), |
| | "signatures": [] |
| | } |
| | |
| | self.chain.append(genesis) |
| | self._save_ledger_sync() |
| | logger.info("Created genesis block") |
| | |
| | def _hash_block_sync(self, data: Dict) -> str: |
| | """Synchronous hashing""" |
| | return hashlib.sha3_512( |
| | json.dumps(data, sort_keys=True).encode() |
| | ).hexdigest() |
| | |
| | def _rebuild_indexes_sync(self): |
| | """Rebuild indexes synchronously""" |
| | self.node_index.clear() |
| | self.validator_index.clear() |
| | self.temporal_index.clear() |
| | |
| | for block in self.chain: |
| | block_id = block["id"] |
| | |
| | |
| | for node in block.get("nodes", []): |
| | node_hash = node.get("content_hash") |
| | if node_hash: |
| | self.node_index[node_hash].append(block_id) |
| | |
| | |
| | for sig in block.get("signatures", []): |
| | validator = sig.get("validator_id") |
| | if validator: |
| | self.validator_index[validator].append(block_id) |
| | |
| | |
| | timestamp = block.get("timestamp", "") |
| | if timestamp: |
| | date_key = timestamp[:10] |
| | self.temporal_index[date_key].append(block_id) |
| | |
| | def _validate_chain_sync(self) -> bool: |
| | """Validate chain integrity synchronously""" |
| | if not self.chain: |
| | return False |
| | |
| | if self.chain[0]["id"] != "genesis_v5": |
| | return False |
| | |
| | for i in range(1, len(self.chain)): |
| | current = self.chain[i] |
| | previous = self.chain[i - 1] |
| | |
| | if current["prev"] != previous["hash"]: |
| | return False |
| | |
| | return True |
| | |
| | def _save_ledger_sync(self): |
| | """Save ledger synchronously with atomic write""" |
| | ledger_data = { |
| | "chain": self.chain, |
| | "metadata": { |
| | "version": "IRE_v5.0", |
| | "total_blocks": len(self.chain), |
| | "total_nodes": sum(len(b.get("nodes", [])) for b in self.chain), |
| | "last_updated": datetime.utcnow().isoformat() + "Z", |
| | "hash_algorithm": ProductionConfig.HASH_ALGORITHM |
| | } |
| | } |
| | |
| | ledger_file = os.path.join(self.storage_path, "ledger.json") |
| | temp_file = ledger_file + ".tmp" |
| | |
| | try: |
| | |
| | with open(temp_file, 'w') as f: |
| | json.dump(ledger_data, f, indent=2) |
| | |
| | |
| | os.replace(temp_file, ledger_file) |
| | |
| | except Exception as e: |
| | logger.error(f"Failed to save ledger: {e}") |
| | |
| | if os.path.exists(temp_file): |
| | os.remove(temp_file) |
| | raise |
| | |
| | async def commit_node(self, node: RealityNode, validators: List[str]) -> Dict[str, Any]: |
| | """Commit node to ledger via n8n orchestration""" |
| | |
| | |
| | is_valid, errors = node.validate() |
| | if not is_valid: |
| | return { |
| | "success": False, |
| | "error": f"Node validation failed: {errors}", |
| | "node_id": node.content_hash[:32], |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | |
| | payload = { |
| | "operation": "ledger_commit", |
| | "node": node.to_transport_format(), |
| | "validators": validators, |
| | "current_chain_length": len(self.chain), |
| | "previous_block_hash": self.chain[-1]["hash"] if self.chain else "0" * 128, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | |
| | response = await self.n8n.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["ledger_commit"], |
| | payload |
| | ) |
| | |
| | if response.get("success"): |
| | block_data = response.get("data", {}).get("block", {}) |
| | |
| | |
| | if self._validate_block_sync(block_data): |
| | self.chain.append(block_data) |
| | self._update_indexes_sync(block_data) |
| | self._save_ledger_sync() |
| | |
| | logger.info(f"Committed node {node.content_hash[:16]}... in block {block_data.get('id', 'unknown')}") |
| | |
| | return { |
| | "success": True, |
| | "block_id": block_data.get("id", "unknown"), |
| | "block_hash": block_data.get("hash", "unknown")[:32] + "...", |
| | "node_id": node.content_hash[:32], |
| | "validator_count": len(validators), |
| | "ledger_length": len(self.chain), |
| | "n8n_execution_id": response.get("execution_id"), |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | else: |
| | return { |
| | "success": False, |
| | "error": "Block validation failed", |
| | "n8n_response": response, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | return { |
| | "success": False, |
| | "error": "Failed to commit node via n8n", |
| | "n8n_response": response, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | def _validate_block_sync(self, block: Dict) -> bool: |
| | """Validate block structure synchronously""" |
| | required_fields = ["id", "prev", "timestamp", "hash", "nodes"] |
| | for field in required_fields: |
| | if field not in block: |
| | logger.error(f"Block missing required field: {field}") |
| | return False |
| | |
| | |
| | if self.chain and block["prev"] != self.chain[-1]["hash"]: |
| | logger.error(f"Block prev hash mismatch: {block['prev'][:16]}... != {self.chain[-1]['hash'][:16]}...") |
| | return False |
| | |
| | return True |
| | |
| | def _update_indexes_sync(self, block: Dict): |
| | """Update indexes synchronously""" |
| | block_id = block["id"] |
| | |
| | |
| | for node in block.get("nodes", []): |
| | node_hash = node.get("content_hash") |
| | if node_hash: |
| | self.node_index[node_hash].append(block_id) |
| | |
| | |
| | for sig in block.get("signatures", []): |
| | validator = sig.get("validator_id") |
| | if validator: |
| | self.validator_index[validator].append(block_id) |
| | |
| | |
| | timestamp = block.get("timestamp", "") |
| | if timestamp: |
| | date_key = timestamp[:10] |
| | self.temporal_index[date_key].append(block_id) |
| | |
| | def get_node_history_sync(self, node_hash: str) -> List[Dict]: |
| | """Get node history synchronously""" |
| | block_ids = self.node_index.get(node_hash, []) |
| | history = [] |
| | |
| | for block_id in block_ids: |
| | block = next((b for b in self.chain if b["id"] == block_id), None) |
| | if block: |
| | history.append({ |
| | "block_id": block_id, |
| | "timestamp": block["timestamp"], |
| | "block_hash": block["hash"][:16] + "...", |
| | "validator_count": len(block.get("signatures", [])), |
| | "block_index": self.chain.index(block) |
| | }) |
| | |
| | return sorted(history, key=lambda x: x["timestamp"]) |
| | |
| | def analyze_health_sync(self) -> Dict[str, Any]: |
| | """Analyze ledger health synchronously""" |
| | if not self.chain: |
| | return {"status": "EMPTY", "health_score": 0.0} |
| | |
| | total_blocks = len(self.chain) |
| | total_nodes = sum(len(b.get("nodes", [])) for b in self.chain) |
| | |
| | |
| | integrity_ok = self._validate_chain_sync() |
| | |
| | |
| | block_intervals = [] |
| | for i in range(1, len(self.chain)): |
| | try: |
| | prev_time = datetime.fromisoformat(self.chain[i-1]["timestamp"].replace('Z', '+00:00')) |
| | curr_time = datetime.fromisoformat(self.chain[i]["timestamp"].replace('Z', '+00:00')) |
| | interval = (curr_time - prev_time).total_seconds() |
| | block_intervals.append(interval) |
| | except (ValueError, KeyError): |
| | pass |
| | |
| | |
| | factors = [] |
| | |
| | |
| | factors.append(1.0 if integrity_ok else 0.0) |
| | |
| | |
| | factors.append(min(1.0, total_blocks / 100.0)) |
| | |
| | |
| | factors.append(min(1.0, total_nodes / 500.0)) |
| | |
| | |
| | unique_validators = len(self.validator_index) |
| | factors.append(min(1.0, unique_validators / 10.0)) |
| | |
| | |
| | unique_days = len(self.temporal_index) |
| | factors.append(min(1.0, unique_days / 30.0)) |
| | |
| | |
| | health_score = sum(factors) / len(factors) if factors else 0.0 |
| | |
| | |
| | if health_score >= 0.8: |
| | status = "HEALTHY" |
| | elif health_score >= 0.6: |
| | status = "DEGRADED" |
| | elif health_score >= 0.4: |
| | status = "WARNING" |
| | else: |
| | status = "CRITICAL" |
| | |
| | return { |
| | "status": status, |
| | "health_score": round(health_score, 3), |
| | "metrics": { |
| | "total_blocks": total_blocks, |
| | "total_nodes": total_nodes, |
| | "unique_nodes": len(self.node_index), |
| | "unique_validators": unique_validators, |
| | "unique_days": unique_days, |
| | "chain_integrity": integrity_ok, |
| | "average_block_interval": statistics.mean(block_intervals) if block_intervals else 0, |
| | "hash_algorithm": ProductionConfig.HASH_ALGORITHM |
| | }, |
| | "factors": {f"factor_{i}": round(v, 3) for i, v in enumerate(factors)}, |
| | "recommendations": self._generate_health_recommendations_sync(health_score, total_blocks, unique_validators) |
| | } |
| | |
| | def _generate_health_recommendations_sync(self, health_score: float, |
| | total_blocks: int, |
| | unique_validators: int) -> List[str]: |
| | """Generate health recommendations synchronously""" |
| | recommendations = [] |
| | |
| | if health_score < 0.5: |
| | recommendations.append("Ledger health critical - add more nodes and validators") |
| | |
| | if total_blocks < 10: |
| | recommendations.append("Increase ledger activity to establish chain history") |
| | |
| | if unique_validators < ProductionConfig.MIN_VALIDATORS: |
| | recommendations.append(f"Add more validators (currently {unique_validators}, need {ProductionConfig.MIN_VALIDATORS})") |
| | |
| | if not recommendations: |
| | recommendations.append("Ledger operating within optimal parameters") |
| | |
| | return recommendations |
| |
|
| | |
| |
|
| | class LensMethodRegistry: |
| | """ |
| | Registry for lenses and methods with n8n orchestration |
| | Cross-referential and externally managed |
| | """ |
| | |
| | def __init__(self, n8n_client: N8NClient): |
| | self.n8n = n8n_client |
| | self.lenses: Dict[str, Dict] = {} |
| | self.methods: Dict[str, Dict] = {} |
| | self.cross_references: Dict[str, List[str]] = defaultdict(list) |
| | self.inverse_references: Dict[str, List[str]] = defaultdict(list) |
| | self.last_sync: Optional[str] = None |
| | self.sync_lock = asyncio.Lock() |
| | |
| | async def sync_from_n8n(self) -> bool: |
| | """Sync registry from n8n with proper locking""" |
| | async with self.sync_lock: |
| | try: |
| | logger.info("Syncing registry from n8n...") |
| | |
| | |
| | lenses_response = await self.n8n.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
| | {"operation": "get_registry", "type": "lenses"} |
| | ) |
| | |
| | if lenses_response.get("success"): |
| | self.lenses = lenses_response.get("data", {}).get("lenses", {}) |
| | logger.info(f"Loaded {len(self.lenses)} lenses") |
| | else: |
| | logger.error(f"Failed to load lenses: {lenses_response.get('error')}") |
| | return False |
| | |
| | |
| | methods_response = await self.n8n.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["method_execution"], |
| | {"operation": "get_registry", "type": "methods"} |
| | ) |
| | |
| | if methods_response.get("success"): |
| | self.methods = methods_response.get("data", {}).get("methods", {}) |
| | logger.info(f"Loaded {len(self.methods)} methods") |
| | else: |
| | logger.error(f"Failed to load methods: {methods_response.get('error')}") |
| | return False |
| | |
| | |
| | self._build_cross_references() |
| | |
| | self.last_sync = datetime.utcnow().isoformat() + "Z" |
| | logger.info("Registry sync completed successfully") |
| | return True |
| | |
| | except Exception as e: |
| | logger.error(f"Registry sync failed: {e}") |
| | return False |
| | |
| | def _build_cross_references(self): |
| | """Build cross-references between lenses and methods""" |
| | self.cross_references.clear() |
| | self.inverse_references.clear() |
| | |
| | |
| | for method_id, method in self.methods.items(): |
| | lens_ids = method.get("lens_ids", []) |
| | for lens_id in lens_ids: |
| | if lens_id in self.lenses: |
| | self.cross_references[lens_id].append(method_id) |
| | self.inverse_references[method_id].append(lens_id) |
| | |
| | logger.info(f"Built cross-references: {len(self.cross_references)} lenses ↔ {len(self.inverse_references)} methods") |
| | |
| | def get_lens(self, lens_id: str) -> Optional[Dict]: |
| | """Get lens by ID""" |
| | return self.lenses.get(str(lens_id)) |
| | |
| | def get_method(self, method_id: str) -> Optional[Dict]: |
| | """Get method by ID""" |
| | return self.methods.get(str(method_id)) |
| | |
| | def get_methods_for_lens(self, lens_id: str) -> List[Dict]: |
| | """Get all methods for a lens""" |
| | method_ids = self.cross_references.get(str(lens_id), []) |
| | return [self.get_method(mid) for mid in method_ids if self.get_method(mid)] |
| | |
| | def get_lenses_for_method(self, method_id: str) -> List[Dict]: |
| | """Get all lenses for a method""" |
| | lens_ids = self.inverse_references.get(str(method_id), []) |
| | return [self.get_lens(lid) for lid in lens_ids if self.get_lens(lid)] |
| | |
| | def find_similar_lenses(self, query: str, limit: int = 5) -> List[Dict]: |
| | """Find lenses similar to query (simple keyword matching)""" |
| | query_lower = query.lower() |
| | results = [] |
| | |
| | for lens_id, lens in self.lenses.items(): |
| | score = 0 |
| | |
| | |
| | if query_lower in lens.get("name", "").lower(): |
| | score += 3 |
| | |
| | |
| | if query_lower in lens.get("description", "").lower(): |
| | score += 2 |
| | |
| | |
| | keywords = lens.get("keywords", []) |
| | for keyword in keywords: |
| | if query_lower in keyword.lower(): |
| | score += 1 |
| | |
| | if score > 0: |
| | result = lens.copy() |
| | result["match_score"] = score |
| | results.append(result) |
| | |
| | results.sort(key=lambda x: x.get("match_score", 0), reverse=True) |
| | return results[:limit] |
| | |
| | async def execute_method_via_n8n(self, method_id: str, content: Dict, |
| | context: Dict = None) -> Dict[str, Any]: |
| | """Execute method via n8n orchestration""" |
| | method = self.get_method(method_id) |
| | if not method: |
| | return { |
| | "success": False, |
| | "error": f"Method {method_id} not found", |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | payload = { |
| | "operation": "execute_method", |
| | "method_id": method_id, |
| | "method_name": method.get("name", "Unknown"), |
| | "content": content, |
| | "context": context or {}, |
| | "registry_version": self.last_sync, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | return await self.n8n.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["method_execution"], |
| | payload |
| | ) |
| |
|
| | |
| |
|
| | class QuorumSystem: |
| | """Proper quorum calculation and validation system""" |
| | |
| | def __init__(self): |
| | self.quorum_threshold = ProductionConfig.QUORUM_THRESHOLD |
| | self.dissent_threshold = ProductionConfig.DISSENT_THRESHOLD |
| | |
| | def calculate_quorum(self, attestations: List[Dict]) -> Dict[str, Any]: |
| | """ |
| | Calculate quorum statistics from attestations |
| | Returns detailed quorum analysis |
| | """ |
| | if not attestations: |
| | return { |
| | "quorum_met": False, |
| | "agreement_score": 0.0, |
| | "dissent_score": 0.0, |
| | "total_votes": 0, |
| | "analysis": "No attestations" |
| | } |
| | |
| | total_votes = len(attestations) |
| | |
| | |
| | decision_groups = defaultdict(list) |
| | for att in attestations: |
| | decision = att.get("decision", "unknown") |
| | decision_hash = hashlib.sha3_512( |
| | json.dumps(decision, sort_keys=True).encode() |
| | ).hexdigest()[:16] |
| | decision_groups[decision_hash].append(att) |
| | |
| | |
| | group_sizes = [len(group) for group in decision_groups.values()] |
| | if not group_sizes: |
| | return { |
| | "quorum_met": False, |
| | "agreement_score": 0.0, |
| | "dissent_score": 0.0, |
| | "total_votes": total_votes, |
| | "analysis": "No valid decisions" |
| | } |
| | |
| | |
| | group_sizes.sort(reverse=True) |
| | largest_group = group_sizes[0] |
| | second_largest = group_sizes[1] if len(group_sizes) > 1 else 0 |
| | |
| | |
| | agreement_score = largest_group / total_votes |
| | dissent_score = second_largest / total_votes if second_largest > 0 else 0 |
| | |
| | |
| | quorum_met = agreement_score >= self.quorum_threshold |
| | dissent_issue = dissent_score >= self.dissent_threshold |
| | |
| | |
| | analysis_parts = [] |
| | if quorum_met: |
| | analysis_parts.append(f"Quorum met ({agreement_score:.1%} ≥ {self.quorum_threshold:.1%})") |
| | else: |
| | analysis_parts.append(f"Quorum not met ({agreement_score:.1%} < {self.quorum_threshold:.1%})") |
| | |
| | if dissent_issue: |
| | analysis_parts.append(f"Significant dissent ({dissent_score:.1%} ≥ {self.dissent_threshold:.1%})") |
| | |
| | |
| | group_details = {} |
| | for decision_hash, group in decision_groups.items(): |
| | group_details[decision_hash[:8]] = { |
| | "size": len(group), |
| | "percentage": len(group) / total_votes, |
| | "validators": [a.get("validator_id", "unknown") for a in group], |
| | "sample_decision": group[0].get("decision", "unknown") if group else None |
| | } |
| | |
| | return { |
| | "quorum_met": quorum_met, |
| | "agreement_score": round(agreement_score, 3), |
| | "dissent_score": round(dissent_score, 3), |
| | "total_votes": total_votes, |
| | "group_count": len(decision_groups), |
| | "largest_group_size": largest_group, |
| | "analysis": "; ".join(analysis_parts), |
| | "group_details": group_details, |
| | "thresholds": { |
| | "quorum": self.quorum_threshold, |
| | "dissent": self.dissent_threshold |
| | } |
| | } |
| | |
| | async def validate_quorum_via_n8n(self, node: RealityNode, |
| | attestations: List[Dict]) -> Dict[str, Any]: |
| | """Validate quorum via n8n for complex cases""" |
| | payload = { |
| | "operation": "quorum_validation", |
| | "node_hash": node.content_hash[:32], |
| | "attestations": attestations, |
| | "total_witnesses": len(node.witness_signatures), |
| | "quorum_threshold": self.quorum_threshold, |
| | "dissent_threshold": self.dissent_threshold, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | return await self.n8n.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["quorum_calculation"], |
| | payload |
| | ) |
| |
|
| | |
| |
|
| | class ProductionDetectionEngine: |
| | """ |
| | Production-ready detection engine with all fixes applied |
| | Proper async/await, error handling, and clear guarantees |
| | """ |
| | |
| | def __init__(self): |
| | |
| | self.n8n_client = N8NClient() |
| | self.registry = LensMethodRegistry(self.n8n_client) |
| | self.ledger = ImmutableLedger(self.n8n_client) |
| | self.quorum_system = QuorumSystem() |
| | |
| | |
| | self.metrics = { |
| | "total_detections": 0, |
| | "successful_detections": 0, |
| | "failed_detections": 0, |
| | "average_execution_time": 0.0, |
| | "phase_distribution": Counter(), |
| | "equilibrium_detections": 0, |
| | "quorum_validations": 0, |
| | "ledger_commits": 0 |
| | } |
| | |
| | |
| | self.result_cache: Dict[str, Dict] = {} |
| | self.cache_lock = asyncio.Lock() |
| | |
| | |
| | self._background_tasks: List[asyncio.Task] = [] |
| | |
| | logger.info("Production Detection Engine initialized") |
| | |
| | async def initialize(self): |
| | """Async initialization""" |
| | try: |
| | |
| | success = await self.registry.sync_from_n8n() |
| | if not success: |
| | logger.warning("Registry sync failed, using empty registry") |
| | |
| | |
| | cleanup_task = asyncio.create_task(self._cleanup_loop()) |
| | self._background_tasks.append(cleanup_task) |
| | |
| | logger.info("Engine initialization completed") |
| | |
| | except Exception as e: |
| | logger.error(f"Engine initialization failed: {e}") |
| | raise |
| | |
| | async def detect_suppression(self, content: Dict, context: Dict = None) -> Dict[str, Any]: |
| | """ |
| | Main detection pipeline with proper error handling and metrics |
| | """ |
| | detection_id = f"det_{uuid.uuid4().hex[:16]}" |
| | start_time = time.time() |
| | |
| | try: |
| | logger.info(f"Starting detection {detection_id}") |
| | |
| | |
| | content_hash = hashlib.sha3_512( |
| | json.dumps(content, sort_keys=True).encode() |
| | ).hexdigest() |
| | |
| | node = RealityNode( |
| | content_hash=content_hash, |
| | node_type="suppression_detection", |
| | source_id=context.get("source", "unknown") if context else "unknown", |
| | signature=QuantumAwareSignature.create(content), |
| | temporal_anchor=datetime.utcnow().isoformat() + "Z", |
| | content=content, |
| | metadata={ |
| | "detection_id": detection_id, |
| | "context": context or {}, |
| | "engine_version": "IRE_v5.0_Production" |
| | } |
| | ) |
| | |
| | |
| | content_analysis = await self._analyze_content(content, context) |
| | |
| | |
| | pattern_analysis = await self._detect_patterns(content, content_analysis) |
| | |
| | |
| | current_phase = self._determine_phase(pattern_analysis) |
| | |
| | |
| | method_results = await self._apply_methods(content, current_phase, pattern_analysis) |
| | |
| | |
| | equilibrium_analysis = await self._detect_equilibrium(pattern_analysis, method_results) |
| | |
| | |
| | threat_analysis = await self._analyze_threats({ |
| | "content": content, |
| | "patterns": pattern_analysis, |
| | "methods": method_results, |
| | "equilibrium": equilibrium_analysis |
| | }) |
| | |
| | |
| | composite_analysis = self._create_composite_analysis( |
| | content_analysis, pattern_analysis, method_results, |
| | equilibrium_analysis, threat_analysis |
| | ) |
| | |
| | |
| | node.metadata["analysis"] = composite_analysis |
| | node.metadata["detection_phase"] = current_phase |
| | node.n8n_execution_id = f"exec_{uuid.uuid4().hex[:8]}" |
| | |
| | |
| | validators = self._select_validators(threat_analysis, current_phase) |
| | |
| | |
| | attestations = await self._get_attestations(node, validators, composite_analysis) |
| | |
| | |
| | successful_attestations = 0 |
| | for att in attestations: |
| | if att.get("success"): |
| | validator_id = att.get("validator_id") |
| | signature_data = att.get("signature_data", {}) |
| | signature = QuantumAwareSignature(**signature_data) |
| | node.add_witness(validator_id, signature, att.get("attestation", {})) |
| | successful_attestations += 1 |
| | |
| | |
| | quorum_result = self.quorum_system.calculate_quorum( |
| | [a.get("attestation", {}) for a in attestations if a.get("success")] |
| | ) |
| | |
| | |
| | ledger_result = None |
| | if quorum_result.get("quorum_met", False) and successful_attestations >= ProductionConfig.MIN_VALIDATORS: |
| | ledger_result = await self.ledger.commit_node(node, validators) |
| | if ledger_result.get("success"): |
| | self.metrics["ledger_commits"] += 1 |
| | |
| | execution_time = time.time() - start_time |
| | |
| | |
| | self._update_metrics( |
| | success=True, |
| | execution_time=execution_time, |
| | phase=current_phase, |
| | has_equilibrium=equilibrium_analysis.get("has_equilibrium", False), |
| | quorum_met=quorum_result.get("quorum_met", False) |
| | ) |
| | |
| | |
| | result = { |
| | "success": True, |
| | "detection_id": detection_id, |
| | "execution_time": execution_time, |
| | "current_phase": current_phase, |
| | "reality_node": { |
| | "hash": node.content_hash[:32], |
| | "proof_of_existence": node.proof_of_existence[:32] + "..." if node.proof_of_existence else None, |
| | "witness_count": len(node.witness_signatures) |
| | }, |
| | "analysis": composite_analysis, |
| | "quorum_result": quorum_result, |
| | "attestation_result": { |
| | "requested": len(validators), |
| | "successful": successful_attestations, |
| | "quorum_met": quorum_result.get("quorum_met", False) |
| | }, |
| | "ledger_result": ledger_result, |
| | "metrics": { |
| | "patterns_found": len(pattern_analysis.get("patterns", [])), |
| | "methods_applied": method_results.get("methods_applied", 0), |
| | "threat_level": threat_analysis.get("threat_level", "UNKNOWN"), |
| | "equilibrium_detected": equilibrium_analysis.get("has_equilibrium", False) |
| | }, |
| | "engine_metadata": { |
| | "version": "IRE_v5.0_Production", |
| | "quantum_aware": True, |
| | "n8n_integrated": True, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | } |
| | |
| | |
| | await self._cache_result(detection_id, result) |
| | |
| | logger.info(f"Detection {detection_id} completed successfully in {execution_time:.2f}s") |
| | |
| | return result |
| | |
| | except Exception as e: |
| | execution_time = time.time() - start_time |
| | error_id = f"err_{uuid.uuid4().hex[:8]}" |
| | |
| | self._update_metrics(success=False, execution_time=execution_time) |
| | |
| | logger.error(f"Detection {detection_id} failed: {e}", error_id=error_id) |
| | |
| | return { |
| | "success": False, |
| | "detection_id": detection_id, |
| | "error_id": error_id, |
| | "error": str(e), |
| | "execution_time": execution_time, |
| | "timestamp": datetime.utcnow().isoformat() + "Z", |
| | "engine_metadata": { |
| | "version": "IRE_v5.0_Production", |
| | "error_reported": True |
| | } |
| | } |
| | |
| | async def _analyze_content(self, content: Dict, context: Dict = None) -> Dict: |
| | """Analyze content via n8n""" |
| | payload = { |
| | "operation": "content_analysis", |
| | "content": content, |
| | "context": context or {}, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | response = await self.n8n_client.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
| | payload |
| | ) |
| | |
| | return response.get("data", {}) if response.get("success") else {} |
| | |
| | async def _detect_patterns(self, content: Dict, content_analysis: Dict) -> Dict: |
| | """Detect patterns via n8n""" |
| | payload = { |
| | "operation": "pattern_detection", |
| | "content": content, |
| | "content_analysis": content_analysis, |
| | "lens_count": len(self.registry.lenses), |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | response = await self.n8n_client.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
| | payload |
| | ) |
| | |
| | return response.get("data", {}) if response.get("success") else {} |
| | |
| | def _determine_phase(self, pattern_analysis: Dict) -> str: |
| | """Determine suppression phase""" |
| | patterns = pattern_analysis.get("patterns", []) |
| | |
| | |
| | equilibrium_count = sum(1 for p in patterns if p.get("type") == "equilibrium") |
| | |
| | if equilibrium_count >= 3: |
| | return SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM.value |
| | elif equilibrium_count >= 1: |
| | return SuppressionPhase.ESTABLISHING_SUPPRESSION.value |
| | else: |
| | return SuppressionPhase.ACTIVE_SUPPRESSION.value |
| | |
| | async def _apply_methods(self, content: Dict, phase: str, |
| | pattern_analysis: Dict) -> Dict: |
| | """Apply detection methods""" |
| | payload = { |
| | "operation": "method_execution", |
| | "content": content, |
| | "phase": phase, |
| | "pattern_analysis": pattern_analysis, |
| | "method_count": len(self.registry.methods), |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | response = await self.n8n_client.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["method_execution"], |
| | payload |
| | ) |
| | |
| | return response.get("data", {}) if response.get("success") else {} |
| | |
| | async def _detect_equilibrium(self, pattern_analysis: Dict, |
| | method_results: Dict) -> Dict: |
| | """Detect equilibrium patterns""" |
| | payload = { |
| | "operation": "equilibrium_detection", |
| | "pattern_analysis": pattern_analysis, |
| | "method_results": method_results, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | response = await self.n8n_client.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["equilibrium_detection"], |
| | payload |
| | ) |
| | |
| | return response.get("data", {}) if response.get("success") else {} |
| | |
| | async def _analyze_threats(self, system_state: Dict) -> Dict: |
| | """Analyze STRIDE-E threats""" |
| | payload = { |
| | "operation": "threat_analysis", |
| | "system_state": system_state, |
| | "threat_model": "STRIDE-E", |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | response = await self.n8n_client.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["threat_analysis"], |
| | payload |
| | ) |
| | |
| | return response.get("data", {}) if response.get("success") else {} |
| | |
| | def _create_composite_analysis(self, content_analysis: Dict, |
| | pattern_analysis: Dict, |
| | method_results: Dict, |
| | equilibrium_analysis: Dict, |
| | threat_analysis: Dict) -> Dict: |
| | """Create composite analysis""" |
| | |
| | pattern_score = pattern_analysis.get("confidence", 0.0) |
| | method_score = method_results.get("confidence", 0.0) |
| | equilibrium_score = equilibrium_analysis.get("equilibrium_score", 0.0) |
| | threat_score = threat_analysis.get("risk_score", 0.0) |
| | |
| | |
| | weights = {"pattern": 0.3, "method": 0.4, "equilibrium": 0.2, "threat": 0.1} |
| | composite_score = ( |
| | pattern_score * weights["pattern"] + |
| | method_score * weights["method"] + |
| | equilibrium_score * weights["equilibrium"] + |
| | (1 - threat_score) * weights["threat"] |
| | ) |
| | |
| | |
| | if threat_score > 0.7: |
| | system_status = "CRITICAL" |
| | elif threat_score > 0.4: |
| | system_status = "DEGRADED" |
| | elif composite_score > 0.7: |
| | system_status = "HEALTHY" |
| | elif composite_score > 0.4: |
| | system_status = "MONITOR" |
| | else: |
| | system_status = "UNKNOWN" |
| | |
| | return { |
| | "composite_score": round(composite_score, 3), |
| | "system_status": system_status, |
| | "component_scores": { |
| | "pattern": round(pattern_score, 3), |
| | "method": round(method_score, 3), |
| | "equilibrium": round(equilibrium_score, 3), |
| | "threat": round(threat_score, 3) |
| | }, |
| | "has_equilibrium": equilibrium_analysis.get("has_equilibrium", False), |
| | "threat_level": threat_analysis.get("threat_level", "UNKNOWN"), |
| | "pattern_count": len(pattern_analysis.get("patterns", [])), |
| | "method_count": method_results.get("methods_applied", 0), |
| | "timestamp": datetime.utcnow().isoformat() + "Z", |
| | "note": "Quantum-aware analysis, not quantum-resistant" |
| | } |
| | |
| | def _select_validators(self, threat_analysis: Dict, phase: str) -> List[str]: |
| | """Select validators based on analysis""" |
| | validators = [] |
| | |
| | |
| | validators.append("system_epistemic_v5") |
| | validators.append("temporal_integrity_v5") |
| | |
| | |
| | threat_level = threat_analysis.get("threat_level", "UNKNOWN") |
| | if threat_level in ["HIGH", "CRITICAL"]: |
| | validators.append("human_sovereign_v5") |
| | |
| | if phase == SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM.value: |
| | validators.append("quantum_guardian_v5") |
| | |
| | |
| | while len(validators) < ProductionConfig.MIN_VALIDATORS: |
| | validators.append(f"backup_validator_{len(validators)}") |
| | |
| | return validators |
| | |
| | async def _get_attestations(self, node: RealityNode, |
| | validators: List[str], |
| | analysis: Dict) -> List[Dict]: |
| | """Get validator attestations""" |
| | attestations = [] |
| | |
| | for validator_id in validators: |
| | payload = { |
| | "operation": "validator_attestation", |
| | "validator_id": validator_id, |
| | "node": node.to_transport_format(), |
| | "analysis": analysis, |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | } |
| | |
| | response = await self.n8n_client.execute_workflow( |
| | ProductionConfig.WORKFLOW_IDS["validator_attestation"], |
| | payload |
| | ) |
| | |
| | if response.get("success"): |
| | attestations.append({ |
| | "validator_id": validator_id, |
| | "success": True, |
| | "signature_data": response.get("data", {}).get("signature"), |
| | "attestation": response.get("data", {}).get("attestation"), |
| | "timestamp": response.get("timestamp") |
| | }) |
| | else: |
| | attestations.append({ |
| | "validator_id": validator_id, |
| | "success": False, |
| | "error": response.get("error", "Unknown error"), |
| | "timestamp": datetime.utcnow().isoformat() + "Z" |
| | }) |
| | |
| | return attestations |
| | |
| | def _update_metrics(self, success: bool, execution_time: float, |
| | phase: str = None, has_equilibrium: bool = False, |
| | quorum_met: bool = False): |
| | """Update engine metrics""" |
| | self.metrics["total_detections"] += 1 |
| | |
| | if success: |
| | self.metrics["successful_detections"] += 1 |
| | else: |
| | self.metrics["failed_detections"] += 1 |
| | |
| | |
| | old_avg = self.metrics["average_execution_time"] |
| | total = self.metrics["total_detections"] |
| | self.metrics["average_execution_time"] = ( |
| | (old_avg * (total - 1)) + execution_time |
| | ) / total if total > 0 else execution_time |
| | |
| | if phase: |
| | self.metrics["phase_distribution"][phase] += 1 |
| | |
| | if has_equilibrium: |
| | self.metrics["equilibrium_detections"] += 1 |
| | |
| | if quorum_met: |
| | self.metrics["quorum_validations"] += 1 |
| | |
| | async def _cache_result(self, detection_id: str, result: Dict): |
| | """Cache result with TTL""" |
| | async with self.cache_lock: |
| | self.result_cache[detection_id] = { |
| | "result": result, |
| | "timestamp": datetime.utcnow().isoformat() + "Z", |
| | "expires": (datetime.utcnow() + timedelta(hours=24)).isoformat() + "Z" |
| | } |
| | |
| | async def _cleanup_loop(self): |
| | """Background cleanup loop""" |
| | while True: |
| | try: |
| | await asyncio.sleep(3600) |
| | |
| | now = datetime.utcnow() |
| | expired_keys = [] |
| | |
| | async with self.cache_lock: |
| | for key, entry in self.result_cache.items(): |
| | expires = datetime.fromisoformat(entry["expires"].replace('Z', '+00:00')) |
| | if now > expires: |
| | expired_keys.append(key) |
| | |
| | for key in expired_keys: |
| | del self.result_cache[key] |
| | |
| | if expired_keys: |
| | logger.info(f"Cleaned up {len(expired_keys)} expired cache entries") |
| | |
| | except asyncio.CancelledError: |
| | break |
| | except Exception as e: |
| | logger.error(f"Cleanup loop error: {e}") |
| | |
| | async def get_system_report(self) -> Dict[str, Any]: |
| | """Get comprehensive system report""" |
| | ledger_health = self.ledger.analyze_health_sync() |
| | |
| | |
| | total = self.metrics["total_detections"] |
| | successful = self.metrics["successful_detections"] |
| | success_rate = successful / total if total > 0 else 0.0 |
| | |
| | |
| | phase_dist = dict(self.metrics["phase_distribution"]) |
| | phase_percentages = { |
| | phase: (count / total if total > 0 else 0) |
| | for phase, count in phase_dist.items() |
| | } |
| | |
| | return { |
| | "report_timestamp": datetime.utcnow().isoformat() + "Z", |
| | "engine_version": "IRE_v5.0_Production_Fixed", |
| | "guarantees": { |
| | "quantum_aware": True, |
| | "quantum_resistant": False, |
| | "n8n_integrated": True, |
| | "async_processing": True, |
| | "immutable_ledger": True, |
| | "quorum_validation": True |
| | }, |
| | "metrics": { |
| | **self.metrics, |
| | "success_rate": round(success_rate, 3), |
| | "phase_distribution": phase_percentages |
| | }, |
| | "registry_status": { |
| | "lenses": len(self.registry.lenses), |
| | "methods": len(self.registry.methods), |
| | "last_sync": self.registry.last_sync |
| | }, |
| | "ledger_health": ledger_health, |
| | "performance": { |
| | "average_execution_time": round(self.metrics["average_execution_time"], 2), |
| | "cache_size": len(self.result_cache), |
| | "background_tasks": len(self._background_tasks) |
| | }, |
| | "config_summary": { |
| | "min_validators": ProductionConfig.MIN_VALIDATORS, |
| | "quorum_threshold": ProductionConfig.QUORUM_THRESHOLD, |
| | "dissent_threshold": ProductionConfig.DISSENT_THRESHOLD, |
| | "hash_algorithm": ProductionConfig.HASH_ALGORITHM |
| | }, |
| | "recommendations": self._generate_system_recommendations(ledger_health, success_rate) |
| | } |
| | |
| | def _generate_system_recommendations(self, ledger_health: Dict, |
| | success_rate: float) -> List[str]: |
| | """Generate system recommendations""" |
| | recommendations = [] |
| | |
| | |
| | if ledger_health.get("health_score", 0) < 0.7: |
| | recommendations.append("Improve ledger health by adding more nodes and validators") |
| | |
| | |
| | if success_rate < 0.8 and self.metrics["total_detections"] > 10: |
| | recommendations.append(f"Investigate failed detections (success rate: {success |