Skip to content

Commit bfbb255

Browse files
authored
Create metrics.py
1 parent 9e9c1ee commit bfbb255

File tree

1 file changed

+245
-0
lines changed

1 file changed

+245
-0
lines changed

core/utils/metrics.py

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
"""
2+
Unified Metrics System - Multi-dimensional Telemetry Collection & Exposition
3+
"""
4+
5+
from __future__ import annotations
6+
import asyncio
7+
import time
8+
from contextlib import asynccontextmanager
9+
from functools import wraps
10+
from typing import (
11+
Any,
12+
Awaitable,
13+
Callable,
14+
Dict,
15+
List,
16+
Optional,
17+
Tuple,
18+
TypeVar,
19+
Union
20+
)
21+
from collections import defaultdict
22+
from datetime import datetime
23+
24+
# Third-party
25+
from prometheus_client import (
26+
Counter,
27+
Gauge,
28+
Histogram,
29+
Summary,
30+
REGISTRY,
31+
start_http_server,
32+
generate_latest,
33+
CollectorRegistry,
34+
push_to_gateway
35+
)
36+
from prometheus_client.metrics import MetricWrapperBase
37+
from opentelemetry import metrics
38+
from opentelemetry.sdk.metrics import MeterProvider
39+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
40+
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
41+
42+
# Type variables
43+
F = TypeVar('F', bound=Callable[..., Any])
44+
AsyncF = TypeVar('AsyncF', bound=Callable[..., Awaitable[Any]])
45+
46+
class MetricConfig:
    """Configuration for metric initialization.

    Plain value holder describing one metric: its name, help text, label
    names, histogram buckets and the namespace/subsystem prefix.

    Args:
        name: Base metric name (without namespace/subsystem prefix).
        description: Human-readable help text for the metric.
        labels: Label names for the metric; ``None`` or empty means no labels.
        buckets: Histogram bucket upper bounds; ``None`` or empty falls back
            to ``Histogram.DEFAULT_BUCKETS``.
        namespace: Prefix component placed first in the full metric name.
        subsystem: Prefix component placed between namespace and name.
    """

    def __init__(
        self,
        name: str,
        description: str,
        labels: Optional[List[str]] = None,
        buckets: Optional[Tuple[float, ...]] = None,
        namespace: str = "aelion",
        subsystem: str = "core",
    ):
        self.name = name
        self.description = description
        # Copy so later mutation of the caller's list cannot alter this config.
        self.labels = list(labels) if labels else []
        # Falsy bucket specs (None or empty) use the library defaults.
        self.buckets = buckets or Histogram.DEFAULT_BUCKETS
        self.namespace = namespace
        self.subsystem = subsystem
64+
65+
class TelemetryManager:
    """Core metrics registry with multi-backend support.

    Process-wide singleton: every ``TelemetryManager()`` call returns the same
    instance. Metrics are stored in ``_metrics`` keyed by
    ``"<namespace>_<subsystem>_<name>"``; the helpers that take a
    ``metric_name`` expect that full key.
    """

    _instance = None
    # Kept from the original definition; nothing in this class acquires it.
    _lock = asyncio.Lock()

    # NOTE(review): the original initializer body was lost (the source had a
    # fused, unparsable ``def _init_self):``). The defaults below are
    # reconstructed placeholders for the attributes the class reads --
    # confirm the intended configuration source and values.
    otel_enabled: bool = False
    otel_endpoint: str = "http://localhost:4317"
    push_gateway: str = "localhost:9091"

    def __new__(cls):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._init_metrics()
            # Publish only after successful initialisation so a failure does
            # not leave a half-built singleton behind.
            cls._instance = instance
        return cls._instance

    def _init_metrics(self) -> None:
        """Create the empty metric store and set up optional exporters."""
        self._metrics: Dict[str, MetricWrapperBase] = {}
        self._init_otel()

    def _init_otel(self) -> None:
        """Initialize OpenTelemetry SDK if enabled"""
        if not self.otel_enabled:
            return
        self.otel_exporter = OTLPMetricExporter(endpoint=self.otel_endpoint)
        self.otel_reader = PeriodicExportingMetricReader(
            exporter=self.otel_exporter,
            export_interval_millis=5000,
        )
        self.otel_provider = MeterProvider(metric_readers=[self.otel_reader])
        metrics.set_meter_provider(self.otel_provider)
        self.otel_meter = metrics.get_meter("aelion.ai")

    def _get_metric(self, metric_name: str) -> MetricWrapperBase:
        """Return the registered metric for ``metric_name``.

        Raises:
            KeyError: If no metric is stored under that key.
        """
        try:
            return self._metrics[metric_name]
        except KeyError:
            raise KeyError(f"Metric {metric_name} is not registered") from None

    def create_metric(self, config: MetricConfig) -> MetricWrapperBase:
        """Factory method for metric creation with conflict detection.

        The metric type is chosen from the name suffix: ``_total`` creates a
        ``Counter``, ``_seconds`` a ``Histogram`` (using ``config.buckets``),
        anything else a ``Gauge``.

        Args:
            config: Description of the metric to create.

        Returns:
            The newly created metric, also stored under
            ``"<namespace>_<subsystem>_<name>"``.

        Raises:
            ValueError: If a metric with the same key is already stored.
        """
        metric_key = f"{config.namespace}_{config.subsystem}_{config.name}"

        if metric_key in self._metrics:
            raise ValueError(f"Metric {metric_key} already registered")

        if config.name.endswith("_total"):
            metric = Counter(
                config.name,
                config.description,
                config.labels,
                namespace=config.namespace,
                subsystem=config.subsystem,
            )
        elif config.name.endswith("_seconds"):
            metric = Histogram(
                config.name,
                config.description,
                config.labels,
                buckets=config.buckets,
                namespace=config.namespace,
                subsystem=config.subsystem,
            )
        else:
            metric = Gauge(
                config.name,
                config.description,
                config.labels,
                namespace=config.namespace,
                subsystem=config.subsystem,
            )

        self._metrics[metric_key] = metric
        return metric

    @staticmethod
    def http_handler():
        """Generate Prometheus metrics endpoint for FastAPI/Starlette.

        Returns:
            An async request handler that renders the default registry in the
            Prometheus text exposition format.
        """
        # Imported locally: the module's import block does not bring these
        # names into scope.
        from fastapi import Response
        from prometheus_client import CONTENT_TYPE_LATEST

        async def handler(request):
            # Route handlers must return a Response object; returning the raw
            # bytes from generate_latest() fails when the framework tries to
            # send the response.
            return Response(
                content=generate_latest(REGISTRY),
                media_type=CONTENT_TYPE_LATEST,
            )

        return handler

    def push_gateway_job(self, job_name: str):
        """Push the default registry to the Prometheus PushGateway once.

        This performs a single push; scheduling repeated pushes is left to
        the caller.

        Args:
            job_name: Value of the ``job`` grouping key used for the push.
        """
        push_to_gateway(
            self.push_gateway,
            job=job_name,
            registry=REGISTRY,
            timeout=30,
        )

    @asynccontextmanager
    async def latency_metric(self, metric_name: str, **labels):
        """Async context manager for latency measurement.

        Observes the elapsed monotonic time of the ``async with`` body on the
        named metric, whether or not the body raises.

        Args:
            metric_name: Full key of a registered metric exposing ``observe``.
            **labels: Label values; omit for a metric without labels.

        Raises:
            KeyError: If ``metric_name`` is not registered.
        """
        # Resolve the target up front so a bad name or label set fails before
        # the body runs instead of masking the body's own exception later.
        metric = self._get_metric(metric_name)
        # Calling .labels() with no arguments is rejected for metrics that
        # were created without label names, so only apply labels when given.
        target = metric.labels(**labels) if labels else metric
        start_time = time.monotonic()
        try:
            yield
        finally:
            target.observe(time.monotonic() - start_time)

    def track_concurrency(self, metric_name: str):
        """Decorator for tracking concurrent executions.

        Increments the metric stored under ``f"{metric_name}_concurrent"``
        when the wrapped coroutine starts and decrements it when it finishes
        or raises.

        Args:
            metric_name: Key prefix; ``_concurrent`` is appended for lookup.
        """
        def decorator(func: AsyncF) -> AsyncF:
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Looked up per call so the metric may be registered after
                # the decorator has been applied.
                concurrency_gauge = self._get_metric(f"{metric_name}_concurrent")
                concurrency_gauge.inc()
                try:
                    return await func(*args, **kwargs)
                finally:
                    concurrency_gauge.dec()
            return wrapper
        return decorator

    def error_counter(self, metric_name: str):
        """Decorator for counting exceptions.

        Works for both sync and async callables. Each exception increments
        the named metric with an ``error_type`` label set to the exception
        class name, then propagates unchanged.

        Args:
            metric_name: Full key of a registered metric that has an
                ``error_type`` label.
        """
        def decorator(func: F) -> F:
            def record(exc: Exception) -> None:
                self._get_metric(metric_name).labels(
                    error_type=type(exc).__name__
                ).inc()

            if asyncio.iscoroutinefunction(func):
                @wraps(func)
                async def async_wrapper(*args, **kwargs):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as exc:
                        record(exc)
                        raise
                return async_wrapper

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    record(exc)
                    raise
            return sync_wrapper
        return decorator

    def reset_metrics(self):
        """Clear all metric data (for testing only)"""
        for metric in self._metrics.values():
            metric.clear()
193+
194+
# Predefined core metrics
CORE_METRICS = [
    MetricConfig(
        name="agent_actions_total",
        description="Total agent actions executed",
        labels=["agent_type", "action_type"],
    ),
    MetricConfig(
        name="task_duration_seconds",
        description="Task processing latency distribution",
        labels=["task_type", "priority"],
        buckets=(0.01, 0.1, 0.5, 1, 5, 10, 30),
    ),
    MetricConfig(
        name="resource_usage_bytes",
        description="Memory/CPU resource consumption",
        labels=["resource_type", "node"],
    ),
    MetricConfig(
        name="api_errors_total",
        description="API endpoint errors",
        labels=["endpoint", "status_code"],
    ),
]

# Metrics used by the usage examples below. They must be registered before the
# decorated functions run, and their label names must match what the helpers
# pass: error_counter() always labels by "error_type".
# NOTE(review): names, descriptions and label sets here are reconstructed from
# how the examples call the helpers -- adjust to the real metric catalogue.
EXAMPLE_METRICS = [
    MetricConfig(
        name="http_request_duration_seconds",
        description="HTTP request latency distribution",
        labels=["endpoint"],
    ),
    MetricConfig(
        name="api_exceptions_total",
        description="Exceptions raised by API handlers",
        labels=["error_type"],
    ),
    MetricConfig(
        # The "_seconds" suffix makes create_metric() build a Histogram, which
        # latency_metric() needs for observe().
        name="task_processing_time_seconds",
        description="Background task processing latency distribution",
        labels=["task_type"],
    ),
    MetricConfig(
        name="background_tasks_concurrent",
        description="Background tasks currently executing",
    ),
    MetricConfig(
        name="task_errors_total",
        description="Exceptions raised by background tasks",
        labels=["error_type"],
    ),
]

# Initialize core metrics
manager = TelemetryManager()
for metric_config in CORE_METRICS + EXAMPLE_METRICS:
    manager.create_metric(metric_config)

# Example usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
app.add_route("/metrics", manager.http_handler())


# Metrics are looked up by their full "<namespace>_<subsystem>_<name>" key,
# hence the "aelion_core_" prefix in the names below.
@app.get("/health")
@manager.error_counter("aelion_core_api_exceptions_total")
async def health_check():
    """Liveness endpoint; also records its own latency."""
    # latency_metric is an async context manager, so it needs "async with".
    async with manager.latency_metric(
        "aelion_core_http_request_duration_seconds", endpoint="/health"
    ):
        return {"status": "ok"}


# Example usage in async tasks
# track_concurrency() appends "_concurrent" to the key it is given.
@manager.track_concurrency("aelion_core_background_tasks")
@manager.error_counter("aelion_core_task_errors_total")
async def process_task(task_data: dict):
    """Example background task instrumented for latency, errors and concurrency."""
    async with manager.latency_metric(
        "aelion_core_task_processing_time_seconds", task_type="data_ingest"
    ):
        # Processing logic
        pass


# Push metrics to gateway (cron job)
# Guarded so that importing this module (e.g. from the web app) does not
# perform a blocking network call or fail when no gateway is reachable.
if __name__ == "__main__":
    manager.push_gateway_job("aelion-ai-prod")

0 commit comments

Comments
 (0)