# EGL Prototype
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
EGL Prototype (implements viable post-RAG vectors)
Author: Luis Ayala (OPHI concept) + ChatGPT prototype implementation
Date: 2025-12-30
This is a minimal, runnable reference architecture that implements:
Vector 0: Retrieval (RAG) -> retrieval() over an evidence store
Vector 1: Epistemic State Tracking (EST) -> confidence, consensus, volatility, contradictions
Vector 2: Temporal Truth Modeling (TTM) -> bitemporal validity (valid_from/to + observed_at)
Vector 3: Constraint-Based Generation (CBG)-> validators gate publication (type errors, not "oops")
Vector 4: Provenance-Native Cognition (PNC)-> claim<->evidence graph + derivation lineage
Vector 5: Drift-Aware Memory (DAM) -> branching + merges (no overwrite)
Vector 6: Self-Limitation Protocol (MSLP) -> policy gates drive refusal/escalation
Vector 7: Truth as Process (TPA) -> multi-agent propose/critique/verify/converge workflow
No external deps. In-memory ledger. Swap stores/validators for production.
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from hashlib import sha256
from typing import Dict, List, Optional, Tuple, Callable, Any, Set
import json
import uuid
# -----------------------------
# Helpers
# -----------------------------
def utc_now_iso() -> str:
    """Current UTC time as an ISO-8601 string with a trailing 'Z' suffix."""
    stamp = datetime.now(timezone.utc).isoformat()
    return stamp.replace("+00:00", "Z")
def stable_hash(obj: Any) -> str:
    """Canonical JSON -> sha256 (content-addressable artifacts)."""
    # sort_keys + compact separators make the serialization order-independent,
    # so structurally equal objects always produce the same digest.
    encoded = json.dumps(
        obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
    return sha256(encoded).hexdigest()
def new_id(prefix: str) -> str:
    """Mint a short unique identifier of the form '<prefix>_<12 hex chars>'."""
    suffix = uuid.uuid4().hex[:12]
    return "_".join((prefix, suffix))
# -----------------------------
# Core Types (EGL substrate)
# -----------------------------
class EdgeType(str, Enum):
    """Typed relationships between ledger nodes (claims, evidence, branches)."""
    SUPPORTS = "supports"          # evidence/claim strengthens a claim
    CONTRADICTS = "contradicts"    # explicit acknowledgement of a conflict
    REFINES = "refines"            # refinement; also reused for branch-merge events
    DEPENDS_ON = "depends_on"      # (unused in this prototype)
    DERIVED_FROM = "derived_from"  # provenance link: claim built from evidence
    SUPERSEDES = "supersedes"      # newer claim replaces an older one
    SCOPED_TO = "scoped_to"        # (unused in this prototype)
class UncertaintyType(str, Enum):
    """Why a claim's confidence is limited (Vector 1)."""
    MISSING_EVIDENCE = "missing_evidence"          # no supporting evidence attached
    CONFLICTING_SOURCES = "conflicting_sources"    # in-branch contradictions exist
    INFERENCE_UNDER_CONSTRAINTS = "inference_under_constraints"  # default for evidence-backed inference
    STALE = "stale"                                # newest evidence older than freshness horizon
    UNKNOWN = "unknown"                            # initial state before epistemics are computed
class PublishState(str, Enum):
    """Publication lifecycle of a claim."""
    DRAFT = "draft"          # proposed, not yet validated
    PUBLISHED = "published"  # passed validators and the policy gate
    REJECTED = "rejected"    # refused by policy or failed a validator
@dataclass
class Evidence:
    """A single piece of source material that claims cite as provenance."""
    evidence_id: str
    title: str
    source: str  # e.g., "internal_doc", "web", "runbook", "policy", etc.
    authority: float  # 0..1 (domain-specific)
    observed_at: str  # ISO-8601 timestamp when this evidence was captured
    content: str  # raw text; tokenized for toy retrieval
    tags: List[str] = field(default_factory=list)

    def fingerprint(self) -> str:
        """Content-addressable sha256 over the canonical JSON of all fields."""
        return stable_hash(asdict(self))
@dataclass
class ValidatorResult:
    """Outcome of one validator run against a claim (Vector 3)."""
    name: str       # validator identifier, e.g. "freshness"
    passed: bool    # hard gate: any False blocks publication
    rationale: str  # human-readable explanation of the verdict
    details: Dict[str, Any] = field(default_factory=dict)  # machine-readable specifics
@dataclass
class EpistemicState:
    """Quantified epistemics attached to a claim (Vector 1)."""
    confidence: float  # 0..1
    consensus: float  # 0..1 (agreement among evidence/agents)
    volatility: float  # 0..1 (how likely to change soon)
    uncertainty_type: UncertaintyType
    notes: List[str] = field(default_factory=list)  # free-form diagnostic notes
@dataclass
class Claim:
    """A structured, evidence-linked assertion tracked by the ledger.

    Propositions are structured dicts (not prose) so contradiction
    detection and validators can operate on fields directly.
    """
    claim_id: str
    proposition: Dict[str, Any]  # structured content (not prose)
    context: Dict[str, Any]  # domain, jurisdiction, scope
    observed_at: str  # when the system recorded this claim (bitemporal: system time)
    valid_from: Optional[str] = None  # start of real-world validity (bitemporal: valid time)
    valid_to: Optional[str] = None  # end of real-world validity; None = open-ended
    # Vector 1: epistemics
    epistemic: EpistemicState = field(default_factory=lambda: EpistemicState(
        confidence=0.0, consensus=0.0, volatility=1.0, uncertainty_type=UncertaintyType.UNKNOWN
    ))
    # Vector 4: provenance + lineage
    evidence_ids: List[str] = field(default_factory=list)
    counter_evidence_ids: List[str] = field(default_factory=list)
    transforms: List[Dict[str, Any]] = field(default_factory=list)  # derivation steps
    # Vector 3: constraints
    validators: List[ValidatorResult] = field(default_factory=list)
    # Vector 5: drift-aware memory
    branch_id: str = "main"
    supersedes: Optional[str] = None  # prior claim_id
    drift_lineage: List[Dict[str, Any]] = field(default_factory=list)
    # publication
    state: PublishState = PublishState.DRAFT

    def fingerprint(self) -> str:
        """Content-addressable digest of the claim's full serialized state."""
        # Exclude fingerprint itself; include structured content
        # NOTE(review): asdict(self) serializes *every* field, including
        # epistemics, validators and publish state, so the digest changes
        # as the claim moves through the workflow — confirm that is intended.
        payload = asdict(self)
        return stable_hash(payload)
@dataclass
class Edge:
    """A typed, timestamped link between two node ids in the ledger (Vector 4)."""
    edge_id: str
    edge_type: EdgeType
    src_id: str  # source node id (claim/evidence/branch, depending on edge_type)
    dst_id: str  # destination node id
    created_at: str  # ISO-8601 creation timestamp
    meta: Dict[str, Any] = field(default_factory=dict)  # extra annotations (role, note, op, ...)
# -----------------------------
# RAG (Vector 0): Evidence store + retrieval
# -----------------------------
class EvidenceStore:
    """In-memory evidence registry with toy token-overlap retrieval (Vector 0)."""

    def __init__(self) -> None:
        self._evidence: Dict[str, Evidence] = {}

    def add(self, ev: Evidence) -> None:
        """Register (or replace) an evidence item keyed by its id."""
        self._evidence[ev.evidence_id] = ev

    def get(self, evidence_id: str) -> Evidence:
        """Look up evidence by id; raises KeyError when absent."""
        return self._evidence[evidence_id]

    def all(self) -> List[Evidence]:
        """Snapshot list of every stored evidence item."""
        return list(self._evidence.values())

    def retrieve(self, query: str, k: int = 5) -> List[Evidence]:
        """Toy retrieval: token overlap. Replace with embeddings/vector DB in production.

        Returns up to *k* items ranked by token overlap with the query,
        with authority as a tie-breaking bonus. Items sharing no tokens
        with the query are never returned.
        """
        query_tokens = set(query.lower().split())
        scored: List[Tuple[float, Evidence]] = []
        for item in self._evidence.values():
            doc_tokens = set(item.content.lower().split()) | set(item.title.lower().split())
            overlap = len(query_tokens & doc_tokens)
            if overlap == 0:
                # Fix: the original scored every item as overlap + 0.3*authority
                # and filtered on score > 0, so a zero-overlap document could
                # still be returned on authority alone. Require real overlap.
                continue
            scored.append((overlap * 1.0 + 0.3 * item.authority, item))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [item for score, item in scored[:k]]
# -----------------------------
# EGL Ledger: claims + edges + branches
# -----------------------------
class EGLedger:
    """Claim/edge ledger with branch membership tracking (Vectors 4 & 5)."""

    def __init__(self) -> None:
        self.claims: Dict[str, Claim] = {}
        self.edges: Dict[str, Edge] = {}
        # branch_id -> claim_ids
        self.branches: Dict[str, Set[str]] = {"main": set()}

    def add_claim(self, claim: Claim) -> None:
        """Index the claim and record it under its branch."""
        self.claims[claim.claim_id] = claim
        members = self.branches.setdefault(claim.branch_id, set())
        members.add(claim.claim_id)

    def add_edge(self, edge_type: EdgeType, src_id: str, dst_id: str, meta: Optional[Dict[str, Any]] = None) -> str:
        """Create a typed edge between two node ids; returns the new edge id."""
        eid = new_id("edge")
        self.edges[eid] = Edge(
            edge_id=eid,
            edge_type=edge_type,
            src_id=src_id,
            dst_id=dst_id,
            created_at=utc_now_iso(),
            meta=meta or {},
        )
        return eid

    def find_contradictions(self, claim: Claim) -> List[Claim]:
        """Simple contradiction detector: same subject+predicate but different object."""
        subj = claim.proposition.get("subject")
        pred = claim.proposition.get("predicate")
        obj = claim.proposition.get("object")
        conflicting: List[Claim] = []
        for candidate in self.claims.values():
            # Only compare against *other* claims on the same branch.
            if candidate.claim_id == claim.claim_id or candidate.branch_id != claim.branch_id:
                continue
            same_topic = (
                candidate.proposition.get("subject") == subj
                and candidate.proposition.get("predicate") == pred
            )
            if same_topic and candidate.proposition.get("object") != obj:
                conflicting.append(candidate)
        return conflicting

    def slice(self, query_context: Dict[str, Any], as_of: Optional[str] = None) -> List[Claim]:
        """Return published claims matching context and time."""
        moment = as_of or utc_now_iso()
        selected: List[Claim] = []
        for candidate in self.claims.values():
            if candidate.state != PublishState.PUBLISHED:
                continue
            if not _context_match(candidate.context, query_context):
                continue
            # Bitemporal filter: the moment must fall inside the validity window.
            if candidate.valid_from and moment < candidate.valid_from:
                continue
            if candidate.valid_to and moment > candidate.valid_to:
                continue
            selected.append(candidate)
        return selected
def _context_match(claim_ctx: Dict[str, Any], query_ctx: Dict[str, Any]) -> bool:
"""All query keys must match exactly in claim context (minimal)."""
for k, v in query_ctx.items():
if claim_ctx.get(k) != v:
return False
return True
# -----------------------------
# Validators (Vector 3)
# -----------------------------
# Vector 3: a Validator inspects a claim (plus the evidence store and ledger)
# and returns a pass/fail verdict with rationale; any failure hard-gates publication.
Validator = Callable[[Claim, EvidenceStore, EGLedger], ValidatorResult]
def schema_validator(required_keys: List[str]) -> Validator:
    """Build a validator requiring every key in *required_keys* on the proposition."""
    def _v(claim: Claim, store: EvidenceStore, ledger: EGLedger) -> ValidatorResult:
        missing = [key for key in required_keys if key not in claim.proposition]
        if missing:
            rationale = f"Missing keys: {missing}"
        else:
            rationale = "All required proposition keys present."
        return ValidatorResult(
            name="schema_validator",
            passed=not missing,
            rationale=rationale,
            details={"missing": missing},
        )
    return _v
def evidence_coverage_validator(min_support: int = 1) -> Validator:
    """Build a validator demanding at least *min_support* supporting evidence ids."""
    def _v(claim: Claim, store: EvidenceStore, ledger: EGLedger) -> ValidatorResult:
        support_count = len(claim.evidence_ids)
        ok = support_count >= min_support
        return ValidatorResult(
            name="evidence_coverage",
            passed=ok,
            rationale=f"Has >= {min_support} supporting evidence items." if ok else "Insufficient supporting evidence.",
            details={"support_count": support_count, "required": min_support},
        )
    return _v
def no_unresolved_contradiction_validator() -> Validator:
    """Build a validator that fails when in-branch contradictions lack explicit edges."""
    def _v(claim: Claim, store: EvidenceStore, ledger: EGLedger) -> ValidatorResult:
        # resolved if we have explicit contradict edges from claim to each conflicting claim
        acknowledged: Set[str] = set()
        for edge in ledger.edges.values():
            if edge.edge_type == EdgeType.CONTRADICTS and edge.src_id == claim.claim_id:
                acknowledged.add(edge.dst_id)
        unresolved = [
            other.claim_id
            for other in ledger.find_contradictions(claim)
            if other.claim_id not in acknowledged
        ]
        ok = not unresolved
        return ValidatorResult(
            name="no_unresolved_contradictions",
            passed=ok,
            rationale="No unresolved contradictions in-branch." if ok else f"Unresolved contradictions: {unresolved}",
            details={"unresolved": unresolved},
        )
    return _v
def freshness_validator(max_age_days: int = 90) -> Validator:
    """Build a validator that rejects claims whose newest evidence exceeds *max_age_days*."""
    def _v(claim: Claim, store: EvidenceStore, ledger: EGLedger) -> ValidatorResult:
        # Treat observed_at for evidence as freshness anchors
        if not claim.evidence_ids:
            return ValidatorResult(
                name="freshness",
                passed=False,
                rationale="No evidence to assess freshness.",
                details={},
            )
        # Lexicographic max of the timestamps (assumes uniform ISO-8601 'Z'
        # format, where string ordering matches chronological order — TODO confirm).
        newest = max(store.get(eid).observed_at for eid in claim.evidence_ids)
        dt_newest = datetime.fromisoformat(newest.replace("Z", "+00:00"))
        age_days = (datetime.now(timezone.utc) - dt_newest).days
        ok = age_days <= max_age_days
        return ValidatorResult(
            name="freshness",
            passed=ok,
            rationale=f"Evidence age {age_days}d within threshold." if ok else f"Evidence too old ({age_days}d).",
            details={"age_days": age_days, "max_age_days": max_age_days, "newest_evidence_at": newest},
        )
    return _v
# -----------------------------
# Epistemic scoring (Vector 1)
# -----------------------------
def compute_epistemic_state(claim: Claim, store: EvidenceStore, ledger: EGLedger) -> EpistemicState:
    """Derive confidence/consensus/volatility/uncertainty for a claim (Vector 1).

    Heuristic (toy) scoring:
    - consensus: mean supporting authority, discounted by counter-evidence
      authority and in-branch contradiction count
    - volatility: grows linearly with the age of the newest supporting evidence
    - confidence: consensus scaled by the fraction of passing validators
    """
    # Consensus: how aligned are supporting sources (toy: average authority, penalize counters)
    support = [store.get(eid) for eid in claim.evidence_ids]
    counter = [store.get(eid) for eid in claim.counter_evidence_ids] if claim.counter_evidence_ids else []
    if not support:
        # No evidence at all => report maximal uncertainty.
        return EpistemicState(
            confidence=0.0,
            consensus=0.0,
            volatility=1.0,
            uncertainty_type=UncertaintyType.MISSING_EVIDENCE,
            notes=["No supporting evidence."]
        )
    avg_auth = sum(e.authority for e in support) / len(support)
    counter_weight = sum(e.authority for e in counter) / max(1, len(counter)) if counter else 0.0
    # Contradiction density: count internal contradictions in branch
    contradictions = ledger.find_contradictions(claim)
    contradiction_penalty = min(0.8, 0.2 * len(contradictions))  # capped so consensus can stay > 0
    # Freshness proxy: newer evidence => lower volatility
    # (string max() assumes uniform ISO-8601 'Z' timestamps — TODO confirm)
    newest = max(e.observed_at for e in support)
    dt_newest = datetime.fromisoformat(newest.replace("Z", "+00:00"))
    age_days = (datetime.now(timezone.utc) - dt_newest).days
    volatility = min(1.0, age_days / 180.0)  # older -> more volatile (toy)
    # Consensus: authority reduced by counterweight and contradictions
    consensus = max(0.0, min(1.0, avg_auth - 0.5 * counter_weight - contradiction_penalty))
    # Confidence: consensus capped by validator passes
    pass_ratio = 0.0
    if claim.validators:
        pass_ratio = sum(1 for v in claim.validators if v.passed) / len(claim.validators)
    confidence = max(0.0, min(1.0, consensus * (0.5 + 0.5 * pass_ratio)))
    # Classify the dominant uncertainty source (priority: conflicts > staleness > inference).
    if contradictions:
        utype = UncertaintyType.CONFLICTING_SOURCES
    elif age_days > 90:
        utype = UncertaintyType.STALE
    else:
        utype = UncertaintyType.INFERENCE_UNDER_CONSTRAINTS
    notes = []
    if contradictions:
        notes.append(f"{len(contradictions)} in-branch contradictory claim(s) detected.")
    if counter:
        notes.append(f"{len(counter)} counter-evidence item(s) attached.")
    notes.append(f"Newest evidence age: {age_days}d")
    return EpistemicState(
        confidence=confidence,
        consensus=consensus,
        volatility=volatility,
        uncertainty_type=utype,
        notes=notes
    )
# -----------------------------
# Self-limitation policy (Vector 6)
# -----------------------------
@dataclass
class Refusal:
    """Structured refusal/escalation emitted by the policy gate (Vector 6)."""
    refused: bool  # True when publication is blocked
    reason: str
    required_inputs: List[str] = field(default_factory=list)  # what would unblock publication
    escalation: Optional[str] = None  # e.g. "human_review", "repair_loop"
    graph_conditions: Dict[str, Any] = field(default_factory=dict)  # metrics that triggered the gate
def policy_gate(
    claim: Claim,
    high_stakes: bool,
    min_confidence: float = 0.70,
    max_volatility: float = 0.60,
) -> Refusal:
    """Vector 6 gate: refuse publication when confidence or volatility miss policy thresholds."""
    # If high stakes, require stricter confidence + low volatility
    tightening = 0.10 if high_stakes else 0.0
    required_conf = min_confidence + tightening
    required_vol = max_volatility - tightening
    epi = claim.epistemic
    if epi.confidence < required_conf:
        return Refusal(
            refused=True,
            reason="Insufficient confidence for publication under policy gate.",
            required_inputs=["more authoritative evidence", "resolve contradictions", "narrow scope/context"],
            escalation="human_review" if high_stakes else "tooling_or_more_evidence",
            graph_conditions={
                "confidence": epi.confidence,
                "required_confidence": required_conf,
                "uncertainty_type": epi.uncertainty_type,
            },
        )
    if epi.volatility > required_vol:
        return Refusal(
            refused=True,
            reason="Volatility too high (likely stale or unstable).",
            required_inputs=["fresh evidence", "time-bounded claim validity window"],
            escalation="human_review" if high_stakes else "refresh_sources",
            graph_conditions={
                "volatility": epi.volatility,
                "required_volatility": required_vol,
            },
        )
    return Refusal(refused=False, reason="OK")
# -----------------------------
# Truth-as-process workflow (Vector 7)
# -----------------------------
@dataclass
class Agent:
    """A named participant in the propose/critique/verify workflow (Vector 7)."""
    name: str

    def propose(self, query: str, retrieved: List[Evidence], context: Dict[str, Any]) -> Claim:
        """Turn a query plus retrieved evidence into a draft structured claim."""
        # Toy proposer: turn query into a structured proposition
        # In production, you'd call an LLM here and parse into proposition schema.
        proposition = {
            "subject": context.get("system", "unknown_system"),
            "predicate": "answers",
            "object": query,
        }
        return Claim(
            claim_id=new_id("claim"),
            proposition=proposition,
            context=context,
            observed_at=utc_now_iso(),
            valid_from=utc_now_iso(),
            valid_to=None,
            evidence_ids=[item.evidence_id for item in retrieved],
            transforms=[{"agent": self.name, "step": "propose", "query": query, "ts": utc_now_iso()}],
            branch_id=context.get("branch", "main"),
        )

    def critique(self, claim: Claim, store: EvidenceStore, ledger: EGLedger) -> List[str]:
        """List human-readable objections; an empty list means no concerns."""
        objections: List[str] = []
        if not claim.evidence_ids:
            objections.append("No supporting evidence attached.")
        conflicts = ledger.find_contradictions(claim)
        if conflicts:
            objections.append(f"Potential contradictions exist: {[c.claim_id for c in conflicts]}")
        return objections

    def verify(self, claim: Claim, validators: List[Validator], store: EvidenceStore, ledger: EGLedger) -> None:
        """Run every validator, then recompute the claim's epistemic state in place."""
        claim.validators = [check(claim, store, ledger) for check in validators]
        claim.epistemic = compute_epistemic_state(claim, store, ledger)
def converge(
    agents: List[Agent],
    claim: Claim,
    store: EvidenceStore,
    ledger: EGLedger,
    validators: List[Validator],
    high_stakes: bool = False,
) -> Tuple[PublishState, Optional[Refusal]]:
    """
    Multi-agent process:
    - each agent critiques
    - verifier runs validators + epistemics
    - policy gate decides publish/refuse
    """
    notes: List[str] = []
    for agent in agents:
        for issue in agent.critique(claim, store, ledger):
            notes.append(f"{agent.name}: {issue}")
    # Run verification (choose first agent as verifier here)
    agents[0].verify(claim, validators, store, ledger)
    # Attach critique notes into epistemic notes
    claim.epistemic.notes.extend(notes)
    # Policy gate
    refusal = policy_gate(claim, high_stakes=high_stakes)
    if refusal.refused:
        claim.state = PublishState.REJECTED
        claim.transforms.append({"step": "policy_refusal", "ts": utc_now_iso(), "refusal": asdict(refusal)})
        return (PublishState.REJECTED, refusal)
    # If any validator fails, reject (hard gate)
    failed = [result for result in claim.validators if not result.passed]
    if failed:
        claim.state = PublishState.REJECTED
        claim.transforms.append({
            "step": "validation_reject",
            "ts": utc_now_iso(),
            "failed_validators": [asdict(result) for result in failed],
        })
        return (PublishState.REJECTED, Refusal(
            refused=True,
            reason="Validator gate failed (type error).",
            required_inputs=["fix schema/constraints", "attach sufficient evidence", "resolve contradictions"],
            escalation="repair_loop",
            graph_conditions={"failed_validators": [result.name for result in failed]},
        ))
    claim.state = PublishState.PUBLISHED
    claim.transforms.append({"step": "publish", "ts": utc_now_iso()})
    return (PublishState.PUBLISHED, None)
# -----------------------------
# Drift-aware memory ops (Vector 5)
# -----------------------------
def fork_branch(ledger: EGLedger, from_branch: str, new_branch: str) -> None:
    """Copy branch membership so *new_branch* starts with every claim of *from_branch*."""
    target = ledger.branches.setdefault(new_branch, set())
    target.update(ledger.branches.get(from_branch, set()))
def supersede_claim(ledger: EGLedger, old_claim_id: str, new_claim: Claim) -> None:
    """Link *new_claim* as the successor of an existing claim (no overwrite, Vector 5)."""
    prior = ledger.claims[old_claim_id]
    new_claim.supersedes = old_claim_id
    # Record the delta so the lineage shows what changed and when.
    new_claim.drift_lineage.append({
        "ts": utc_now_iso(),
        "type": "supersede",
        "from": old_claim_id,
        "delta": {"proposition": {"from": prior.proposition, "to": new_claim.proposition}},
    })
    ledger.add_edge(EdgeType.SUPERSEDES, new_claim.claim_id, old_claim_id, meta={"branch": new_claim.branch_id})
def merge_branches(ledger: EGLedger, src_branch: str, dst_branch: str, note: str = "") -> None:
    """Union the source branch's claims into the destination branch (Vector 5)."""
    members = ledger.branches.setdefault(dst_branch, set())
    members.update(ledger.branches.get(src_branch, set()))
    # Record merge as a ledger-level event edge (pseudo)
    merge_edge_id = new_id("merge")
    ledger.edges[merge_edge_id] = Edge(
        edge_id=merge_edge_id,
        edge_type=EdgeType.REFINES,
        src_id=src_branch,
        dst_id=dst_branch,
        created_at=utc_now_iso(),
        meta={"note": note, "op": "merge_branches"},
    )
# -----------------------------
# Prototype demo
# -----------------------------
def demo() -> None:
    """End-to-end walkthrough of Vectors 0-7 on a toy ops scenario.

    Builds an evidence store and ledger, proposes a claim via the agent
    workflow, runs converge (validators + policy gate), simulates a
    temporal policy update via branch fork + supersession, prints the
    published-claim slice, and seals a content-addressed ledger snapshot.
    """
    print("=== EGL Prototype Demo (Vectors 0-7) ===")
    # Evidence store
    store = EvidenceStore()
    ev1 = Evidence(
        evidence_id=new_id("ev"),
        title="Runbook: Autoscaling thresholds",
        source="internal_runbook",
        authority=0.85,
        observed_at=utc_now_iso(),
        content="Autoscaling triggers when CPU > 70% for 5m. Use unified policy across clouds.",
        tags=["ops", "autoscaling", "policy"],
    )
    ev2 = Evidence(
        evidence_id=new_id("ev"),
        title="Policy: Multi-cloud routing guardrails",
        source="internal_policy",
        authority=0.90,
        observed_at=utc_now_iso(),
        content="Requests must pass schema normalization and entropy checks before routing to any provider.",
        tags=["ops", "multicloud", "entropy"],
    )
    store.add(ev1)
    store.add(ev2)
    # Ledger
    ledger = EGLedger()
    # Agents (TPA)
    agents = [Agent("proposer"), Agent("critic"), Agent("verifier")]
    # Query (could be an incoming incident, question, or decision)
    query = "Should we scale out across AWS and GCP for this traffic burst?"
    context = {"domain": "ops", "system": "edge-broker", "jurisdiction": "N/A", "branch": "main"}
    # Vector 0: retrieval
    retrieved = store.retrieve(query, k=5)
    # Propose a claim
    claim = agents[0].propose(query, retrieved, context)
    # Add claim to ledger early as draft (so contradictions can be detected)
    ledger.add_claim(claim)
    # Add provenance edges claim -> evidence
    for eid in claim.evidence_ids:
        ledger.add_edge(EdgeType.DERIVED_FROM, claim.claim_id, eid, meta={"role": "supporting_evidence"})
    # Validators (Vector 3)
    validators = [
        schema_validator(["subject", "predicate", "object"]),
        evidence_coverage_validator(min_support=1),
        freshness_validator(max_age_days=90),
        no_unresolved_contradiction_validator(),
    ]
    # Converge / publish via process (Vector 7 + Vector 6)
    state, refusal = converge(
        agents=agents,
        claim=claim,
        store=store,
        ledger=ledger,
        validators=validators,
        high_stakes=True,  # treat ops decisions as high stakes
    )
    print(f"\nPublish state: {state}")
    if refusal:
        print("Refusal:", json.dumps(asdict(refusal), indent=2))
    else:
        print("Published claim epistemics:", json.dumps(asdict(claim.epistemic), indent=2))
    # Demonstrate temporal truth modeling (Vector 2): supersede with new policy
    # (simulate future update)
    print("\n--- Temporal update: new policy changes threshold ---")
    fork_branch(ledger, "main", "policy_update")
    updated_context = dict(context)
    updated_context["branch"] = "policy_update"
    # New evidence (later time)
    ev3 = Evidence(
        evidence_id=new_id("ev"),
        title="Updated Runbook: Autoscaling thresholds v2",
        source="internal_runbook",
        authority=0.92,
        observed_at=utc_now_iso(),
        content="Autoscaling triggers when CPU > 60% for 3m. Coordinated scaling requires entropy gate pass.",
        tags=["ops", "autoscaling", "policy", "v2"],
    )
    store.add(ev3)
    retrieved2 = store.retrieve("updated autoscaling threshold", k=5)
    claim2 = agents[0].propose("Autoscaling threshold updated to CPU>60% for 3m", retrieved2, updated_context)
    ledger.add_claim(claim2)
    supersede_claim(ledger, old_claim_id=claim.claim_id, new_claim=claim2)
    # Validate and publish
    for eid in claim2.evidence_ids:
        ledger.add_edge(EdgeType.DERIVED_FROM, claim2.claim_id, eid, meta={"role": "supporting_evidence"})
    state2, refusal2 = converge(agents, claim2, store, ledger, validators, high_stakes=False)
    print(f"Publish state (updated claim): {state2}")
    if refusal2:
        print("Refusal:", json.dumps(asdict(refusal2), indent=2))
    # Demonstrate queryable slices (EGL output as graph slice)
    print("\n--- EGL Slice (as-of now, branch=policy_update) ---")
    slice_claims = [c for c in ledger.claims.values()
                    if c.state == PublishState.PUBLISHED and c.branch_id == "policy_update"]
    for c in slice_claims:
        print(json.dumps({
            "claim_id": c.claim_id,
            "proposition": c.proposition,
            "valid_from": c.valid_from,
            "valid_to": c.valid_to,
            "supersedes": c.supersedes,
            "epistemic": asdict(c.epistemic),
            "validators": [asdict(v) for v in c.validators],
        }, indent=2))
    # Artifact seal: hash entire ledger snapshot (PNC + audit)
    snapshot = {
        "ts": utc_now_iso(),
        "claims": {cid: asdict(c) for cid, c in ledger.claims.items()},
        "edges": {eid: asdict(e) for eid, e in ledger.edges.items()},
        "branches": {b: sorted(list(ids)) for b, ids in ledger.branches.items()},
    }
    print("\n--- Ledger Snapshot Seal ---")
    print("timestamp_utc:", snapshot["ts"])
    print("sha256:", stable_hash(snapshot))
# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    demo()
# What this prototype gives you (in one breath):
# - EGL substrate (claims + edges + bitemporal fields + lineage)
# - RAG retrieval as a plug-in evidence fetch (Vector 0)
# - EST computed epistemics (confidence/consensus/volatility/uncertainty)
# - TTM via validity windows + supersession
# - CBG via hard validators (reject = "type error")
# - PNC via explicit evidence links + derivation steps + sealed snapshot hash
# - DAM via branching + explicit merges/supersession (no overwrite)
# - MSLP via policy gates that refuse/escalate
# - TPA via multi-agent propose/critique/verify/converge workflow