Pattern Extractor

Extract architectural patterns from UPIR instances.


Overview

The PatternExtractor discovers reusable patterns in verified architectures by extracting a feature vector from each one and clustering the vectors with KMeans.


Class Documentation

upir.patterns.extractor.PatternExtractor

Extract architectural patterns from UPIR instances using clustering.

Discovers common architectural patterns by:

1. Extracting features from each UPIR architecture
2. Clustering similar architectures using KMeans
3. Analyzing each cluster to identify common structure
4. Creating pattern templates from clusters

The extractor can identify patterns like "streaming ETL", "API gateway", "batch processing", etc., based on structural similarities.
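
A minimal sketch of how those four steps compose, using only the methods documented on this page (upirs is assumed to be a pre-assembled list of UPIR instances; in practice discover_patterns runs the same pipeline end to end):

from upir.patterns.extractor import PatternExtractor

extractor = PatternExtractor(n_clusters=5)

# Step 1: fixed-size feature vector per architecture, shape (feature_dim,)
features = [extractor.extract_features(u) for u in upirs]

# Steps 2-3: normalize features and cluster them with KMeans
clusters = extractor.cluster_architectures(upirs)  # {cluster_id: [UPIR, ...]}

# Step 4: turn each non-empty cluster into a reusable Pattern template
patterns = [
    extractor.extract_pattern(members, cid)
    for cid, members in clusters.items()
    if members
]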

Attributes:

Name         Description
n_clusters   Number of clusters for KMeans (default 10)
scaler       StandardScaler for feature normalization
kmeans       KMeans clustering model
feature_dim  Dimension of feature vectors

Example

>>> extractor = PatternExtractor(n_clusters=5)
>>> upirs = [...]  # List of UPIR instances
>>> patterns = extractor.discover_patterns(upirs)
>>> for pattern in patterns:
...     print(f"{pattern.name}: {len(pattern.instances)} instances")

References:

- KMeans: Partitional clustering algorithm
- TD Commons: Pattern extraction for architecture optimization
- Feature engineering for architectural analysis

Source code in upir/patterns/extractor.py
class PatternExtractor:
    """
    Extract architectural patterns from UPIR instances using clustering.

    Discovers common architectural patterns by:
    1. Extracting features from each UPIR architecture
    2. Clustering similar architectures using KMeans
    3. Analyzing each cluster to identify common structure
    4. Creating pattern templates from clusters

    The extractor can identify patterns like "streaming ETL", "API gateway",
    "batch processing", etc., based on structural similarities.

    Attributes:
        n_clusters: Number of clusters for KMeans (default 10)
        scaler: StandardScaler for feature normalization
        kmeans: KMeans clustering model
        feature_dim: Dimension of feature vectors

    Example:
        >>> extractor = PatternExtractor(n_clusters=5)
        >>> upirs = [...]  # List of UPIR instances
        >>> patterns = extractor.discover_patterns(upirs)
        >>> for pattern in patterns:
        ...     print(f"{pattern.name}: {len(pattern.instances)} instances")

    References:
    - KMeans: Partitional clustering algorithm
    - TD Commons: Pattern extraction for architecture optimization
    - Feature engineering for architectural analysis
    """

    def __init__(self, n_clusters: int = 10, feature_dim: int = 32):
        """
        Initialize pattern extractor.

        Args:
            n_clusters: Number of patterns to discover
            feature_dim: Dimension of feature vectors (default 32)
        """
        self.n_clusters = n_clusters
        self.feature_dim = feature_dim
        self.scaler = StandardScaler()
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)

        logger.info(
            f"Initialized PatternExtractor: n_clusters={n_clusters}, "
            f"feature_dim={feature_dim}"
        )

    def extract_features(self, upir: UPIR) -> np.ndarray:
        """
        Extract feature vector from UPIR architecture.

        Extracts architectural features and creates a fixed-size normalized
        feature vector suitable for clustering. Features include:
        - Component count
        - Connection density (connections / components)
        - Deployment type (one-hot encoding)
        - Component type distribution
        - Constraint profile (latency, throughput requirements)

        Args:
            upir: UPIR instance to extract features from

        Returns:
            Feature vector (feature_dim,) normalized to [0, 1]

        Example:
            >>> extractor = PatternExtractor()
            >>> upir = UPIR(id="test", name="Test", description="Test")
            >>> features = extractor.extract_features(upir)
            >>> features.shape
            (32,)
        """
        features = []

        if upir.architecture is None:
            # No architecture - return zero vector
            return np.zeros(self.feature_dim)

        arch = upir.architecture

        # Basic structural features
        num_components = len(arch.components)
        num_connections = len(arch.connections)

        features.append(num_components)
        features.append(num_connections)

        # Connection density
        connection_density = num_connections / max(num_components, 1)
        features.append(connection_density)

        # Component type distribution (one-hot for common types)
        component_types = Counter()
        for comp in arch.components:
            if isinstance(comp, dict):
                comp_type = comp.get("type", "unknown")
            else:
                comp_type = "unknown"
            component_types[comp_type] += 1

        # Top component types (streaming, batch, api, database, cache)
        common_types = ["streaming", "batch", "api", "database", "cache"]
        for comp_type in common_types:
            count = sum(component_types[k] for k in component_types if comp_type in k.lower())
            features.append(count / max(num_components, 1))

        # Deployment pattern (based on deployment config)
        deployment_types = ["single_region", "multi_region", "distributed", "serverless"]
        deployment = arch.deployment if hasattr(arch, 'deployment') else {}
        if isinstance(deployment, dict):
            deployment_type = deployment.get("type", "unknown")
        else:
            deployment_type = "unknown"

        for dep_type in deployment_types:
            features.append(1.0 if dep_type in deployment_type.lower() else 0.0)

        # Constraint profile from specification
        if upir.specification:
            spec = upir.specification

            # Latency constraints
            has_latency = any(
                prop.time_bound is not None
                for prop in spec.properties + spec.invariants
            )
            features.append(1.0 if has_latency else 0.0)

            # Min latency requirement (normalized)
            min_latency = min(
                (prop.time_bound for prop in spec.properties + spec.invariants
                 if prop.time_bound is not None),
                default=0
            )
            features.append(min_latency / 10000.0)  # Normalize to [0, 1]

            # Throughput requirements (heuristic from predicates)
            has_throughput = any(
                "throughput" in prop.predicate.lower() or "qps" in prop.predicate.lower()
                for prop in spec.properties + spec.invariants
            )
            features.append(1.0 if has_throughput else 0.0)

            # Number of constraints
            num_constraints = len(spec.properties) + len(spec.invariants)
            features.append(num_constraints / 10.0)  # Normalize
        else:
            # No specification - add zeros
            features.extend([0.0] * 4)

        # Pad or truncate to fixed size
        features = np.array(features)
        if len(features) < self.feature_dim:
            features = np.pad(features, (0, self.feature_dim - len(features)), mode='constant')
        else:
            features = features[:self.feature_dim]

        # Clip to [0, 1] for numerical stability
        features = np.clip(features, 0.0, 1.0)

        return features.astype(np.float32)

    def cluster_architectures(self, upirs: List[UPIR]) -> Dict[int, List[UPIR]]:
        """
        Cluster similar architectures using KMeans.

        Groups UPIRs with similar architectural features into clusters.
        Each cluster represents a potential pattern.

        Args:
            upirs: List of UPIR instances to cluster

        Returns:
            Dictionary mapping cluster_id -> list of UPIRs in that cluster

        Example:
            >>> extractor = PatternExtractor(n_clusters=3)
            >>> upirs = [...]  # List of UPIRs
            >>> clusters = extractor.cluster_architectures(upirs)
            >>> for cluster_id, cluster_upirs in clusters.items():
            ...     print(f"Cluster {cluster_id}: {len(cluster_upirs)} UPIRs")

        References:
        - KMeans: Minimizes within-cluster sum of squares
        - Feature normalization improves clustering quality
        """
        if not upirs:
            logger.warning("Empty UPIR list provided for clustering")
            return {}

        if len(upirs) < self.n_clusters:
            logger.warning(
                f"Only {len(upirs)} UPIRs but {self.n_clusters} clusters requested. "
                f"Using {len(upirs)} clusters instead."
            )
            self.n_clusters = len(upirs)
            self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)

        # Extract features from all UPIRs
        feature_matrix = np.array([self.extract_features(upir) for upir in upirs])

        # Normalize features
        feature_matrix_normalized = self.scaler.fit_transform(feature_matrix)

        # Cluster
        labels = self.kmeans.fit_predict(feature_matrix_normalized)

        # Group UPIRs by cluster
        clusters = {}
        for cluster_id in range(self.n_clusters):
            clusters[cluster_id] = []

        for upir, label in zip(upirs, labels):
            clusters[label].append(upir)

        logger.info(
            f"Clustered {len(upirs)} UPIRs into {self.n_clusters} clusters. "
            f"Sizes: {[len(clusters[i]) for i in range(self.n_clusters)]}"
        )

        return clusters

    def extract_pattern(self, cluster: List[UPIR], cluster_id: int) -> Pattern:
        """
        Extract common pattern from a cluster of similar architectures.

        Analyzes UPIRs in the cluster to identify common structure and
        creates a reusable pattern template.

        Args:
            cluster: List of UPIRs in the cluster
            cluster_id: Cluster identifier

        Returns:
            Pattern extracted from the cluster

        Example:
            >>> extractor = PatternExtractor()
            >>> cluster_upirs = [...]  # UPIRs from same cluster
            >>> pattern = extractor.extract_pattern(cluster_upirs, cluster_id=0)
            >>> print(pattern.name)

        References:
        - Mode for categorical features
        - Average for numerical features
        - Template parameterization
        """
        if not cluster:
            # Empty cluster - return empty pattern
            return Pattern(
                id=f"pattern-{cluster_id}",
                name=f"Pattern {cluster_id}",
                description="Empty pattern",
                template={}
            )

        # Collect statistics from cluster
        component_types = Counter()
        connection_counts = []
        component_counts = []
        has_specifications = 0
        performance_metrics = []

        for upir in cluster:
            if upir.architecture:
                arch = upir.architecture

                # Count components by type
                for comp in arch.components:
                    if isinstance(comp, dict):
                        comp_type = comp.get("type", "unknown")
                        component_types[comp_type] += 1

                component_counts.append(len(arch.components))
                connection_counts.append(len(arch.connections))

            if upir.specification:
                has_specifications += 1

        # Identify most common component types
        top_types = component_types.most_common(5)

        # Create template structure
        template_components = []
        for comp_type, count in top_types:
            template_components.append({
                "type": comp_type,
                "count": count / len(cluster),  # Average count per UPIR
                "properties": {}
            })

        # Average metrics
        avg_components = np.mean(component_counts) if component_counts else 0
        avg_connections = np.mean(connection_counts) if connection_counts else 0

        # Compute pattern centroid for matching
        feature_matrix = np.array([self.extract_features(upir) for upir in cluster])
        centroid = np.mean(feature_matrix, axis=0).tolist()

        # Create pattern name (heuristic from top component type)
        if top_types:
            top_type = top_types[0][0]
            pattern_name = f"{top_type.replace('_', ' ').title()} Pattern"
        else:
            pattern_name = f"Pattern {cluster_id}"

        # Create description
        description = (
            f"Architectural pattern with {avg_components:.1f} components "
            f"and {avg_connections:.1f} connections on average. "
            f"Common component types: {', '.join(t for t, _ in top_types[:3])}. "
            f"Specification coverage: {has_specifications}/{len(cluster)}."
        )

        # Create pattern
        pattern = Pattern(
            id=f"pattern-{cluster_id}",
            name=pattern_name,
            description=description,
            template={
                "components": template_components,
                "parameters": {
                    "avg_component_count": avg_components,
                    "avg_connection_count": avg_connections,
                },
                "centroid": centroid,
            },
            instances=[upir.id for upir in cluster],
            success_rate=has_specifications / len(cluster) if cluster else 0.0,
            average_performance={}
        )

        logger.debug(
            f"Extracted pattern: {pattern.name} from {len(cluster)} UPIRs"
        )

        return pattern

    def discover_patterns(self, upirs: List[UPIR]) -> List[Pattern]:
        """
        Discover architectural patterns from UPIR instances.

        Main entry point for pattern extraction. Performs clustering and
        pattern extraction in one pipeline.

        Args:
            upirs: List of UPIR instances to analyze

        Returns:
            List of discovered patterns

        Example:
            >>> extractor = PatternExtractor(n_clusters=5)
            >>> upirs = [...]  # Collection of UPIR instances
            >>> patterns = extractor.discover_patterns(upirs)
            >>> print(f"Discovered {len(patterns)} patterns")
            >>> for pattern in patterns:
            ...     print(f"  - {pattern.name}: {len(pattern.instances)} instances")

        References:
        - TD Commons: Pattern-based architecture optimization
        - Unsupervised learning for pattern discovery
        """
        if not upirs:
            logger.warning("No UPIRs provided for pattern discovery")
            return []

        logger.info(f"Starting pattern discovery for {len(upirs)} UPIRs")

        # Cluster architectures
        clusters = self.cluster_architectures(upirs)

        # Extract pattern from each cluster
        patterns = []
        for cluster_id, cluster_upirs in clusters.items():
            if cluster_upirs:  # Skip empty clusters
                pattern = self.extract_pattern(cluster_upirs, cluster_id)
                patterns.append(pattern)

        # Sort patterns by instance count (descending)
        patterns.sort(key=lambda p: len(p.instances), reverse=True)

        logger.info(
            f"Discovered {len(patterns)} patterns. "
            f"Top pattern: {patterns[0].name} with {len(patterns[0].instances)} instances"
            if patterns else "No patterns discovered"
        )

        return patterns

    def classify_upir(self, upir: UPIR) -> int:
        """
        Classify a UPIR into an existing cluster/pattern.

        Uses the trained KMeans model to predict which cluster a new UPIR
        belongs to.

        Args:
            upir: UPIR to classify

        Returns:
            Cluster ID (0 to n_clusters-1)

        Example:
            >>> extractor = PatternExtractor()
            >>> extractor.discover_patterns(training_upirs)
            >>> new_upir = UPIR(...)
            >>> cluster_id = extractor.classify_upir(new_upir)
        """
        features = self.extract_features(upir)
        features_normalized = self.scaler.transform(features.reshape(1, -1))
        cluster_id = self.kmeans.predict(features_normalized)[0]
        return cluster_id

    def __str__(self) -> str:
        """String representation."""
        return f"PatternExtractor(n_clusters={self.n_clusters})"

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"PatternExtractor(n_clusters={self.n_clusters}, "
            f"feature_dim={self.feature_dim})"
        )

Functions

__init__(n_clusters=10, feature_dim=32)

Initialize pattern extractor.

Parameters:

Name         Type  Description                                 Default
n_clusters   int   Number of patterns to discover              10
feature_dim  int   Dimension of feature vectors (default 32)   32
Source code in upir/patterns/extractor.py
def __init__(self, n_clusters: int = 10, feature_dim: int = 32):
    """
    Initialize pattern extractor.

    Args:
        n_clusters: Number of patterns to discover
        feature_dim: Dimension of feature vectors (default 32)
    """
    self.n_clusters = n_clusters
    self.feature_dim = feature_dim
    self.scaler = StandardScaler()
    self.kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)

    logger.info(
        f"Initialized PatternExtractor: n_clusters={n_clusters}, "
        f"feature_dim={feature_dim}"
    )

extract_features(upir)

Extract feature vector from UPIR architecture.

Extracts architectural features and creates a fixed-size normalized feature vector suitable for clustering. Features include:

- Component count
- Connection density (connections / components)
- Deployment type (one-hot encoding)
- Component type distribution
- Constraint profile (latency, throughput requirements)
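
For orientation, the layout of the resulting vector can be read off the source below. This index map is an informal reading of the code, not a documented contract:

# Feature vector layout (feature_dim=32, informal reading of the source):
#   [0]      component count (raw; later clipped to [0, 1])
#   [1]      connection count (raw; later clipped)
#   [2]      connection density = connections / max(components, 1)
#   [3:8]    fraction of components matching: streaming, batch, api, database, cache
#   [8:12]   deployment one-hot: single_region, multi_region, distributed, serverless
#   [12:16]  constraint profile: has_latency, min_latency / 10000,
#            has_throughput, num_constraints / 10
#   [16:32]  zero padding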

Parameters:

Name  Type  Description                             Default
upir  UPIR  UPIR instance to extract features from  required

Returns:

Type     Description
ndarray  Feature vector (feature_dim,) normalized to [0, 1]

Example

>>> extractor = PatternExtractor()
>>> upir = UPIR(id="test", name="Test", description="Test")
>>> features = extractor.extract_features(upir)
>>> features.shape
(32,)

Source code in upir/patterns/extractor.py
def extract_features(self, upir: UPIR) -> np.ndarray:
    """
    Extract feature vector from UPIR architecture.

    Extracts architectural features and creates a fixed-size normalized
    feature vector suitable for clustering. Features include:
    - Component count
    - Connection density (connections / components)
    - Deployment type (one-hot encoding)
    - Component type distribution
    - Constraint profile (latency, throughput requirements)

    Args:
        upir: UPIR instance to extract features from

    Returns:
        Feature vector (feature_dim,) normalized to [0, 1]

    Example:
        >>> extractor = PatternExtractor()
        >>> upir = UPIR(id="test", name="Test", description="Test")
        >>> features = extractor.extract_features(upir)
        >>> features.shape
        (32,)
    """
    features = []

    if upir.architecture is None:
        # No architecture - return zero vector
        return np.zeros(self.feature_dim)

    arch = upir.architecture

    # Basic structural features
    num_components = len(arch.components)
    num_connections = len(arch.connections)

    features.append(num_components)
    features.append(num_connections)

    # Connection density
    connection_density = num_connections / max(num_components, 1)
    features.append(connection_density)

    # Component type distribution (one-hot for common types)
    component_types = Counter()
    for comp in arch.components:
        if isinstance(comp, dict):
            comp_type = comp.get("type", "unknown")
        else:
            comp_type = "unknown"
        component_types[comp_type] += 1

    # Top component types (streaming, batch, api, database, cache)
    common_types = ["streaming", "batch", "api", "database", "cache"]
    for comp_type in common_types:
        count = sum(component_types[k] for k in component_types if comp_type in k.lower())
        features.append(count / max(num_components, 1))

    # Deployment pattern (based on deployment config)
    deployment_types = ["single_region", "multi_region", "distributed", "serverless"]
    deployment = arch.deployment if hasattr(arch, 'deployment') else {}
    if isinstance(deployment, dict):
        deployment_type = deployment.get("type", "unknown")
    else:
        deployment_type = "unknown"

    for dep_type in deployment_types:
        features.append(1.0 if dep_type in deployment_type.lower() else 0.0)

    # Constraint profile from specification
    if upir.specification:
        spec = upir.specification

        # Latency constraints
        has_latency = any(
            prop.time_bound is not None
            for prop in spec.properties + spec.invariants
        )
        features.append(1.0 if has_latency else 0.0)

        # Min latency requirement (normalized)
        min_latency = min(
            (prop.time_bound for prop in spec.properties + spec.invariants
             if prop.time_bound is not None),
            default=0
        )
        features.append(min_latency / 10000.0)  # Normalize to [0, 1]

        # Throughput requirements (heuristic from predicates)
        has_throughput = any(
            "throughput" in prop.predicate.lower() or "qps" in prop.predicate.lower()
            for prop in spec.properties + spec.invariants
        )
        features.append(1.0 if has_throughput else 0.0)

        # Number of constraints
        num_constraints = len(spec.properties) + len(spec.invariants)
        features.append(num_constraints / 10.0)  # Normalize
    else:
        # No specification - add zeros
        features.extend([0.0] * 4)

    # Pad or truncate to fixed size
    features = np.array(features)
    if len(features) < self.feature_dim:
        features = np.pad(features, (0, self.feature_dim - len(features)), mode='constant')
    else:
        features = features[:self.feature_dim]

    # Clip to [0, 1] for numerical stability
    features = np.clip(features, 0.0, 1.0)

    return features.astype(np.float32)

cluster_architectures(upirs)

Cluster similar architectures using KMeans.

Groups UPIRs with similar architectural features into clusters. Each cluster represents a potential pattern.

Parameters:

Name   Type        Description                        Default
upirs  List[UPIR]  List of UPIR instances to cluster  required

Returns:

Type                   Description
Dict[int, List[UPIR]]  Dictionary mapping cluster_id -> list of UPIRs in that cluster

Example

>>> extractor = PatternExtractor(n_clusters=3)
>>> upirs = [...]  # List of UPIRs
>>> clusters = extractor.cluster_architectures(upirs)
>>> for cluster_id, cluster_upirs in clusters.items():
...     print(f"Cluster {cluster_id}: {len(cluster_upirs)} UPIRs")

References:

- KMeans: Minimizes within-cluster sum of squares
- Feature normalization improves clustering quality
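
For concreteness, the objective KMeans minimizes is the standard within-cluster sum of squares, where $\mu_i$ is the centroid of cluster $C_i$:

$$\min_{C_1, \ldots, C_k} \; \sum_{i=1}^{k} \sum_{x \in C_i} \lVert x - \mu_i \rVert^2$$

This is why the feature normalization above matters: without it, large-magnitude features such as raw component counts would dominate the squared distances.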

Source code in upir/patterns/extractor.py
def cluster_architectures(self, upirs: List[UPIR]) -> Dict[int, List[UPIR]]:
    """
    Cluster similar architectures using KMeans.

    Groups UPIRs with similar architectural features into clusters.
    Each cluster represents a potential pattern.

    Args:
        upirs: List of UPIR instances to cluster

    Returns:
        Dictionary mapping cluster_id -> list of UPIRs in that cluster

    Example:
        >>> extractor = PatternExtractor(n_clusters=3)
        >>> upirs = [...]  # List of UPIRs
        >>> clusters = extractor.cluster_architectures(upirs)
        >>> for cluster_id, cluster_upirs in clusters.items():
        ...     print(f"Cluster {cluster_id}: {len(cluster_upirs)} UPIRs")

    References:
    - KMeans: Minimizes within-cluster sum of squares
    - Feature normalization improves clustering quality
    """
    if not upirs:
        logger.warning("Empty UPIR list provided for clustering")
        return {}

    if len(upirs) < self.n_clusters:
        logger.warning(
            f"Only {len(upirs)} UPIRs but {self.n_clusters} clusters requested. "
            f"Using {len(upirs)} clusters instead."
        )
        self.n_clusters = len(upirs)
        self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)

    # Extract features from all UPIRs
    feature_matrix = np.array([self.extract_features(upir) for upir in upirs])

    # Normalize features
    feature_matrix_normalized = self.scaler.fit_transform(feature_matrix)

    # Cluster
    labels = self.kmeans.fit_predict(feature_matrix_normalized)

    # Group UPIRs by cluster
    clusters = {}
    for cluster_id in range(self.n_clusters):
        clusters[cluster_id] = []

    for upir, label in zip(upirs, labels):
        clusters[label].append(upir)

    logger.info(
        f"Clustered {len(upirs)} UPIRs into {self.n_clusters} clusters. "
        f"Sizes: {[len(clusters[i]) for i in range(self.n_clusters)]}"
    )

    return clusters

extract_pattern(cluster, cluster_id)

Extract common pattern from a cluster of similar architectures.

Analyzes UPIRs in the cluster to identify common structure and creates a reusable pattern template.

Parameters:

Name        Type        Description                   Default
cluster     List[UPIR]  List of UPIRs in the cluster  required
cluster_id  int         Cluster identifier            required

Returns:

Type     Description
Pattern  Pattern extracted from the cluster

Example

>>> extractor = PatternExtractor()
>>> cluster_upirs = [...]  # UPIRs from same cluster
>>> pattern = extractor.extract_pattern(cluster_upirs, cluster_id=0)
>>> print(pattern.name)

References:

- Mode for categorical features
- Average for numerical features
- Template parameterization

Source code in upir/patterns/extractor.py
def extract_pattern(self, cluster: List[UPIR], cluster_id: int) -> Pattern:
    """
    Extract common pattern from a cluster of similar architectures.

    Analyzes UPIRs in the cluster to identify common structure and
    creates a reusable pattern template.

    Args:
        cluster: List of UPIRs in the cluster
        cluster_id: Cluster identifier

    Returns:
        Pattern extracted from the cluster

    Example:
        >>> extractor = PatternExtractor()
        >>> cluster_upirs = [...]  # UPIRs from same cluster
        >>> pattern = extractor.extract_pattern(cluster_upirs, cluster_id=0)
        >>> print(pattern.name)

    References:
    - Mode for categorical features
    - Average for numerical features
    - Template parameterization
    """
    if not cluster:
        # Empty cluster - return empty pattern
        return Pattern(
            id=f"pattern-{cluster_id}",
            name=f"Pattern {cluster_id}",
            description="Empty pattern",
            template={}
        )

    # Collect statistics from cluster
    component_types = Counter()
    connection_counts = []
    component_counts = []
    has_specifications = 0
    performance_metrics = []

    for upir in cluster:
        if upir.architecture:
            arch = upir.architecture

            # Count components by type
            for comp in arch.components:
                if isinstance(comp, dict):
                    comp_type = comp.get("type", "unknown")
                    component_types[comp_type] += 1

            component_counts.append(len(arch.components))
            connection_counts.append(len(arch.connections))

        if upir.specification:
            has_specifications += 1

    # Identify most common component types
    top_types = component_types.most_common(5)

    # Create template structure
    template_components = []
    for comp_type, count in top_types:
        template_components.append({
            "type": comp_type,
            "count": count / len(cluster),  # Average count per UPIR
            "properties": {}
        })

    # Average metrics
    avg_components = np.mean(component_counts) if component_counts else 0
    avg_connections = np.mean(connection_counts) if connection_counts else 0

    # Compute pattern centroid for matching
    feature_matrix = np.array([self.extract_features(upir) for upir in cluster])
    centroid = np.mean(feature_matrix, axis=0).tolist()

    # Create pattern name (heuristic from top component type)
    if top_types:
        top_type = top_types[0][0]
        pattern_name = f"{top_type.replace('_', ' ').title()} Pattern"
    else:
        pattern_name = f"Pattern {cluster_id}"

    # Create description
    description = (
        f"Architectural pattern with {avg_components:.1f} components "
        f"and {avg_connections:.1f} connections on average. "
        f"Common component types: {', '.join(t for t, _ in top_types[:3])}. "
        f"Specification coverage: {has_specifications}/{len(cluster)}."
    )

    # Create pattern
    pattern = Pattern(
        id=f"pattern-{cluster_id}",
        name=pattern_name,
        description=description,
        template={
            "components": template_components,
            "parameters": {
                "avg_component_count": avg_components,
                "avg_connection_count": avg_connections,
            },
            "centroid": centroid,
        },
        instances=[upir.id for upir in cluster],
        success_rate=has_specifications / len(cluster) if cluster else 0.0,
        average_performance={}
    )

    logger.debug(
        f"Extracted pattern: {pattern.name} from {len(cluster)} UPIRs"
    )

    return pattern

discover_patterns(upirs)

Discover architectural patterns from UPIR instances.

Main entry point for pattern extraction. Performs clustering and pattern extraction in one pipeline.

Parameters:

Name   Type        Description                        Default
upirs  List[UPIR]  List of UPIR instances to analyze  required

Returns:

Type           Description
List[Pattern]  List of discovered patterns

Example

>>> extractor = PatternExtractor(n_clusters=5)
>>> upirs = [...]  # Collection of UPIR instances
>>> patterns = extractor.discover_patterns(upirs)
>>> print(f"Discovered {len(patterns)} patterns")
>>> for pattern in patterns:
...     print(f"  - {pattern.name}: {len(pattern.instances)} instances")

References:

- TD Commons: Pattern-based architecture optimization
- Unsupervised learning for pattern discovery

Source code in upir/patterns/extractor.py
def discover_patterns(self, upirs: List[UPIR]) -> List[Pattern]:
    """
    Discover architectural patterns from UPIR instances.

    Main entry point for pattern extraction. Performs clustering and
    pattern extraction in one pipeline.

    Args:
        upirs: List of UPIR instances to analyze

    Returns:
        List of discovered patterns

    Example:
        >>> extractor = PatternExtractor(n_clusters=5)
        >>> upirs = [...]  # Collection of UPIR instances
        >>> patterns = extractor.discover_patterns(upirs)
        >>> print(f"Discovered {len(patterns)} patterns")
        >>> for pattern in patterns:
        ...     print(f"  - {pattern.name}: {len(pattern.instances)} instances")

    References:
    - TD Commons: Pattern-based architecture optimization
    - Unsupervised learning for pattern discovery
    """
    if not upirs:
        logger.warning("No UPIRs provided for pattern discovery")
        return []

    logger.info(f"Starting pattern discovery for {len(upirs)} UPIRs")

    # Cluster architectures
    clusters = self.cluster_architectures(upirs)

    # Extract pattern from each cluster
    patterns = []
    for cluster_id, cluster_upirs in clusters.items():
        if cluster_upirs:  # Skip empty clusters
            pattern = self.extract_pattern(cluster_upirs, cluster_id)
            patterns.append(pattern)

    # Sort patterns by instance count (descending)
    patterns.sort(key=lambda p: len(p.instances), reverse=True)

    logger.info(
        f"Discovered {len(patterns)} patterns. "
        f"Top pattern: {patterns[0].name} with {len(patterns[0].instances)} instances"
        if patterns else "No patterns discovered"
    )

    return patterns

classify_upir(upir)

Classify a UPIR into an existing cluster/pattern.

Uses the trained KMeans model to predict which cluster a new UPIR belongs to.

Parameters:

Name  Type  Description       Default
upir  UPIR  UPIR to classify  required

Returns:

Type  Description
int   Cluster ID (0 to n_clusters-1)

Example

>>> extractor = PatternExtractor()
>>> extractor.discover_patterns(training_upirs)
>>> new_upir = UPIR(...)
>>> cluster_id = extractor.classify_upir(new_upir)
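
Note that classify_upir relies on the scaler and KMeans model fitted during discover_patterns (or cluster_architectures); calling it on an unfitted extractor raises scikit-learn's NotFittedError. A hedged defensive sketch (training_upirs and new_upir are assumed inputs):

from sklearn.exceptions import NotFittedError

try:
    cluster_id = extractor.classify_upir(new_upir)
except NotFittedError:
    # Fit on historical architectures first, then classify
    extractor.discover_patterns(training_upirs)
    cluster_id = extractor.classify_upir(new_upir)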

Source code in upir/patterns/extractor.py
def classify_upir(self, upir: UPIR) -> int:
    """
    Classify a UPIR into an existing cluster/pattern.

    Uses the trained KMeans model to predict which cluster a new UPIR
    belongs to.

    Args:
        upir: UPIR to classify

    Returns:
        Cluster ID (0 to n_clusters-1)

    Example:
        >>> extractor = PatternExtractor()
        >>> extractor.discover_patterns(training_upirs)
        >>> new_upir = UPIR(...)
        >>> cluster_id = extractor.classify_upir(new_upir)
    """
    features = self.extract_features(upir)
    features_normalized = self.scaler.transform(features.reshape(1, -1))
    cluster_id = self.kmeans.predict(features_normalized)[0]
    return cluster_id

__str__()

String representation.

Source code in upir/patterns/extractor.py
def __str__(self) -> str:
    """String representation."""
    return f"PatternExtractor(n_clusters={self.n_clusters})"

__repr__()

Developer-friendly representation.

Source code in upir/patterns/extractor.py
def __repr__(self) -> str:
    """Developer-friendly representation."""
    return (
        f"PatternExtractor(n_clusters={self.n_clusters}, "
        f"feature_dim={self.feature_dim})"
    )

Usage Example

from upir import UPIR
from upir.patterns.extractor import PatternExtractor

# Create extractor
extractor = PatternExtractor(n_clusters=5, feature_dim=32)

# Discover patterns from a collection of UPIR instances
# (upirs: a list of UPIR objects assembled elsewhere)
patterns = extractor.discover_patterns(upirs)

for pattern in patterns:
    print(f"Pattern: {pattern.name}")
    print(f"Success rate: {pattern.success_rate:.2%}")

See Also