
UPIR

The main UPIR class that combines specifications and architectures.


Overview

The UPIR class is the core abstraction that ties together:

  • Formal specifications (temporal properties and constraints)
  • System architecture (components and connections)
  • Metadata and configuration

Class Documentation

upir.core.upir.UPIR dataclass

Universal Plan Intermediate Representation for distributed systems.

UPIR integrates formal specifications, architectural designs, evidence from various sources, and reasoning graphs to enable automated synthesis, verification, and continuous optimization of distributed systems.

Based on TD Commons disclosure, UPIR maintains:

  • Formal specifications (invariants, properties, constraints)
  • Architecture representation (components, connections, deployment)
  • Evidence tracking (benchmarks, tests, production data, proofs)
  • Reasoning graph (decisions, rationale, confidence propagation)

Attributes:

Name           Type                           Description
specification  Optional[FormalSpecification]  Formal specification with invariants and properties
architecture   Optional[Architecture]         Architecture representation (components, connections)
id             str                            Unique identifier (auto-generated UUID if not provided)
name           str                            Human-readable name for this UPIR instance (default: "")
description    str                            Description of the system being represented (default: "")
evidence       Dict[str, Evidence]            Dictionary of evidence keyed by evidence ID
reasoning      Dict[str, ReasoningNode]       Dictionary of reasoning nodes keyed by node ID
metadata       Dict[str, Any]                 Additional metadata (tags, owner, project, etc.)
created_at     datetime                       When this UPIR was created (UTC)
updated_at     datetime                       When this UPIR was last modified (UTC)

Example

>>> # Simple usage with defaults
>>> upir = UPIR(
...     specification=FormalSpecification(...),
...     architecture=Architecture(...)
... )
>>> # Or with explicit metadata
>>> upir = UPIR(
...     specification=spec,
...     architecture=arch,
...     name="E-commerce Platform",
...     description="High-throughput e-commerce system"
... )
>>> upir.validate()
True

References:

  • TD Commons: UPIR structure and components

Source code in upir/core/upir.py
@dataclass
class UPIR:
    """
    Universal Plan Intermediate Representation for distributed systems.

    UPIR integrates formal specifications, architectural designs, evidence
    from various sources, and reasoning graphs to enable automated synthesis,
    verification, and continuous optimization of distributed systems.

    Based on TD Commons disclosure, UPIR maintains:
    - Formal specifications (invariants, properties, constraints)
    - Architecture representation (components, connections, deployment)
    - Evidence tracking (benchmarks, tests, production data, proofs)
    - Reasoning graph (decisions, rationale, confidence propagation)

    Attributes:
        specification: Formal specification with invariants and properties
        architecture: Architecture representation (components, connections)
        id: Unique identifier (auto-generated UUID if not provided)
        name: Human-readable name for this UPIR instance (default: "")
        description: Description of the system being represented (default: "")
        evidence: Dictionary of evidence keyed by evidence ID
        reasoning: Dictionary of reasoning nodes keyed by node ID
        metadata: Additional metadata (tags, owner, project, etc.)
        created_at: When this UPIR was created (UTC)
        updated_at: When this UPIR was last modified (UTC)

    Example:
        >>> # Simple usage with defaults
        >>> upir = UPIR(
        ...     specification=FormalSpecification(...),
        ...     architecture=Architecture(...)
        ... )
        >>> # Or with explicit metadata
        >>> upir = UPIR(
        ...     specification=spec,
        ...     architecture=arch,
        ...     name="E-commerce Platform",
        ...     description="High-throughput e-commerce system"
        ... )
        >>> upir.validate()
        True

    References:
    - TD Commons: UPIR structure and components
    """
    specification: Optional[FormalSpecification] = None
    architecture: Optional[Architecture] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    evidence: Dict[str, Evidence] = field(default_factory=dict)
    reasoning: Dict[str, ReasoningNode] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

    def __post_init__(self):
        """Validate UPIR fields."""
        if not self.id:
            raise ValueError("ID cannot be empty")

    @staticmethod
    def generate_id() -> str:
        """
        Generate a unique ID for a UPIR instance.

        Returns:
            UUID string

        Example:
            >>> upir_id = UPIR.generate_id()
            >>> len(upir_id) == 36  # UUID format
            True
        """
        return str(uuid.uuid4())

    def add_evidence(self, evidence: Evidence) -> str:
        """
        Add evidence to this UPIR instance.

        Generates a unique ID for the evidence and adds it to the evidence
        dictionary. Updates the updated_at timestamp.

        Args:
            evidence: Evidence instance to add

        Returns:
            Generated evidence ID

        Example:
            >>> upir = UPIR(id="upir-1", name="test", description="test")
            >>> evidence = Evidence("test", "benchmark", {}, 0.8)
            >>> evidence_id = upir.add_evidence(evidence)
            >>> len(evidence_id) == 36  # UUID
            True
            >>> evidence_id in upir.evidence
            True
        """
        evidence_id = str(uuid.uuid4())
        self.evidence[evidence_id] = evidence
        self.updated_at = datetime.utcnow()
        return evidence_id

    def add_reasoning(self, node: ReasoningNode) -> str:
        """
        Add reasoning node to this UPIR instance.

        Adds the node to the reasoning dictionary using its ID. Updates the
        updated_at timestamp.

        Args:
            node: ReasoningNode instance to add

        Returns:
            The node's ID (from node.id)

        Example:
            >>> upir = UPIR(id="upir-1", name="test", description="test")
            >>> node = ReasoningNode(
            ...     id=ReasoningNode.generate_id(),
            ...     decision="Use PostgreSQL",
            ...     rationale="Strong consistency needed"
            ... )
            >>> node_id = upir.add_reasoning(node)
            >>> node_id in upir.reasoning
            True
        """
        self.reasoning[node.id] = node
        self.updated_at = datetime.utcnow()
        return node.id

    def compute_overall_confidence(self) -> float:
        """
        Compute overall confidence using harmonic mean of leaf node confidences.

        Leaf nodes are reasoning nodes that are not referenced as parents by
        any other nodes - they represent final decisions or conclusions.

        The harmonic mean is more conservative than arithmetic or geometric mean,
        heavily penalizing low confidences. Formula: n / sum(1/c_i)

        Returns:
            Overall confidence in [0, 1], or 0.0 if no leaf nodes

        Example:
            >>> upir = UPIR(id="upir-1", name="test", description="test")
            >>> node1 = ReasoningNode("n1", "decision1", "rationale", confidence=0.8)
            >>> node2 = ReasoningNode("n2", "decision2", "rationale", confidence=0.9)
            >>> upir.add_reasoning(node1)
            'n1'
            >>> upir.add_reasoning(node2)
            'n2'
            >>> conf = upir.compute_overall_confidence()
            >>> 0.84 < conf < 0.85  # Harmonic mean of 0.8 and 0.9
            True

        References:
        - Harmonic mean: More conservative aggregation for confidences
        - Leaf nodes: Final conclusions in reasoning DAG
        """
        if not self.reasoning:
            return 0.0

        # Find all nodes referenced as parents
        referenced_nodes: Set[str] = set()
        for node in self.reasoning.values():
            referenced_nodes.update(node.parent_ids)

        # Leaf nodes are those NOT referenced as parents
        leaf_nodes = [
            node for node_id, node in self.reasoning.items()
            if node_id not in referenced_nodes
        ]

        if not leaf_nodes:
            return 0.0

        # A zero confidence makes the harmonic mean 0 and would divide by zero below
        confidences = [node.confidence for node in leaf_nodes]
        if any(c == 0.0 for c in confidences):
            return 0.0

        # Compute harmonic mean: n / sum(1/c_i)
        n = len(confidences)
        reciprocal_sum = sum(1.0 / c for c in confidences)
        harmonic_mean = n / reciprocal_sum

        return harmonic_mean

    def validate(self) -> bool:
        """
        Validate the UPIR instance for consistency.

        Performs multiple validation checks:
        1. Specification validation (if present)
        2. Reasoning DAG cycle detection
        3. Evidence reference validation (all referenced evidence exists)

        Returns:
            True if all validations pass

        Raises:
            ValueError: If any validation check fails

        Example:
            >>> upir = UPIR(
            ...     id="upir-1",
            ...     name="test",
            ...     description="test",
            ...     specification=FormalSpecification()
            ... )
            >>> upir.validate()
            True

        References:
        - Cycle detection: DFS with recursion stack
        - Standard graph algorithm for DAG validation
        """
        # Validate specification if present
        if self.specification is not None:
            self.specification.validate()

        # Check for cycles in reasoning DAG
        self._check_dag_cycles()

        # Validate evidence references
        for node in self.reasoning.values():
            for evidence_id in node.evidence_ids:
                if evidence_id not in self.evidence:
                    raise ValueError(
                        f"Reasoning node '{node.id}' references non-existent "
                        f"evidence '{evidence_id}'"
                    )

        return True

    def _check_dag_cycles(self) -> None:
        """
        Check for cycles in the reasoning DAG using DFS.

        Uses depth-first search with a recursion stack to detect cycles.
        If a node is encountered that's already in the recursion stack,
        a cycle exists.

        Raises:
            ValueError: If a cycle is detected

        Algorithm:
        - visited: Set of all nodes the DFS has reached (finished or in progress)
        - rec_stack: Set of nodes in current DFS path (recursion stack)
        - For each unvisited node, do DFS
        - If we visit a node already in rec_stack -> cycle

        References:
        - Standard DFS cycle detection for directed graphs
        - Cormen et al., "Introduction to Algorithms"
        """
        visited: Set[str] = set()
        rec_stack: Set[str] = set()

        def dfs(node_id: str) -> None:
            """DFS helper to detect cycles."""
            visited.add(node_id)
            rec_stack.add(node_id)

            # Visit all parent nodes
            if node_id in self.reasoning:
                node = self.reasoning[node_id]
                for parent_id in node.parent_ids:
                    if parent_id not in visited:
                        # Recursively visit parent
                        dfs(parent_id)
                    elif parent_id in rec_stack:
                        # Found a back edge -> cycle
                        raise ValueError(
                            f"Cycle detected in reasoning graph involving "
                            f"node '{parent_id}'"
                        )

            # Remove from recursion stack when done
            rec_stack.remove(node_id)

        # Check all nodes
        for node_id in self.reasoning.keys():
            if node_id not in visited:
                dfs(node_id)

    def generate_signature(self) -> str:
        """
        Generate cryptographic signature (SHA-256) of this UPIR.

        Creates a deterministic hash of the UPIR content: id, name, description,
        specification, architecture, evidence, reasoning, and metadata. The
        created_at and updated_at timestamps are not hashed. Useful for
        integrity verification and change detection.

        Returns:
            Hexadecimal SHA-256 hash string

        Example:
            >>> upir = UPIR(id="upir-1", name="test", description="test")
            >>> sig1 = upir.generate_signature()
            >>> sig2 = upir.generate_signature()
            >>> sig1 == sig2  # Deterministic
            True
            >>> len(sig1) == 64  # SHA-256
            True

        References:
        - SHA-256: Industry standard cryptographic hash
        - Python hashlib: https://docs.python.org/3/library/hashlib.html
        """
        # Convert to dict representation
        upir_dict = {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "specification": (
                self.specification.to_dict() if self.specification else None
            ),
            "architecture": (
                self.architecture.to_dict() if self.architecture else None
            ),
            "evidence": {
                eid: ev.to_dict() for eid, ev in sorted(self.evidence.items())
            },
            "reasoning": {
                nid: node.to_dict() for nid, node in sorted(self.reasoning.items())
            },
            "metadata": dict(sorted(self.metadata.items()))
        }

        # Create deterministic JSON
        json_str = json.dumps(upir_dict, sort_keys=True)

        # Compute SHA-256
        hash_obj = hashlib.sha256(json_str.encode('utf-8'))
        return hash_obj.hexdigest()

    def to_json(self) -> str:
        """
        Serialize UPIR to JSON string.

        Returns:
            JSON string representation

        Example:
            >>> upir = UPIR(id="upir-1", name="test", description="desc")
            >>> json_str = upir.to_json()
            >>> "upir-1" in json_str
            True
        """
        upir_dict = {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "specification": (
                self.specification.to_dict() if self.specification else None
            ),
            "architecture": (
                self.architecture.to_dict() if self.architecture else None
            ),
            "evidence": {
                eid: ev.to_dict() for eid, ev in self.evidence.items()
            },
            "reasoning": {
                nid: node.to_dict() for nid, node in self.reasoning.items()
            },
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }
        return json.dumps(upir_dict, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> "UPIR":
        """
        Deserialize UPIR from JSON string.

        Args:
            json_str: JSON string representation

        Returns:
            UPIR instance

        Example:
            >>> original = UPIR(id="upir-1", name="test", description="desc")
            >>> json_str = original.to_json()
            >>> restored = UPIR.from_json(json_str)
            >>> restored.id == original.id
            True
        """
        data = json.loads(json_str)

        return cls(
            id=data["id"],
            name=data["name"],
            description=data["description"],
            specification=(
                FormalSpecification.from_dict(data["specification"])
                if data.get("specification") else None
            ),
            architecture=(
                Architecture.from_dict(data["architecture"])
                if data.get("architecture") else None
            ),
            evidence={
                eid: Evidence.from_dict(ev_data)
                for eid, ev_data in data.get("evidence", {}).items()
            },
            reasoning={
                nid: ReasoningNode.from_dict(node_data)
                for nid, node_data in data.get("reasoning", {}).items()
            },
            metadata=data.get("metadata", {}),
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"])
        )

    def __str__(self) -> str:
        """Human-readable string representation."""
        parts = [f"'{self.name}'"]
        if self.specification:
            parts.append("with spec")
        if self.architecture:
            parts.append("with arch")
        if self.evidence:
            parts.append(f"{len(self.evidence)} evidence")
        if self.reasoning:
            parts.append(f"{len(self.reasoning)} reasoning nodes")

        return f"UPIR({', '.join(parts)})"

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"UPIR(id='{self.id}', name='{self.name}', "
            f"spec={self.specification is not None}, "
            f"arch={self.architecture is not None}, "
            f"evidence={len(self.evidence)}, "
            f"reasoning={len(self.reasoning)})"
        )
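
The leaf-node harmonic mean described in compute_overall_confidence can be tried in isolation. The following is a minimal sketch, not the library's code: `Node` is a hypothetical stand-in for ReasoningNode that carries only the `id`, `confidence`, and `parent_ids` fields the method reads, and `overall_confidence` is an illustrative name.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Node:
    """Hypothetical stand-in for ReasoningNode (only the fields used here)."""
    id: str
    confidence: float
    parent_ids: List[str] = field(default_factory=list)


def overall_confidence(nodes: Dict[str, Node]) -> float:
    """Harmonic mean of leaf confidences; 0.0 if there are no leaves."""
    # A node is a leaf when no other node lists it as a parent.
    referenced: Set[str] = set()
    for node in nodes.values():
        referenced.update(node.parent_ids)
    leaves = [n.confidence for nid, n in nodes.items() if nid not in referenced]

    # A zero confidence drives the harmonic mean to 0 and would divide by zero.
    if not leaves or any(c == 0.0 for c in leaves):
        return 0.0
    return len(leaves) / sum(1.0 / c for c in leaves)


nodes = {
    "root": Node("root", 0.5),
    "a": Node("a", 0.8, parent_ids=["root"]),
    "b": Node("b", 0.9, parent_ids=["root"]),
}
conf = overall_confidence(nodes)  # only "a" and "b" are leaves
print(round(conf, 4))  # prints 0.8471
```

Note that "root" does not contribute: its confidence of 0.5 is ignored because two other nodes name it as a parent. Only the final conclusions are aggregated.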

Functions

__init__(specification=None, architecture=None, id=<new UUID string>, name='', description='', evidence={}, reasoning={}, metadata={}, created_at=<current UTC time>, updated_at=<current UTC time>)

validate()

Validate the UPIR instance for consistency.

Performs multiple validation checks:

  1. Specification validation (if present)
  2. Reasoning DAG cycle detection
  3. Evidence reference validation (all referenced evidence exists)

Returns:

Type  Description
bool  True if all validations pass

Raises:

Type        Description
ValueError  If any validation check fails

Example

>>> upir = UPIR(
...     id="upir-1",
...     name="test",
...     description="test",
...     specification=FormalSpecification()
... )
>>> upir.validate()
True

References:

  • Cycle detection: DFS with recursion stack
  • Standard graph algorithm for DAG validation

Source code in upir/core/upir.py
def validate(self) -> bool:
    """
    Validate the UPIR instance for consistency.

    Performs multiple validation checks:
    1. Specification validation (if present)
    2. Reasoning DAG cycle detection
    3. Evidence reference validation (all referenced evidence exists)

    Returns:
        True if all validations pass

    Raises:
        ValueError: If any validation check fails

    Example:
        >>> upir = UPIR(
        ...     id="upir-1",
        ...     name="test",
        ...     description="test",
        ...     specification=FormalSpecification()
        ... )
        >>> upir.validate()
        True

    References:
    - Cycle detection: DFS with recursion stack
    - Standard graph algorithm for DAG validation
    """
    # Validate specification if present
    if self.specification is not None:
        self.specification.validate()

    # Check for cycles in reasoning DAG
    self._check_dag_cycles()

    # Validate evidence references
    for node in self.reasoning.values():
        for evidence_id in node.evidence_ids:
            if evidence_id not in self.evidence:
                raise ValueError(
                    f"Reasoning node '{node.id}' references non-existent "
                    f"evidence '{evidence_id}'"
                )

    return True
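
The cycle check that validate() delegates to can be illustrated without the library. The following is a minimal sketch under stated assumptions: the reasoning graph is reduced to a plain dict mapping each node ID to its list of parent IDs, and `check_cycles` is a hypothetical name rather than part of the UPIR API.

```python
from typing import Dict, List, Set


def check_cycles(parents: Dict[str, List[str]]) -> None:
    """Raise ValueError if the child -> parent edges contain a cycle."""
    visited: Set[str] = set()   # every node the DFS has reached
    on_path: Set[str] = set()   # nodes on the current DFS path

    def dfs(node_id: str) -> None:
        visited.add(node_id)
        on_path.add(node_id)
        # IDs missing from the dict are treated as having no parents.
        for parent_id in parents.get(node_id, []):
            if parent_id not in visited:
                dfs(parent_id)
            elif parent_id in on_path:
                # Reaching a node already on the current path is a back edge.
                raise ValueError(f"cycle involving node '{parent_id}'")
        on_path.remove(node_id)

    for node_id in parents:
        if node_id not in visited:
            dfs(node_id)


check_cycles({"a": [], "b": ["a"], "c": ["a", "b"]})  # a DAG: no error

try:
    check_cycles({"a": ["c"], "b": ["a"], "c": ["b"]})  # a -> c -> b -> a
    cycle_found = False
except ValueError:
    cycle_found = True
```

Because the search recurses once per node along a path, a very deep parent chain could hit Python's recursion limit. An explicit stack would avoid that, at the cost of slightly more bookkeeping.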

to_json()

Serialize UPIR to JSON string.

Returns:

Type  Description
str   JSON string representation

Example

>>> upir = UPIR(id="upir-1", name="test", description="desc")
>>> json_str = upir.to_json()
>>> "upir-1" in json_str
True

Source code in upir/core/upir.py
def to_json(self) -> str:
    """
    Serialize UPIR to JSON string.

    Returns:
        JSON string representation

    Example:
        >>> upir = UPIR(id="upir-1", name="test", description="desc")
        >>> json_str = upir.to_json()
        >>> "upir-1" in json_str
        True
    """
    upir_dict = {
        "id": self.id,
        "name": self.name,
        "description": self.description,
        "specification": (
            self.specification.to_dict() if self.specification else None
        ),
        "architecture": (
            self.architecture.to_dict() if self.architecture else None
        ),
        "evidence": {
            eid: ev.to_dict() for eid, ev in self.evidence.items()
        },
        "reasoning": {
            nid: node.to_dict() for nid, node in self.reasoning.items()
        },
        "metadata": self.metadata,
        "created_at": self.created_at.isoformat(),
        "updated_at": self.updated_at.isoformat()
    }
    return json.dumps(upir_dict, indent=2)
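
to_json() keeps dictionary insertion order and includes both timestamps. In the class source, generate_signature() instead serializes with sorted keys and leaves the timestamps out. The following is a minimal sketch of why that matters for a content hash. It uses only the standard library; `signature` is a hypothetical helper, and the dictionaries are made-up sample data.

```python
import hashlib
import json
from datetime import datetime, timezone


def signature(state: dict) -> str:
    """SHA-256 over a canonical (key-sorted) JSON encoding of `state`."""
    canonical = json.dumps(state, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same content, different key insertion order (also inside the nested dict).
a = {"id": "upir-1", "name": "test", "metadata": {"owner": "x", "tag": "y"}}
b = {"metadata": {"tag": "y", "owner": "x"}, "name": "test", "id": "upir-1"}

same = signature(a) == signature(b)  # key order does not affect the digest

# Adding a timestamp to the hashed state changes the digest, which is one
# reason a content hash would leave created_at/updated_at out.
stamped = dict(a, updated_at=datetime.now(timezone.utc).isoformat())
differs = signature(stamped) != signature(a)
```

A practical consequence is that the digest stays stable across a to_json()/from_json() round trip only as long as every nested to_dict() output is itself JSON-serializable and deterministic.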

from_json(json_str) classmethod

Deserialize UPIR from JSON string.

Parameters:

Name      Type  Description                 Default
json_str  str   JSON string representation  required

Returns:

Type  Description
UPIR  UPIR instance

Example

>>> original = UPIR(id="upir-1", name="test", description="desc")
>>> json_str = original.to_json()
>>> restored = UPIR.from_json(json_str)
>>> restored.id == original.id
True

Source code in upir/core/upir.py
@classmethod
def from_json(cls, json_str: str) -> "UPIR":
    """
    Deserialize UPIR from JSON string.

    Args:
        json_str: JSON string representation

    Returns:
        UPIR instance

    Example:
        >>> original = UPIR(id="upir-1", name="test", description="desc")
        >>> json_str = original.to_json()
        >>> restored = UPIR.from_json(json_str)
        >>> restored.id == original.id
        True
    """
    data = json.loads(json_str)

    return cls(
        id=data["id"],
        name=data["name"],
        description=data["description"],
        specification=(
            FormalSpecification.from_dict(data["specification"])
            if data.get("specification") else None
        ),
        architecture=(
            Architecture.from_dict(data["architecture"])
            if data.get("architecture") else None
        ),
        evidence={
            eid: Evidence.from_dict(ev_data)
            for eid, ev_data in data.get("evidence", {}).items()
        },
        reasoning={
            nid: ReasoningNode.from_dict(node_data)
            for nid, node_data in data.get("reasoning", {}).items()
        },
        metadata=data.get("metadata", {}),
        created_at=datetime.fromisoformat(data["created_at"]),
        updated_at=datetime.fromisoformat(data["updated_at"])
    )
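
from_json() rebuilds created_at and updated_at with datetime.fromisoformat, the inverse of the isoformat() call in to_json(). The following is a minimal sketch of that timestamp round trip using only the standard library. The fixed sample datetime and the variable names are illustrative assumptions.

```python
import json
from datetime import datetime, timezone

# Naive datetime (no tzinfo), which is also what datetime.utcnow() returns.
naive = datetime(2024, 1, 2, 3, 4, 5, 678900)
# The same instant with an explicit UTC offset attached.
aware = naive.replace(tzinfo=timezone.utc)

payload = json.dumps({
    "created_at": naive.isoformat(),
    "updated_at": aware.isoformat(),
})
data = json.loads(payload)

restored_naive = datetime.fromisoformat(data["created_at"])
restored_aware = datetime.fromisoformat(data["updated_at"])

# Both values survive unchanged; the naive one stays naive, so the "UTC"
# meaning of such a timestamp is a convention, not something the JSON records.
round_trip_ok = restored_naive == naive and restored_aware == aware
```

Note also that from_json() indexes "id", "name", "description", "created_at", and "updated_at" directly, so a payload missing any of them raises KeyError, while the other fields fall back to None or an empty dict.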

Usage Example

from upir import UPIR, FormalSpecification, Architecture
from upir.core.temporal import TemporalProperty, TemporalOperator

# Create specification
spec = FormalSpecification(
    properties=[
        TemporalProperty(
            operator=TemporalOperator.ALWAYS,
            predicate="data_consistent"
        )
    ],
    constraints={"latency_p99": {"max": 100.0}}
)

# Create architecture
arch = Architecture(
    components=[
        {"id": "api", "type": "api_gateway", "latency_ms": 10}
    ],
    connections=[]
)

# Create UPIR instance
upir = UPIR(specification=spec, architecture=arch)

# Validate
is_valid = upir.validate()

# Serialize
upir_json = upir.to_json()

See Also