Monitoring & Observability
When generating thousands of conversations, you need visibility into the process. Is it working? How fast is it? Are errors occurring? Afterimage provides a robust, thread-safe Monitoring System to track these metrics in real-time, visualize them, and export them for analysis.
GenerationMonitor
The central component is the GenerationMonitor. It collects performance, health, and quality metrics from the generator and routes them to various handlers (files, logs, or custom dashboards).
Initialization
You can attach a monitor to any generator (ConversationGenerator, PersonaGenerator, etc.). The monitor uses background threads to process metrics without blocking the main generation loop.
from afterimage import ConversationGenerator, GenerationMonitor
# 1. Initialize Monitor
# This will save metrics to 'metrics.jsonl' and logs to 'afterimage.log' in the specified directory.
# If no log_dir is provided, it creates a timestamped folder in ./monitoring/
monitor = GenerationMonitor(
    log_dir="./logs",
    metrics_interval=60  # Check for alerts every 60 seconds
)

# 2. Attach to Generator
generator = ConversationGenerator(
    ...,
    monitor=monitor
)
Metrics Tracked
The monitor automatically tracks a wide range of metrics:
Performance:
generation_time: Time taken to generate one conversation.
prompt_token_count: Input tokens used.
completion_token_count: Output tokens generated.
total_token_count: Total token usage.
conversation_length: Number of turns in the generated conversation.
Health:
success_rate: Binary tracking of successful generations (1.0) vs failures (0.0).
error_rate: Binary tracking of errors.
api_errors: Specific API failures.
Quality (if Evaluation is running):
evaluation_score_<type>: Scores from evaluators (e.g., evaluation_score_coherence).
evaluation_time: Time taken for evaluation steps.
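The health rates above are simply means of per-generation binary observations, which is why they can be averaged over any time window. A minimal illustration, independent of the library:

```python
# Each generation records 1.0 on success, 0.0 on failure;
# the success rate over a window is the mean of those observations.
observations = [1.0, 1.0, 0.0, 1.0, 1.0]
success_rate = sum(observations) / len(observations)
error_rate = 1.0 - success_rate
print(success_rate)  # 0.8
```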
Exporting Data
You can export your collected metrics to various formats for external analysis (e.g., in Jupyter Notebooks or Excel).
# Export to JSON
monitor.export_metrics("metrics_export.json", format="json")
# Export to CSV (creates separate files for each metric type)
monitor.export_metrics("metrics_export.csv", format="csv")
# Export to Excel (creates a multi-sheet workbook)
monitor.export_metrics("metrics_report.xlsx", format="excel")
# Export to Parquet (efficient binary format)
monitor.export_metrics("metrics.parquet", format="parquet")
You can also filter exports by a time window:
from datetime import timedelta
# Export only the last hour of data
monitor.export_metrics("last_hour.csv", format="csv", window=timedelta(hours=1))
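The exported files load directly into pandas for downstream analysis. A minimal sketch; the CSV column names here (`timestamp`, `metric`, `value`) are illustrative stand-ins, not a guaranteed export schema:

```python
import io

import pandas as pd

# Stand-in for a CSV produced by export_metrics; the column
# names are assumptions for illustration only.
csv_data = io.StringIO(
    "timestamp,metric,value\n"
    "2024-01-01T00:00:00,generation_time,2.1\n"
    "2024-01-01T00:01:00,generation_time,3.4\n"
    "2024-01-01T00:02:00,generation_time,2.8\n"
)
df = pd.read_csv(csv_data, parse_dates=["timestamp"])
print(df["value"].mean())  # average generation time over the window
```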
Visualization
The GenerationMonitor has built-in plotting capabilities using matplotlib and seaborn. It can generate a suite of plots to help you understand your generation run.
# Generate and save all standard plots to the log directory
monitor.visualize_metrics()
# Or specify a custom directory
monitor.visualize_metrics(save_dir="./plots")
The standard visualizations include:
Success/Error Rate Over Time: Rolling averages of success and failure rates.
Generation Time Distribution: Histogram of latencies.
Token Usage Over Time: Trends for prompt, completion, and total tokens.
Evaluation Scores Over Time: Trends for quality metrics.
Evaluation Time Distribution: Histogram of evaluation latencies.
Alerts
The monitor includes an active alerting system that checks for anomalies every metrics_interval. Built-in alerts include:
Low Success Rate: Triggers if success rate drops below 80%.
High Generation Time: Triggers if average generation time exceeds 30s.
High Error Rate: Triggers if error rate exceeds 20%.
High Token Usage: Triggers if token usage spikes (Prompt > 4k, Completion > 4k, Total > 8k).
Short Conversations: Triggers if average conversation length is < 2 turns.
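Conceptually, these built-in checks are threshold comparisons over recent metric averages, run once per metrics_interval. A rough sketch of that logic (the function and table names here are hypothetical, not the monitor's internals):

```python
# Illustrative threshold checks mirroring the built-in alerts above.
THRESHOLDS = {
    "low_success_rate": ("success_rate", "below", 0.80),
    "high_generation_time": ("generation_time", "above", 30.0),
    "high_error_rate": ("error_rate", "above", 0.20),
}

def check_alerts(averages: dict) -> list:
    """Return the names of alerts whose thresholds are breached."""
    fired = []
    for name, (metric, direction, limit) in THRESHOLDS.items():
        value = averages.get(metric)
        if value is None:
            continue  # metric not observed in this window
        if direction == "below" and value < limit:
            fired.append(name)
        elif direction == "above" and value > limit:
            fired.append(name)
    return fired

print(check_alerts({"success_rate": 0.75, "generation_time": 12.0}))
# ['low_success_rate']
```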
Custom Alert Handlers
You can define custom logic to respond to these alerts, such as sending a Slack notification or stopping the generation.
def stop_on_critical_error(alert):
    if alert.level == "error":
        print(f"CRITICAL ALERT: {alert.name} - {alert.message}")
        # Logic to stop generation or notify team

monitor = GenerationMonitor(
    log_dir="./logs",
    alert_handlers=[stop_on_critical_error]
)
Internals & Extensibility (For Developers)
Threading Model
The GenerationMonitor uses a producer-consumer architecture to ensure monitoring does not impact generation performance.
Producers: record_metric, log_info, etc., simply put items into a thread-safe queue.Queue.
Consumers: Background worker threads (_metric_worker, _log_worker) pull items from the queues and process them (writing to files, checking alerts, etc.).
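The pattern can be sketched with the standard library alone; the names below are illustrative, not the monitor's actual internals:

```python
import queue
import threading

metric_queue = queue.Queue()
results = []

def _metric_worker():
    # Consumer: drain the queue until a sentinel (None) arrives.
    while True:
        item = metric_queue.get()
        if item is None:
            break
        results.append(item)  # stand-in for writing to a file
        metric_queue.task_done()

worker = threading.Thread(target=_metric_worker, daemon=True)
worker.start()

# Producer: recording a metric just enqueues it and returns
# immediately, so the generation loop never blocks on I/O.
for i in range(3):
    metric_queue.put(("generation_time", float(i)))

metric_queue.put(None)  # shut the worker down
worker.join()
print(results)
```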
Custom Handlers
By default, the monitor uses FileMetricHandler and FileLogHandler. You can implement your own handlers (e.g., to send metrics to Datadog, Prometheus, or WandB) by implementing the MetricHandler or LogHandler protocols.
from typing import Dict, Any

class WandBMetricHandler:
    def handle_metric(self, metric_name: str, value: float, metadata: Dict[str, Any]) -> None:
        import wandb
        wandb.log({metric_name: value, **metadata})

monitor = GenerationMonitor(
    metric_handlers=[WandBMetricHandler()]
)
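Because these are structural protocols, duck typing is enough at runtime; for static checking you can express the contract as a typing.Protocol. A sketch of what the MetricHandler interface might look like (an assumption for illustration, not the library's actual definition):

```python
from typing import Any, Dict, List, Protocol

class MetricHandler(Protocol):
    """Anything with a matching handle_metric method satisfies this."""
    def handle_metric(
        self, metric_name: str, value: float, metadata: Dict[str, Any]
    ) -> None: ...

class ListMetricHandler:
    """Toy handler that collects metrics in memory (useful in tests)."""
    def __init__(self) -> None:
        self.records: List[tuple] = []

    def handle_metric(
        self, metric_name: str, value: float, metadata: Dict[str, Any]
    ) -> None:
        self.records.append((metric_name, value, metadata))

# Structural typing: ListMetricHandler never names MetricHandler,
# yet it satisfies the protocol.
handler: MetricHandler = ListMetricHandler()
handler.handle_metric("generation_time", 2.5, {"model": "gpt-4"})
print(handler.records)
```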