# Source code for afterimage.evaluation.base

"""Core types and async composite evaluation."""

from __future__ import annotations

import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Protocol

from ..types import ConversationWithContext


class EvaluationMetric(str, Enum):
    """Enumerates the metrics an evaluator may score a conversation on.

    Inherits from :class:`str` so members compare equal to their plain
    string values and serialize naturally.
    """

    COHERENCE = "coherence"
    GROUNDING = "grounding"
    RELEVANCE = "relevance"
    FACTUALITY = "factuality"
    HELPFULNESS = "helpfulness"
    SAFETY = "safety"
    FORMATTING = "formatting"
class AggregationMode(str, Enum):
    """Strategies for collapsing per-metric scores into one overall score."""

    MEAN = "mean"
    """Unweighted arithmetic mean over all reported metrics."""

    WEIGHTED_MEAN = "weighted_mean"
    """Weighted average using :attr:`CompositeEvaluator.metric_weights` (default weight 1.0)."""

    MIN = "min"
    """Minimum of all metric scores (strictest)."""
@dataclass
class EvaluationResult:
    """Aggregated per-metric scores and feedback for a single conversation."""

    # Per-metric numeric scores and the matching free-text feedback.
    scores: Dict[EvaluationMetric, float]
    feedback: Dict[EvaluationMetric, str]
    # Single combined score derived from `scores`.
    overall_score: float
    # True when the conversation fell below the acceptance threshold.
    needs_regeneration: bool
    # Hint for how a regeneration pass should improve the conversation.
    regeneration_strategy: Optional[str] = None
    # Free-form extra data attached by evaluators.
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def final_score(self) -> float:
        """Alias for :attr:`overall_score` (monitoring and legacy call sites)."""
        return self.overall_score
class BaseEvaluator(Protocol):
    """Structural (duck-typed) interface for asynchronous evaluators.

    Any object exposing a matching ``aevaluate`` coroutine satisfies this
    protocol; no inheritance is required.
    """

    async def aevaluate(
        self, conversation: ConversationWithContext
    ) -> EvaluationResult:
        """Produce an :class:`EvaluationResult` for *conversation*."""
        ...
class CompositeEvaluator:
    """Runs multiple async evaluators in parallel and aggregates scores.

    Weighted combination: for each sub-evaluator ``(E, w)``, metric ``m``
    receives ``score_m * w`` (only metrics that ``E`` returns). If multiple
    evaluators emit the same metric, contributions are summed. The overall
    score is then computed from the combined per-metric map using
    :attr:`aggregation_mode`.

    * ``MEAN``: ``sum(combined_scores.values()) / len(combined_scores)``
    * ``WEIGHTED_MEAN``: ``sum(s * metric_weights.get(m,1)) /
      sum(metric_weights.get(m,1) for m in combined)``
    * ``MIN``: ``min(combined_scores.values())`` (or 0 if empty)
    """

    def __init__(
        self,
        evaluators: List[tuple[BaseEvaluator, float]],
        min_acceptable_score: float = 0.6,
        aggregation_mode: AggregationMode = AggregationMode.MEAN,
        metric_weights: Optional[Dict[EvaluationMetric, float]] = None,
    ):
        """Configure the composite.

        Args:
            evaluators: ``(evaluator, weight)`` pairs; each weight scales
                every metric score that evaluator reports.
            min_acceptable_score: overall scores strictly below this value
                set ``needs_regeneration`` on the result.
            aggregation_mode: how the combined per-metric map collapses
                into a single overall score.
            metric_weights: per-metric weights used only by
                ``WEIGHTED_MEAN``; metrics absent from the map default to 1.0.
        """
        self.evaluators = evaluators
        self.min_acceptable_score = min_acceptable_score
        self.aggregation_mode = aggregation_mode
        self.metric_weights = metric_weights or {}

    async def aevaluate(
        self, conversation: ConversationWithContext
    ) -> EvaluationResult:
        """Run every sub-evaluator concurrently and merge their results.

        Returns:
            A single :class:`EvaluationResult` whose scores are the
            weight-scaled sums of the sub-evaluators' scores, whose feedback
            joins per-metric feedback strings with ``"; "``, and whose
            ``needs_regeneration`` flag reflects :attr:`min_acceptable_score`.
        """

        async def _run(
            pair: tuple[BaseEvaluator, float],
        ) -> tuple[EvaluationResult, float]:
            ev, w = pair
            return await ev.aevaluate(conversation), w

        # All sub-evaluators run concurrently; the additive merge below is
        # order-independent, so gather's result ordering does not matter.
        pairs = await asyncio.gather(*[_run(p) for p in self.evaluators])

        combined_scores: dict[EvaluationMetric, float] = defaultdict(float)
        combined_feedback: dict[EvaluationMetric, list[str]] = defaultdict(list)
        for result, weight in pairs:
            # Sum weight-scaled contributions; evaluators sharing a metric
            # accumulate into the same entry.
            for metric, score in result.scores.items():
                combined_scores[metric] += score * weight
            for metric, fb in result.feedback.items():
                combined_feedback[metric].append(fb)

        feedback_merged = {
            k: "; ".join(v) for k, v in combined_feedback.items()
        }
        overall = self._aggregate_overall(dict(combined_scores))
        return EvaluationResult(
            scores=dict(combined_scores),
            feedback=feedback_merged,
            overall_score=overall,
            needs_regeneration=overall < self.min_acceptable_score,
            regeneration_strategy=self._determine_regeneration_strategy(
                dict(combined_scores)
            ),
        )

    def _aggregate_overall(
        self, combined_scores: Dict[EvaluationMetric, float]
    ) -> float:
        """Collapse the combined per-metric map into one overall score.

        Returns 0.0 for an empty map regardless of mode.
        """
        if not combined_scores:
            return 0.0
        if self.aggregation_mode == AggregationMode.MIN:
            return min(combined_scores.values())
        if self.aggregation_mode == AggregationMode.WEIGHTED_MEAN:
            num = 0.0
            den = 0.0
            for m, s in combined_scores.items():
                w = self.metric_weights.get(m, 1.0)
                num += s * w
                den += w
            return num / den if den > 0 else 0.0
        # MEAN
        return sum(combined_scores.values()) / len(combined_scores)

    def _determine_regeneration_strategy(
        self, scores: Dict[EvaluationMetric, float]
    ) -> str:
        """Pick a regeneration hint targeting the worst-scoring metric.

        Fix: previously ``min()`` on an empty mapping raised ``ValueError``
        when no evaluators were configured or none reported any metric;
        that case now falls back to the generic strategy.
        """
        if not scores:  # guard: min() would raise ValueError on empty input
            return "general_improvement"
        worst_metric = min(scores.items(), key=lambda x: x[1])
        strategies = {
            EvaluationMetric.COHERENCE: "improve_coherence",
            EvaluationMetric.GROUNDING: "improve_grounding",
            EvaluationMetric.RELEVANCE: "improve_relevance",
            EvaluationMetric.FACTUALITY: "verify_facts",
            EvaluationMetric.SAFETY: "ensure_safety",
        }
        # HELPFULNESS and FORMATTING intentionally map to the default below.
        return strategies.get(worst_metric[0], "general_improvement")