RL Optimizer

Reinforcement learning-based architecture optimization.


Overview

The ArchitectureLearner uses PPO (Proximal Policy Optimization) to optimize architectures from production metrics.


Class Documentation

upir.learning.learner.ArchitectureLearner

Learn optimal architectures using PPO reinforcement learning.

The learner encodes UPIR architectures as state vectors, uses PPO to select architectural modifications (actions), and learns from performance metrics (rewards) to optimize the architecture over time.

State encoding extracts features such as component count, latency, and throughput. Actions represent architectural changes (adjusting parallelism, changing component types, etc.). Rewards combine constraint satisfaction with performance improvements.

Attributes:

Name               Type               Description
state_dim          int                Fixed dimension of state vectors
action_dim         int                Number of possible actions
ppo                PPO                PPO agent for policy learning
experience_buffer  Deque[Experience]  Replay buffer for experiences
config             PPOConfig          PPO configuration

Example

>>> learner = ArchitectureLearner(state_dim=64, action_dim=40)
>>> # After deploying architecture and collecting metrics:
>>> optimized_upir = learner.learn_from_metrics(upir, metrics)

References:

- PPO: Policy gradient method with clipped objective
- TD Commons: Architecture optimization approach
- OpenAI Spinning Up: RL best practices
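
A minimal sketch of how the learner can sit in a deploy-measure-learn loop. Here deploy_and_measure is a hypothetical helper that deploys a UPIR and returns a metrics dict; only ArchitectureLearner and learn_from_metrics come from the library, and the loop length is arbitrary.

from upir.learning.learner import ArchitectureLearner

learner = ArchitectureLearner(state_dim=64, action_dim=40)

current = upir          # an existing UPIR with an architecture attached
previous_metrics = None
for step in range(10):
    # hypothetical helper: returns e.g. {"latency_p99": ..., "throughput_qps": ..., "error_rate": ...}
    metrics = deploy_and_measure(current)
    current = learner.learn_from_metrics(current, metrics, previous_metrics)
    previous_metrics = metrics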

Source code in upir/learning/learner.py
class ArchitectureLearner:
    """
    Learn optimal architectures using PPO reinforcement learning.

    The learner encodes UPIR architectures as state vectors, uses PPO to
    select architectural modifications (actions), and learns from performance
    metrics (rewards) to optimize the architecture over time.

    State encoding extracts features like component count, latency, throughput.
    Actions represent architectural changes (adjust parallelism, change types, etc.).
    Rewards combine constraint satisfaction and performance improvements.

    Attributes:
        state_dim: Fixed dimension of state vectors
        action_dim: Number of possible actions
        ppo: PPO agent for policy learning
        experience_buffer: Replay buffer for experiences
        config: PPO configuration

    Example:
        >>> learner = ArchitectureLearner(state_dim=64, action_dim=40)
        >>> # After deploying architecture and collecting metrics:
        >>> optimized_upir = learner.learn_from_metrics(upir, metrics)

    References:
    - PPO: Policy gradient method with clipped objective
    - TD Commons: Architecture optimization approach
    - OpenAI Spinning Up: RL best practices
    """

    def __init__(
        self,
        state_dim: int = 64,
        action_dim: int = 40,
        config: PPOConfig = None,
        buffer_size: int = 1000
    ):
        """
        Initialize architecture learner.

        Args:
            state_dim: Dimension of state encoding (default 64)
            action_dim: Number of possible actions (default 40)
            config: PPO configuration (uses defaults if None)
            buffer_size: Maximum size of experience buffer
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.config = config or PPOConfig()

        # Initialize PPO agent
        self.ppo = PPO(state_dim=state_dim, action_dim=action_dim, config=self.config)

        # Experience buffer for training
        self.experience_buffer: Deque[Experience] = deque(maxlen=buffer_size)

        # Feature normalization stats (updated online)
        self.feature_stats = {
            "num_components_max": 100.0,
            "num_connections_max": 200.0,
            "avg_latency_max": 10000.0,  # 10 seconds in ms
            "total_throughput_max": 100000.0,  # 100k QPS
            "complexity_max": 1000.0,
        }

        logger.info(
            f"Initialized ArchitectureLearner: state_dim={state_dim}, "
            f"action_dim={action_dim}, buffer_size={buffer_size}"
        )

    def encode_state(self, upir: UPIR) -> np.ndarray:
        """
        Encode UPIR architecture as fixed-size state vector.

        Extracts architectural features and normalizes to [0, 1] range:
        - Number of components
        - Number of connections
        - Average component latency
        - Total throughput capacity
        - Deployment complexity score

        The state is padded to fixed size (state_dim) for consistent input.

        Args:
            upir: UPIR to encode

        Returns:
            State vector (state_dim,) with values in [0, 1]

        Example:
            >>> learner = ArchitectureLearner()
            >>> upir = UPIR(id="test", name="Test", description="Test")
            >>> state = learner.encode_state(upir)
            >>> state.shape
            (64,)
            >>> assert np.all((state >= 0) & (state <= 1))
        """
        if upir.architecture is None:
            # No architecture - return zero state
            return np.zeros(self.state_dim)

        arch = upir.architecture

        # Extract basic features
        num_components = len(arch.components)
        num_connections = len(arch.connections)

        # Compute average latency (if components have latency info)
        total_latency = 0.0
        latency_count = 0
        for comp in arch.components:
            if isinstance(comp, dict) and "latency_ms" in comp:
                total_latency += comp["latency_ms"]
                latency_count += 1
        avg_latency = total_latency / latency_count if latency_count > 0 else 0.0

        # Compute total throughput (if components have throughput info)
        total_throughput = 0.0
        for comp in arch.components:
            if isinstance(comp, dict) and "throughput_qps" in comp:
                total_throughput += comp["throughput_qps"]

        # Compute deployment complexity (simple heuristic)
        # Based on number of components and connections
        complexity = num_components * 10 + num_connections * 5

        # Normalize features to [0, 1]
        features = np.array([
            num_components / self.feature_stats["num_components_max"],
            num_connections / self.feature_stats["num_connections_max"],
            avg_latency / self.feature_stats["avg_latency_max"],
            total_throughput / self.feature_stats["total_throughput_max"],
            complexity / self.feature_stats["complexity_max"],
        ])

        # Clip to [0, 1] range
        features = np.clip(features, 0.0, 1.0)

        # Add per-component features (latency, throughput, parallelism)
        component_features = []
        for comp in arch.components[:10]:  # Max 10 components
            if isinstance(comp, dict):
                comp_latency = comp.get("latency_ms", 0.0)
                comp_throughput = comp.get("throughput_qps", 0.0)
                comp_parallelism = comp.get("parallelism", 1)

                component_features.extend([
                    comp_latency / self.feature_stats["avg_latency_max"],
                    comp_throughput / self.feature_stats["total_throughput_max"],
                    comp_parallelism / 100.0,  # Normalize to [0, 1] assuming max 100
                ])

        # Pad to fixed size
        all_features = np.concatenate([features, component_features])

        # Clip all features to [0, 1] range
        all_features = np.clip(all_features, 0.0, 1.0)
        if len(all_features) < self.state_dim:
            # Pad with zeros
            all_features = np.pad(
                all_features,
                (0, self.state_dim - len(all_features)),
                mode="constant"
            )
        else:
            # Truncate if too long
            all_features = all_features[:self.state_dim]

        return all_features.astype(np.float32)

    def decode_action(self, action: int, upir: UPIR) -> UPIR:
        """
        Decode action and apply architectural modification.

        Actions represent different architectural changes:
        - 0-9: Increase parallelism of component i
        - 10-19: Decrease parallelism of component i
        - 20-29: Change component type (e.g., batch -> streaming)
        - 30-39: Modify connection (add/remove)

        Args:
            action: Action index (0 to action_dim-1)
            upir: Current UPIR

        Returns:
            Modified UPIR with architectural change applied

        Example:
            >>> learner = ArchitectureLearner()
            >>> upir = UPIR(id="test", name="Test", description="Test")
            >>> modified = learner.decode_action(0, upir)
        """
        # Create copy to avoid modifying original
        modified_upir = copy.deepcopy(upir)

        if modified_upir.architecture is None:
            logger.warning("No architecture to modify")
            return modified_upir

        arch = modified_upir.architecture
        num_components = len(arch.components)

        if num_components == 0:
            logger.warning("No components to modify")
            return modified_upir

        # Decode action type and target
        if action < 10:
            # Increase parallelism of component (action % num_components)
            component_idx = action % num_components
            comp = arch.components[component_idx]
            current_parallelism = comp.get("parallelism", 1)
            new_parallelism = min(current_parallelism + 1, 100)
            comp["parallelism"] = new_parallelism
            logger.debug(
                f"Action {action}: Increase parallelism of {comp.get('name', f'comp_{component_idx}')} "
                f"from {current_parallelism} to {new_parallelism}"
            )

        elif action < 20:
            # Decrease parallelism of component
            component_idx = (action - 10) % num_components
            comp = arch.components[component_idx]
            current_parallelism = comp.get("parallelism", 1)
            new_parallelism = max(current_parallelism - 1, 1)
            comp["parallelism"] = new_parallelism
            logger.debug(
                f"Action {action}: Decrease parallelism of {comp.get('name', f'comp_{component_idx}')} "
                f"from {current_parallelism} to {new_parallelism}"
            )

        elif action < 30:
            # Change component type (simplified: toggle between batch/streaming)
            component_idx = (action - 20) % num_components
            comp = arch.components[component_idx]
            current_type = comp.get("type", "processor")
            if "streaming" in current_type.lower():
                new_type = "batch_processor"
            else:
                new_type = "streaming_processor"
            comp["type"] = new_type
            logger.debug(
                f"Action {action}: Change type of {comp.get('name', f'comp_{component_idx}')} "
                f"from {current_type} to {new_type}"
            )

        else:
            # Modify connection (simplified: toggle connection property)
            if len(arch.connections) > 0:
                connection_idx = (action - 30) % len(arch.connections)
                conn = arch.connections[connection_idx]
                # Toggle "batched" property
                current_batched = conn.get("batched", False)
                conn["batched"] = not current_batched
                logger.debug(
                    f"Action {action}: Toggle batched for connection "
                    f"{conn.get('from', '?')}->{conn.get('to', '?')} to {not current_batched}"
                )

        return modified_upir

    def compute_reward(
        self,
        metrics: Dict[str, float],
        spec: FormalSpecification,
        previous_metrics: Dict[str, float] = None
    ) -> float:
        """
        Compute reward from performance metrics and specification.

        Reward structure:
        - Base reward: 1.0
        - Constraint satisfaction: +0.1 per met constraint, -0.5 per violation
        - Performance improvement: +delta/target for improvements

        Rewards are clipped to [-1, 1] range for stability.

        Args:
            metrics: Current performance metrics
                - latency_p99: 99th percentile latency (ms)
                - throughput_qps: Queries per second
                - error_rate: Error rate (0-1)
                - cost: Deployment cost
            spec: Formal specification with constraints
            previous_metrics: Previous metrics for computing deltas

        Returns:
            Reward in [-1, 1]

        Example:
            >>> learner = ArchitectureLearner()
            >>> metrics = {"latency_p99": 100, "throughput_qps": 1000}
            >>> spec = FormalSpecification()
            >>> reward = learner.compute_reward(metrics, spec)
        """
        reward = 1.0  # Base reward

        # Check constraint satisfaction
        constraints_met = 0
        constraints_violated = 0

        # Check temporal properties for latency constraints
        for prop in spec.properties + spec.invariants:
            if prop.time_bound is not None:
                # Latency constraint
                target_latency = prop.time_bound
                actual_latency = metrics.get("latency_p99", float("inf"))

                if actual_latency <= target_latency:
                    constraints_met += 1
                    reward += 0.1
                else:
                    constraints_violated += 1
                    reward -= 0.5

        # Check throughput requirements (heuristic from predicates)
        has_throughput_req = any(
            "throughput" in prop.predicate.lower()
            for prop in spec.properties + spec.invariants
        )
        if has_throughput_req:
            # Assume target throughput of 1000 QPS (could be extracted from spec)
            target_throughput = 1000.0
            actual_throughput = metrics.get("throughput_qps", 0.0)

            if actual_throughput >= target_throughput:
                constraints_met += 1
                reward += 0.1
            else:
                constraints_violated += 1
                reward -= 0.5

        # Reward performance improvements over previous metrics
        if previous_metrics is not None:
            # Latency reduction (lower is better)
            prev_latency = previous_metrics.get("latency_p99", 0.0)
            curr_latency = metrics.get("latency_p99", 0.0)
            if prev_latency > 0:
                latency_delta = (prev_latency - curr_latency) / prev_latency
                reward += latency_delta  # Positive if latency reduced

            # Throughput increase (higher is better)
            prev_throughput = previous_metrics.get("throughput_qps", 0.0)
            curr_throughput = metrics.get("throughput_qps", 0.0)
            if prev_throughput > 0:
                throughput_delta = (curr_throughput - prev_throughput) / prev_throughput
                reward += throughput_delta  # Positive if throughput increased

        # Penalty for high error rate
        error_rate = metrics.get("error_rate", 0.0)
        if error_rate > 0.01:  # More than 1% errors
            reward -= error_rate * 10  # Heavy penalty

        # Clip reward to [-1, 1]
        reward = np.clip(reward, -1.0, 1.0)

        logger.debug(
            f"Computed reward: {reward:.3f} "
            f"(constraints_met={constraints_met}, violated={constraints_violated})"
        )

        return reward

    def learn_from_metrics(
        self,
        upir: UPIR,
        metrics: Dict[str, float],
        previous_metrics: Dict[str, float] = None
    ) -> UPIR:
        """
        Learn from performance metrics and return optimized architecture.

        Main entry point for architecture optimization. This method:
        1. Encodes current architecture as state
        2. Selects action using PPO policy
        3. Decodes action to modify architecture
        4. Computes reward from metrics
        5. Stores experience for learning
        6. Updates PPO policy when buffer is full
        7. Returns optimized architecture

        Args:
            upir: Current UPIR
            metrics: Performance metrics from deployment
            previous_metrics: Previous metrics for delta computation

        Returns:
            Optimized UPIR with modified architecture

        Example:
            >>> learner = ArchitectureLearner()
            >>> upir = UPIR(id="test", name="Test", description="Test")
            >>> metrics = {"latency_p99": 100, "throughput_qps": 1000}
            >>> optimized = learner.learn_from_metrics(upir, metrics)

        References:
        - PPO: Policy gradient with experience replay
        - TD Commons: Continuous optimization loop
        """
        # 1. Encode current architecture as state
        state = self.encode_state(upir)

        # 2. Select action using PPO policy
        action, log_prob, value = self.ppo.select_action(state)

        # 3. Decode action to modify architecture
        optimized_upir = self.decode_action(action, upir)

        # 4. Compute reward from metrics
        reward = self.compute_reward(
            metrics,
            upir.specification if upir.specification else FormalSpecification(),
            previous_metrics
        )

        # 5. Store experience
        # Check if this is terminal (could be based on convergence criteria)
        done = False  # For now, never terminal (continuous learning)

        experience = Experience(
            state=state,
            action=action,
            reward=reward,
            log_prob=log_prob,
            value=value,
            done=done
        )
        self.experience_buffer.append(experience)

        # 6. Update policy if we have enough experiences
        if len(self.experience_buffer) >= self.config.batch_size:
            self._update_policy()

        logger.info(
            f"Learning step: action={action}, reward={reward:.3f}, "
            f"buffer_size={len(self.experience_buffer)}"
        )

        return optimized_upir

    def _update_policy(self):
        """
        Update PPO policy using experiences from buffer.

        Extracts states, actions, rewards from buffer, computes advantages
        using GAE, and performs PPO update.
        """
        # Extract experiences
        states = np.array([exp.state for exp in self.experience_buffer])
        actions = np.array([exp.action for exp in self.experience_buffer])
        old_log_probs = np.array([exp.log_prob for exp in self.experience_buffer])
        rewards = np.array([exp.reward for exp in self.experience_buffer])
        values = np.array([exp.value for exp in self.experience_buffer])
        dones = np.array([exp.done for exp in self.experience_buffer], dtype=np.float32)

        # Compute advantages using GAE
        advantages, returns = self.ppo.compute_gae(rewards, values, dones)

        # Update PPO policy
        metrics = self.ppo.update(states, actions, old_log_probs, returns, advantages)

        logger.info(
            f"Policy update: policy_loss={metrics['policy_loss']:.4f}, "
            f"value_loss={metrics['value_loss']:.4f}, "
            f"entropy={metrics['entropy']:.4f}"
        )

        # Clear buffer after update
        self.experience_buffer.clear()

    def __str__(self) -> str:
        """String representation."""
        return (
            f"ArchitectureLearner(state_dim={self.state_dim}, "
            f"action_dim={self.action_dim}, "
            f"buffer={len(self.experience_buffer)}/{self.experience_buffer.maxlen})"
        )

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"ArchitectureLearner(state_dim={self.state_dim}, "
            f"action_dim={self.action_dim}, "
            f"config={self.config})"
        )

Functions

__init__(state_dim=64, action_dim=40, config=None, buffer_size=1000)

Initialize architecture learner.

Parameters:

Name         Type       Description                                 Default
state_dim    int        Dimension of state encoding                 64
action_dim   int        Number of possible actions                  40
config       PPOConfig  PPO configuration (uses defaults if None)   None
buffer_size  int        Maximum size of experience buffer           1000
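
For example, construction with an explicit buffer size (the other arguments keep their defaults; PPOConfig's fields are not shown here):

from upir.learning.learner import ArchitectureLearner

# buffer_size caps the experience deque; policy updates still trigger once
# config.batch_size experiences have accumulated
learner = ArchitectureLearner(state_dim=64, action_dim=40, buffer_size=2000)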
Source code in upir/learning/learner.py
def __init__(
    self,
    state_dim: int = 64,
    action_dim: int = 40,
    config: PPOConfig = None,
    buffer_size: int = 1000
):
    """
    Initialize architecture learner.

    Args:
        state_dim: Dimension of state encoding (default 64)
        action_dim: Number of possible actions (default 40)
        config: PPO configuration (uses defaults if None)
        buffer_size: Maximum size of experience buffer
    """
    self.state_dim = state_dim
    self.action_dim = action_dim
    self.config = config or PPOConfig()

    # Initialize PPO agent
    self.ppo = PPO(state_dim=state_dim, action_dim=action_dim, config=self.config)

    # Experience buffer for training
    self.experience_buffer: Deque[Experience] = deque(maxlen=buffer_size)

    # Feature normalization stats (updated online)
    self.feature_stats = {
        "num_components_max": 100.0,
        "num_connections_max": 200.0,
        "avg_latency_max": 10000.0,  # 10 seconds in ms
        "total_throughput_max": 100000.0,  # 100k QPS
        "complexity_max": 1000.0,
    }

    logger.info(
        f"Initialized ArchitectureLearner: state_dim={state_dim}, "
        f"action_dim={action_dim}, buffer_size={buffer_size}"
    )

encode_state(upir)

Encode UPIR architecture as fixed-size state vector.

Extracts architectural features and normalizes them to the [0, 1] range:

- Number of components
- Number of connections
- Average component latency
- Total throughput capacity
- Deployment complexity score

The state is padded to fixed size (state_dim) for consistent input.

Parameters:

Name  Type  Description     Default
upir  UPIR  UPIR to encode  required

Returns:

Type     Description
ndarray  State vector (state_dim,) with values in [0, 1]

Example

>>> learner = ArchitectureLearner()
>>> upir = UPIR(id="test", name="Test", description="Test")
>>> state = learner.encode_state(upir)
>>> state.shape
(64,)
>>> assert np.all((state >= 0) & (state <= 1))
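
Reading the source below, the layout of the encoded vector works out as sketched here; the index boundaries assume the default state_dim=64 and components stored as dicts:

# Layout of the encoded state (default state_dim=64, k components, k capped at 10):
#   state[0:5]       global features: component count, connection count,
#                    average latency, total throughput, complexity (all in [0, 1])
#   state[5:5+3*k]   per-component features: latency_ms, throughput_qps, parallelism
#   remaining slots  zero padding up to state_dim (or truncation if 5+3*k > state_dim)
state = learner.encode_state(upir)
assert state.shape == (64,)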

Source code in upir/learning/learner.py
def encode_state(self, upir: UPIR) -> np.ndarray:
    """
    Encode UPIR architecture as fixed-size state vector.

    Extracts architectural features and normalizes to [0, 1] range:
    - Number of components
    - Number of connections
    - Average component latency
    - Total throughput capacity
    - Deployment complexity score

    The state is padded to fixed size (state_dim) for consistent input.

    Args:
        upir: UPIR to encode

    Returns:
        State vector (state_dim,) with values in [0, 1]

    Example:
        >>> learner = ArchitectureLearner()
        >>> upir = UPIR(id="test", name="Test", description="Test")
        >>> state = learner.encode_state(upir)
        >>> state.shape
        (64,)
        >>> assert np.all((state >= 0) & (state <= 1))
    """
    if upir.architecture is None:
        # No architecture - return zero state
        return np.zeros(self.state_dim)

    arch = upir.architecture

    # Extract basic features
    num_components = len(arch.components)
    num_connections = len(arch.connections)

    # Compute average latency (if components have latency info)
    total_latency = 0.0
    latency_count = 0
    for comp in arch.components:
        if isinstance(comp, dict) and "latency_ms" in comp:
            total_latency += comp["latency_ms"]
            latency_count += 1
    avg_latency = total_latency / latency_count if latency_count > 0 else 0.0

    # Compute total throughput (if components have throughput info)
    total_throughput = 0.0
    for comp in arch.components:
        if isinstance(comp, dict) and "throughput_qps" in comp:
            total_throughput += comp["throughput_qps"]

    # Compute deployment complexity (simple heuristic)
    # Based on number of components and connections
    complexity = num_components * 10 + num_connections * 5

    # Normalize features to [0, 1]
    features = np.array([
        num_components / self.feature_stats["num_components_max"],
        num_connections / self.feature_stats["num_connections_max"],
        avg_latency / self.feature_stats["avg_latency_max"],
        total_throughput / self.feature_stats["total_throughput_max"],
        complexity / self.feature_stats["complexity_max"],
    ])

    # Clip to [0, 1] range
    features = np.clip(features, 0.0, 1.0)

    # Add per-component features (latency, throughput, parallelism)
    component_features = []
    for comp in arch.components[:10]:  # Max 10 components
        if isinstance(comp, dict):
            comp_latency = comp.get("latency_ms", 0.0)
            comp_throughput = comp.get("throughput_qps", 0.0)
            comp_parallelism = comp.get("parallelism", 1)

            component_features.extend([
                comp_latency / self.feature_stats["avg_latency_max"],
                comp_throughput / self.feature_stats["total_throughput_max"],
                comp_parallelism / 100.0,  # Normalize to [0, 1] assuming max 100
            ])

    # Pad to fixed size
    all_features = np.concatenate([features, component_features])

    # Clip all features to [0, 1] range
    all_features = np.clip(all_features, 0.0, 1.0)
    if len(all_features) < self.state_dim:
        # Pad with zeros
        all_features = np.pad(
            all_features,
            (0, self.state_dim - len(all_features)),
            mode="constant"
        )
    else:
        # Truncate if too long
        all_features = all_features[:self.state_dim]

    return all_features.astype(np.float32)

decode_action(action, upir)

Decode action and apply architectural modification.

Actions represent different architectural changes:

- 0-9: Increase parallelism of component i
- 10-19: Decrease parallelism of component i
- 20-29: Change component type (e.g., batch -> streaming)
- 30-39: Modify connection (add/remove)

Parameters:

Name    Type  Description                       Default
action  int   Action index (0 to action_dim-1)  required
upir    UPIR  Current UPIR                      required

Returns:

Type  Description
UPIR  Modified UPIR with architectural change applied

Example

>>> learner = ArchitectureLearner()
>>> upir = UPIR(id="test", name="Test", description="Test")
>>> modified = learner.decode_action(0, upir)
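
As a concrete reading of the index arithmetic in the source below, assume an architecture with 3 components and 2 connections:

# How a few action indices decode with 3 components and 2 connections:
#   action 7  -> increase parallelism of component 7 % 3 == 1
#   action 12 -> decrease parallelism of component (12 - 10) % 3 == 2
#   action 25 -> toggle batch/streaming type of component (25 - 20) % 3 == 2
#   action 33 -> toggle "batched" on connection (33 - 30) % 2 == 1
modified = learner.decode_action(7, upir)   # returns a deep copy; upir is unchanged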

Source code in upir/learning/learner.py
def decode_action(self, action: int, upir: UPIR) -> UPIR:
    """
    Decode action and apply architectural modification.

    Actions represent different architectural changes:
    - 0-9: Increase parallelism of component i
    - 10-19: Decrease parallelism of component i
    - 20-29: Change component type (e.g., batch -> streaming)
    - 30-39: Modify connection (add/remove)

    Args:
        action: Action index (0 to action_dim-1)
        upir: Current UPIR

    Returns:
        Modified UPIR with architectural change applied

    Example:
        >>> learner = ArchitectureLearner()
        >>> upir = UPIR(id="test", name="Test", description="Test")
        >>> modified = learner.decode_action(0, upir)
    """
    # Create copy to avoid modifying original
    modified_upir = copy.deepcopy(upir)

    if modified_upir.architecture is None:
        logger.warning("No architecture to modify")
        return modified_upir

    arch = modified_upir.architecture
    num_components = len(arch.components)

    if num_components == 0:
        logger.warning("No components to modify")
        return modified_upir

    # Decode action type and target
    if action < 10:
        # Increase parallelism of component (action % num_components)
        component_idx = action % num_components
        comp = arch.components[component_idx]
        current_parallelism = comp.get("parallelism", 1)
        new_parallelism = min(current_parallelism + 1, 100)
        comp["parallelism"] = new_parallelism
        logger.debug(
            f"Action {action}: Increase parallelism of {comp.get('name', f'comp_{component_idx}')} "
            f"from {current_parallelism} to {new_parallelism}"
        )

    elif action < 20:
        # Decrease parallelism of component
        component_idx = (action - 10) % num_components
        comp = arch.components[component_idx]
        current_parallelism = comp.get("parallelism", 1)
        new_parallelism = max(current_parallelism - 1, 1)
        comp["parallelism"] = new_parallelism
        logger.debug(
            f"Action {action}: Decrease parallelism of {comp.get('name', f'comp_{component_idx}')} "
            f"from {current_parallelism} to {new_parallelism}"
        )

    elif action < 30:
        # Change component type (simplified: toggle between batch/streaming)
        component_idx = (action - 20) % num_components
        comp = arch.components[component_idx]
        current_type = comp.get("type", "processor")
        if "streaming" in current_type.lower():
            new_type = "batch_processor"
        else:
            new_type = "streaming_processor"
        comp["type"] = new_type
        logger.debug(
            f"Action {action}: Change type of {comp.get('name', f'comp_{component_idx}')} "
            f"from {current_type} to {new_type}"
        )

    else:
        # Modify connection (simplified: toggle connection property)
        if len(arch.connections) > 0:
            connection_idx = (action - 30) % len(arch.connections)
            conn = arch.connections[connection_idx]
            # Toggle "batched" property
            current_batched = conn.get("batched", False)
            conn["batched"] = not current_batched
            logger.debug(
                f"Action {action}: Toggle batched for connection "
                f"{conn.get('from', '?')}->{conn.get('to', '?')} to {not current_batched}"
            )

    return modified_upir

compute_reward(metrics, spec, previous_metrics=None)

Compute reward from performance metrics and specification.

Reward structure:

- Base reward: 1.0
- Constraint satisfaction: +0.1 per met constraint, -0.5 per violation
- Performance improvement: +delta/target for improvements

Rewards are clipped to [-1, 1] range for stability.

Parameters:

Name              Type                 Description                              Default
metrics           Dict[str, float]     Current performance metrics (see below)  required
spec              FormalSpecification  Formal specification with constraints    required
previous_metrics  Dict[str, float]     Previous metrics for computing deltas    None

Expected keys in metrics:

- latency_p99: 99th percentile latency (ms)
- throughput_qps: Queries per second
- error_rate: Error rate (0-1)
- cost: Deployment cost

Returns:

Type   Description
float  Reward in [-1, 1]

Example

>>> learner = ArchitectureLearner()
>>> metrics = {"latency_p99": 100, "throughput_qps": 1000}
>>> spec = FormalSpecification()
>>> reward = learner.compute_reward(metrics, spec)
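
To make the arithmetic concrete, here is a worked example under assumed inputs: the spec carries one latency bound of 100 ms and one throughput predicate, the previous p99 was 125 ms, and the new metrics are {"latency_p99": 110, "throughput_qps": 800, "error_rate": 0.05}:

# Worked arithmetic (pure Python, mirroring the source below):
reward = 1.0                       # base reward
reward -= 0.5                      # latency 110 ms exceeds the 100 ms bound
reward -= 0.5                      # throughput 800 QPS below the 1000 QPS target
reward += (125 - 110) / 125        # latency improved by 12% vs previous metrics
reward -= 0.05 * 10                # error rate 0.05 exceeds the 0.01 threshold
reward = max(-1.0, min(1.0, reward))
print(round(reward, 2))            # -> -0.38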

Source code in upir/learning/learner.py
def compute_reward(
    self,
    metrics: Dict[str, float],
    spec: FormalSpecification,
    previous_metrics: Dict[str, float] = None
) -> float:
    """
    Compute reward from performance metrics and specification.

    Reward structure:
    - Base reward: 1.0
    - Constraint satisfaction: +0.1 per met constraint, -0.5 per violation
    - Performance improvement: +delta/target for improvements

    Rewards are clipped to [-1, 1] range for stability.

    Args:
        metrics: Current performance metrics
            - latency_p99: 99th percentile latency (ms)
            - throughput_qps: Queries per second
            - error_rate: Error rate (0-1)
            - cost: Deployment cost
        spec: Formal specification with constraints
        previous_metrics: Previous metrics for computing deltas

    Returns:
        Reward in [-1, 1]

    Example:
        >>> learner = ArchitectureLearner()
        >>> metrics = {"latency_p99": 100, "throughput_qps": 1000}
        >>> spec = FormalSpecification()
        >>> reward = learner.compute_reward(metrics, spec)
    """
    reward = 1.0  # Base reward

    # Check constraint satisfaction
    constraints_met = 0
    constraints_violated = 0

    # Check temporal properties for latency constraints
    for prop in spec.properties + spec.invariants:
        if prop.time_bound is not None:
            # Latency constraint
            target_latency = prop.time_bound
            actual_latency = metrics.get("latency_p99", float("inf"))

            if actual_latency <= target_latency:
                constraints_met += 1
                reward += 0.1
            else:
                constraints_violated += 1
                reward -= 0.5

    # Check throughput requirements (heuristic from predicates)
    has_throughput_req = any(
        "throughput" in prop.predicate.lower()
        for prop in spec.properties + spec.invariants
    )
    if has_throughput_req:
        # Assume target throughput of 1000 QPS (could be extracted from spec)
        target_throughput = 1000.0
        actual_throughput = metrics.get("throughput_qps", 0.0)

        if actual_throughput >= target_throughput:
            constraints_met += 1
            reward += 0.1
        else:
            constraints_violated += 1
            reward -= 0.5

    # Reward performance improvements over previous metrics
    if previous_metrics is not None:
        # Latency reduction (lower is better)
        prev_latency = previous_metrics.get("latency_p99", 0.0)
        curr_latency = metrics.get("latency_p99", 0.0)
        if prev_latency > 0:
            latency_delta = (prev_latency - curr_latency) / prev_latency
            reward += latency_delta  # Positive if latency reduced

        # Throughput increase (higher is better)
        prev_throughput = previous_metrics.get("throughput_qps", 0.0)
        curr_throughput = metrics.get("throughput_qps", 0.0)
        if prev_throughput > 0:
            throughput_delta = (curr_throughput - prev_throughput) / prev_throughput
            reward += throughput_delta  # Positive if throughput increased

    # Penalty for high error rate
    error_rate = metrics.get("error_rate", 0.0)
    if error_rate > 0.01:  # More than 1% errors
        reward -= error_rate * 10  # Heavy penalty

    # Clip reward to [-1, 1]
    reward = np.clip(reward, -1.0, 1.0)

    logger.debug(
        f"Computed reward: {reward:.3f} "
        f"(constraints_met={constraints_met}, violated={constraints_violated})"
    )

    return reward

learn_from_metrics(upir, metrics, previous_metrics=None)

Learn from performance metrics and return optimized architecture.

Main entry point for architecture optimization. This method:

1. Encodes current architecture as state
2. Selects action using PPO policy
3. Decodes action to modify architecture
4. Computes reward from metrics
5. Stores experience for learning
6. Updates PPO policy when buffer is full
7. Returns optimized architecture

Parameters:

Name              Type              Description                             Default
upir              UPIR              Current UPIR                            required
metrics           Dict[str, float]  Performance metrics from deployment     required
previous_metrics  Dict[str, float]  Previous metrics for delta computation  None

Returns:

Type  Description
UPIR  Optimized UPIR with modified architecture

Example

>>> learner = ArchitectureLearner()
>>> upir = UPIR(id="test", name="Test", description="Test")
>>> metrics = {"latency_p99": 100, "throughput_qps": 1000}
>>> optimized = learner.learn_from_metrics(upir, metrics)

References:

- PPO: Policy gradient with experience replay
- TD Commons: Continuous optimization loop
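
Each call appends one Experience to the replay buffer; once the buffer holds config.batch_size experiences, the internal policy update runs and the buffer is cleared. A small sketch, where metrics_stream is an assumed iterable of metrics dicts:

previous = None
for metrics in metrics_stream:                  # assumed: one dict per deployment cycle
    upir = learner.learn_from_metrics(upir, metrics, previous)
    previous = metrics
    print(learner)                              # e.g. ArchitectureLearner(..., buffer=17/1000)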

Source code in upir/learning/learner.py
def learn_from_metrics(
    self,
    upir: UPIR,
    metrics: Dict[str, float],
    previous_metrics: Dict[str, float] = None
) -> UPIR:
    """
    Learn from performance metrics and return optimized architecture.

    Main entry point for architecture optimization. This method:
    1. Encodes current architecture as state
    2. Selects action using PPO policy
    3. Decodes action to modify architecture
    4. Computes reward from metrics
    5. Stores experience for learning
    6. Updates PPO policy when buffer is full
    7. Returns optimized architecture

    Args:
        upir: Current UPIR
        metrics: Performance metrics from deployment
        previous_metrics: Previous metrics for delta computation

    Returns:
        Optimized UPIR with modified architecture

    Example:
        >>> learner = ArchitectureLearner()
        >>> upir = UPIR(id="test", name="Test", description="Test")
        >>> metrics = {"latency_p99": 100, "throughput_qps": 1000}
        >>> optimized = learner.learn_from_metrics(upir, metrics)

    References:
    - PPO: Policy gradient with experience replay
    - TD Commons: Continuous optimization loop
    """
    # 1. Encode current architecture as state
    state = self.encode_state(upir)

    # 2. Select action using PPO policy
    action, log_prob, value = self.ppo.select_action(state)

    # 3. Decode action to modify architecture
    optimized_upir = self.decode_action(action, upir)

    # 4. Compute reward from metrics
    reward = self.compute_reward(
        metrics,
        upir.specification if upir.specification else FormalSpecification(),
        previous_metrics
    )

    # 5. Store experience
    # Check if this is terminal (could be based on convergence criteria)
    done = False  # For now, never terminal (continuous learning)

    experience = Experience(
        state=state,
        action=action,
        reward=reward,
        log_prob=log_prob,
        value=value,
        done=done
    )
    self.experience_buffer.append(experience)

    # 6. Update policy if we have enough experiences
    if len(self.experience_buffer) >= self.config.batch_size:
        self._update_policy()

    logger.info(
        f"Learning step: action={action}, reward={reward:.3f}, "
        f"buffer_size={len(self.experience_buffer)}"
    )

    return optimized_upir

__str__()

String representation.

Source code in upir/learning/learner.py
def __str__(self) -> str:
    """String representation."""
    return (
        f"ArchitectureLearner(state_dim={self.state_dim}, "
        f"action_dim={self.action_dim}, "
        f"buffer={len(self.experience_buffer)}/{self.experience_buffer.maxlen})"
    )

__repr__()

Developer-friendly representation.

Source code in upir/learning/learner.py
def __repr__(self) -> str:
    """Developer-friendly representation."""
    return (
        f"ArchitectureLearner(state_dim={self.state_dim}, "
        f"action_dim={self.action_dim}, "
        f"config={self.config})"
    )

Usage Example

from upir import UPIR
from upir.learning.learner import ArchitectureLearner

# Create learner (state_dim and action_dim set the PPO agent's dimensions)
learner = ArchitectureLearner(state_dim=64, action_dim=40)

# upir: an existing UPIR instance with an architecture attached

# Production metrics collected from the deployed architecture
metrics = {
    "latency_p99": 85.0,
    "throughput_qps": 12000.0,
    "error_rate": 0.002,
}

# One optimization step: encode state, select an action via PPO,
# apply the change, and record the reward
optimized_upir = learner.learn_from_metrics(upir, metrics)

# On later iterations, pass the previous metrics so improvements are rewarded
new_metrics = {
    "latency_p99": 78.0,
    "throughput_qps": 13500.0,
    "error_rate": 0.001,
}
optimized_upir = learner.learn_from_metrics(
    optimized_upir, new_metrics, previous_metrics=metrics
)

See Also

  • PPO - PPO implementation