"""Async metric evaluators (embeddings + LLM-as-judge)."""
from __future__ import annotations
import math
import time
from typing import Any, List, Optional
from pydantic import BaseModel, Field
from ..monitoring import GenerationMonitor
from ..providers import LLMProvider
from ..providers.embedding_providers import EmbeddingProvider
from ..types import ConversationWithContext, Role
from .base import EvaluationMetric, EvaluationResult
def _cosine_similarity(a: list[float], b: list[float]) -> float:
"""Cosine similarity for dense vectors (L2-normalized dot product)."""
if not a or not b or len(a) != len(b):
return 0.0
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
if na == 0.0 or nb == 0.0:
return 0.0
return dot / (na * nb)
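# Quick sanity check (illustrative values, not part of the module's behavior):
# identical vectors score 1.0, e.g. _cosine_similarity([1.0, 0.0], [1.0, 0.0]) == 1.0,
# while orthogonal vectors such as [1.0, 0.0] and [0.0, 1.0] score 0.0.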
class LLMJudgeStructuredOutput(BaseModel):
"""Structured LLM judge response."""
scores: List[float] = Field(
...,
description="One score in [0, 1] per evaluated item, in order.",
)
feedback: str = ""
needs_improvement: bool = False
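    # Example payload the judge is expected to produce (illustrative values only):
    #   {"scores": [0.8, 0.65], "feedback": "Second answer omits a caveat.", "needs_improvement": true}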
class CoherenceEvaluator:
"""Question–answer semantic coherence via embedding cosine similarity."""
def __init__(
self,
embedding: EmbeddingProvider,
monitor: Optional[GenerationMonitor] = None,
coherence_threshold: float = 0.65,
):
self._embedding = embedding
self.monitor = monitor
self._threshold = coherence_threshold
async def aevaluate(
self, conversation: ConversationWithContext
) -> EvaluationResult:
start = time.time()
try:
pairs: list[tuple[str, str]] = []
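            # Pair turns assuming a strictly alternating user/assistant layout:
            # each even-indexed turn is treated as the question, the next as its answer.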
for i in range(0, len(conversation.conversations), 2):
if i + 1 < len(conversation.conversations):
pairs.append(
(
conversation.conversations[i].content,
conversation.conversations[i + 1].content,
)
)
if not pairs:
return self._result(
0.0, "No question-answer pairs found", True, start
)
texts: list[str] = []
for q, a in pairs:
texts.extend([q, a])
vecs = await self._embedding.embed(texts)
sims: list[float] = []
for i in range(0, len(vecs), 2):
sims.append(_cosine_similarity(vecs[i], vecs[i + 1]))
avg = sum(sims) / len(sims)
ok = avg >= self._threshold
fb = (
"Good question-answer coherence"
if ok
else "Low coherence between questions and answers"
)
return self._result(avg, fb, not ok, start)
except Exception:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=False,
evaluator_type=self.__class__.__name__,
scores={},
error="coherence evaluation failed",
error_type="Exception",
)
raise
def _result(
self, score: float, feedback: str, needs_regen: bool, start: float
) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.COHERENCE: score},
)
return EvaluationResult(
scores={EvaluationMetric.COHERENCE: score},
feedback={EvaluationMetric.COHERENCE: feedback},
overall_score=score,
needs_regeneration=needs_regen,
)
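# Usage sketch for the embedding-based evaluators (assumes a concrete EmbeddingProvider
# instance named `embedder` and an async caller); GroundingEvaluator and
# RelevanceEvaluator follow the same pattern:
#
#     evaluator = CoherenceEvaluator(embedding=embedder, coherence_threshold=0.7)
#     result = await evaluator.aevaluate(conversation)
#     if result.needs_regeneration:
#         ...  # e.g. regenerate or flag the conversation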
class GroundingEvaluator:
"""How well assistant answers align with the response context embedding."""
def __init__(
self,
embedding: EmbeddingProvider,
monitor: Optional[GenerationMonitor] = None,
grounding_threshold: float = 0.55,
):
self._embedding = embedding
self.monitor = monitor
self._threshold = grounding_threshold
async def aevaluate(
self, conversation: ConversationWithContext
) -> EvaluationResult:
start = time.time()
try:
ctx = conversation.response_context
if not ctx:
return self._neutral("No context provided", start)
answers = [
t.content
for t in conversation.conversations
if t.role == Role.ASSISTANT
]
if not answers:
return self._bad("No answers found", start)
vecs = await self._embedding.embed([ctx] + answers)
ctx_vec, ans_vecs = vecs[0], vecs[1:]
sims = [_cosine_similarity(ctx_vec, av) for av in ans_vecs]
avg = sum(sims) / len(sims)
ok = avg >= self._threshold
fb = (
"Answers are well-grounded in context"
if ok
else "Answers show weak grounding in context"
)
return self._scored(avg, fb, not ok, start)
except Exception:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=False,
evaluator_type=self.__class__.__name__,
scores={},
error="grounding evaluation failed",
error_type="Exception",
)
raise
def _neutral(self, msg: str, start: float) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.GROUNDING: 1.0},
)
return EvaluationResult(
scores={EvaluationMetric.GROUNDING: 1.0},
feedback={EvaluationMetric.GROUNDING: msg},
overall_score=1.0,
needs_regeneration=False,
)
def _bad(self, msg: str, start: float) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.GROUNDING: 0.0},
)
return EvaluationResult(
scores={EvaluationMetric.GROUNDING: 0.0},
feedback={EvaluationMetric.GROUNDING: msg},
overall_score=0.0,
needs_regeneration=True,
)
def _scored(
self, score: float, feedback: str, needs_regen: bool, start: float
) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.GROUNDING: score},
)
return EvaluationResult(
scores={EvaluationMetric.GROUNDING: score},
feedback={EvaluationMetric.GROUNDING: feedback},
overall_score=score,
needs_regeneration=needs_regen,
)
class RelevanceEvaluator:
"""How relevant user questions are to the instruction context."""
def __init__(
self,
embedding: EmbeddingProvider,
monitor: Optional[GenerationMonitor] = None,
relevance_threshold: float = 0.55,
):
self._embedding = embedding
self.monitor = monitor
self._threshold = relevance_threshold
async def aevaluate(
self, conversation: ConversationWithContext
) -> EvaluationResult:
start = time.time()
try:
ctx = conversation.instruction_context
if not ctx:
return self._neutral("No context provided", start)
questions = [
t.content
for t in conversation.conversations
if t.role == Role.USER
]
if not questions:
return self._bad("No questions found", start)
vecs = await self._embedding.embed([ctx] + questions)
ctx_vec, q_vecs = vecs[0], vecs[1:]
sims = [_cosine_similarity(ctx_vec, qv) for qv in q_vecs]
avg = sum(sims) / len(sims)
ok = avg >= self._threshold
fb = (
"Questions are relevant to context"
if ok
else "Questions show weak relevance to context"
)
return self._scored(avg, fb, not ok, start)
except Exception:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=False,
evaluator_type=self.__class__.__name__,
scores={},
error="relevance evaluation failed",
error_type="Exception",
)
raise
def _neutral(self, msg: str, start: float) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.RELEVANCE: 1.0},
)
return EvaluationResult(
scores={EvaluationMetric.RELEVANCE: 1.0},
feedback={EvaluationMetric.RELEVANCE: msg},
overall_score=1.0,
needs_regeneration=False,
)
def _bad(self, msg: str, start: float) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.RELEVANCE: 0.0},
)
return EvaluationResult(
scores={EvaluationMetric.RELEVANCE: 0.0},
feedback={EvaluationMetric.RELEVANCE: msg},
overall_score=0.0,
needs_regeneration=True,
)
def _scored(
self, score: float, feedback: str, needs_regen: bool, start: float
) -> EvaluationResult:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores={EvaluationMetric.RELEVANCE: score},
)
return EvaluationResult(
scores={EvaluationMetric.RELEVANCE: score},
feedback={EvaluationMetric.RELEVANCE: feedback},
overall_score=score,
needs_regeneration=needs_regen,
)
class AsyncLLMBaseEvaluator:
"""Shared async LLM judge with structured output and monitoring."""
def __init__(
self,
llm: LLMProvider,
max_retries: int = 3,
monitor: Optional[GenerationMonitor] = None,
):
self.llm = llm
self.max_retries = max_retries
self.monitor = monitor
async def _aget_structured(
self, prompt: str
) -> tuple[LLMJudgeStructuredOutput, Any]:
last_err: Exception | None = None
for attempt in range(self.max_retries):
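            # Lower the sampling temperature on each retry (0.35, 0.27, 0.19, ...,
            # floored at 0.1) to push the judge toward more deterministic output.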
temp = max(0.1, 0.35 - attempt * 0.08)
try:
resp = await self.llm.agenerate_structured(
prompt,
LLMJudgeStructuredOutput,
temperature=temp,
)
                parsed = resp.parsed
                if parsed.scores and all(0.0 <= s <= 1.0 for s in parsed.scores):
                    return parsed, resp
                last_err = ValueError(
                    "judge returned missing or out-of-range scores"
                )
            except Exception as e:
                last_err = e
                continue
raise ValueError(
f"LLM judge failed after {self.max_retries} attempts: {last_err}"
)
class FactualityEvaluator(AsyncLLMBaseEvaluator):
"""LLM rubric for factual accuracy vs context."""
async def aevaluate(
self, conversation: ConversationWithContext
) -> EvaluationResult:
start = time.time()
try:
responses = [
t.content
for t in conversation.conversations
if t.role == Role.ASSISTANT
]
if not responses:
return self._finalize(
EvaluationResult(
scores={EvaluationMetric.FACTUALITY: 0.0},
feedback={
EvaluationMetric.FACTUALITY: "No responses to evaluate"
},
overall_score=0.0,
needs_regeneration=True,
),
start,
[],
)
separator_str = "\n----\n"
prompt = f"""Evaluate the factual accuracy of the following responses in relation to the provided context.
Rate each response on a scale of 0-1 and give concise feedback.
Context:
{conversation.response_context or conversation.instruction_context or ""}
Responses:
{separator_str.join(f"[{i + 1}] {r}" for i, r in enumerate(responses))}
Return scores (one float per response, 0-1), feedback (short summary), and needs_improvement (true if any response is unreliable)."""
try:
parsed, resp = await self._aget_structured(prompt)
avg = sum(parsed.scores) / len(parsed.scores)
needs = parsed.needs_improvement or avg < 0.55
result = EvaluationResult(
scores={EvaluationMetric.FACTUALITY: avg},
feedback={EvaluationMetric.FACTUALITY: parsed.feedback},
overall_score=avg,
needs_regeneration=needs,
)
return self._finalize(result, start, [resp])
except Exception:
result = EvaluationResult(
scores={EvaluationMetric.FACTUALITY: 0.5},
feedback={
EvaluationMetric.FACTUALITY: "Factuality evaluation inconclusive"
},
overall_score=0.5,
needs_regeneration=False,
)
return self._finalize(result, start, [])
except Exception:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=False,
evaluator_type=self.__class__.__name__,
scores={},
error="factuality evaluation failed",
error_type="Exception",
)
raise
def _finalize(
self,
result: EvaluationResult,
start: float,
responses: list[Any],
) -> EvaluationResult:
if self.monitor:
tok: dict[str, Any] = {}
if responses:
tok["prompt_token_count"] = sum(
r.prompt_token_count for r in responses
)
tok["completion_token_count"] = sum(
r.completion_token_count for r in responses
)
tok["total_token_count"] = sum(r.total_token_count for r in responses)
tok["model_name"] = responses[0].model_name
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores=result.scores,
**tok,
)
return result
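# Usage sketch for the LLM-judge evaluators (assumes a concrete LLMProvider instance
# named `judge_llm` that implements `agenerate_structured`, as used above):
#
#     evaluator = FactualityEvaluator(llm=judge_llm, max_retries=2)
#     result = await evaluator.aevaluate(conversation)
#     score = result.scores[EvaluationMetric.FACTUALITY]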
class HelpfulnessEvaluator(AsyncLLMBaseEvaluator):
"""LLM rubric for answer helpfulness."""
async def aevaluate(
self, conversation: ConversationWithContext
) -> EvaluationResult:
start = time.time()
try:
pairs: list[tuple[str, str]] = []
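            # As in CoherenceEvaluator, assume alternating user/assistant turns and
            # pair each question with the reply that follows it.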
for i in range(0, len(conversation.conversations), 2):
if i + 1 < len(conversation.conversations):
pairs.append(
(
conversation.conversations[i].content,
conversation.conversations[i + 1].content,
)
)
if not pairs:
return self._finalize(
EvaluationResult(
scores={EvaluationMetric.HELPFULNESS: 0.0},
feedback={
EvaluationMetric.HELPFULNESS: "No Q/A pairs to evaluate"
},
overall_score=0.0,
needs_regeneration=True,
),
start,
[],
)
separator_str = "\n----\n"
            pairs_text = separator_str.join(
                f"[{i + 1}] Q: {q}\nA: {a}" for i, (q, a) in enumerate(pairs)
            )
prompt = f"""Evaluate how helpful each answer is for its question (0-1 each).
Context (reference):
{conversation.response_context or conversation.instruction_context or ""}
Pairs:
{pairs_text}
Return scores (one per pair), feedback, needs_improvement."""
try:
parsed, resp = await self._aget_structured(prompt)
avg = sum(parsed.scores) / len(parsed.scores)
needs = parsed.needs_improvement or avg < 0.55
result = EvaluationResult(
scores={EvaluationMetric.HELPFULNESS: avg},
feedback={EvaluationMetric.HELPFULNESS: parsed.feedback},
overall_score=avg,
needs_regeneration=needs,
)
return self._finalize(result, start, [resp])
except Exception as e:
result = EvaluationResult(
scores={EvaluationMetric.HELPFULNESS: 0.0},
feedback={
EvaluationMetric.HELPFULNESS: f"Helpfulness evaluation failed: {e}"
},
overall_score=0.0,
needs_regeneration=True,
)
return self._finalize(result, start, [])
except Exception:
if self.monitor:
self.monitor.track_evaluation(
duration=time.time() - start,
success=False,
evaluator_type=self.__class__.__name__,
scores={},
error="helpfulness evaluation failed",
error_type="Exception",
)
raise
def _finalize(
self,
result: EvaluationResult,
start: float,
responses: list[Any],
) -> EvaluationResult:
if self.monitor:
tok: dict[str, Any] = {}
if responses:
tok["prompt_token_count"] = sum(
r.prompt_token_count for r in responses
)
tok["completion_token_count"] = sum(
r.completion_token_count for r in responses
)
tok["total_token_count"] = sum(r.total_token_count for r in responses)
tok["model_name"] = responses[0].model_name
self.monitor.track_evaluation(
duration=time.time() - start,
success=True,
evaluator_type=self.__class__.__name__,
scores=result.scores,
**tok,
)
return result
class SafetyEvaluator:
"""Placeholder safety pass-through (always neutral-positive)."""
async def aevaluate(
self, conversation: ConversationWithContext
) -> EvaluationResult:
return EvaluationResult(
scores={EvaluationMetric.SAFETY: 1.0},
feedback={EvaluationMetric.SAFETY: "Safety check not implemented"},
overall_score=1.0,
needs_regeneration=False,
)