Pipeline Architecture
Custom pipelines let you orchestrate complex evaluation workflows that go beyond simple API calls. Common patterns include:

- **Batch Processing**: Process large datasets with parallel execution and checkpointing.
- **CI/CD Integration**: Run evaluations as part of your model deployment pipeline.
- **Custom Evaluators**: Implement domain-specific evaluation logic with custom code.
- **Orchestration**: Use Airflow, Prefect, or Dagster to manage complex workflows.
Batch Processing Pipeline
Process large datasets efficiently with parallel execution:

batch_pipeline.py
```python
from rubric import Rubric
from rubric.pipeline import BatchPipeline, ParallelExecutor
import pandas as pd

client = Rubric(api_key="gr_live_xxxxxxxx")

# Load your data
df = pd.read_csv("patient_calls.csv")

# Create a batch pipeline
pipeline = BatchPipeline(
    client=client,
    project="patient-triage",
    dataset="ds_batch_jan2024"
)

# Define data transformation
def transform_row(row):
    return {
        "input": {
            "transcript": row["transcript"],
            "call_duration": row["duration_seconds"]
        },
        "ai_output": {
            "triage_level": row["predicted_triage"],
            "symptoms": row["extracted_symptoms"].split(",")
        },
        "expected": {
            "triage_level": row["actual_triage"]
        },
        "metadata": {
            "call_id": row["call_id"],
            "agent_id": row["agent_id"]
        }
    }

# Process in parallel batches
results = pipeline.process(
    data=df.to_dict("records"),
    transform=transform_row,
    # Execution settings
    batch_size=100,
    max_workers=4,
    # Checkpointing for resume on failure
    checkpoint_dir="./checkpoints",
    # Progress callback
    on_progress=lambda p: print(f"Progress: {p.completed}/{p.total}")
)

print(f"Processed {results.total_samples} samples")
print(f"Failed: {results.failed_samples}")

# Run evaluation on the dataset
evaluation = client.evaluations.create(
    name="Batch Evaluation - Jan 2024",
    project="patient-triage",
    dataset="ds_batch_jan2024",
    evaluators=["triage_accuracy", "safety_score"]
)

# Wait for completion
evaluation.wait_for_completion(timeout=3600)
print(f"Evaluation complete: {evaluation.summary}")
```
CI/CD Integration
Run evaluations as part of your model deployment pipeline to catch regressions before production.

GitHub Actions
.github/workflows/model-eval.yml
```yaml
name: Model Evaluation

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install rubric

      - name: Run Evaluation
        env:
          GRADED_API_KEY: ${{ secrets.GRADED_API_KEY }}
        run: |
          python scripts/run_evaluation.py

      - name: Check Results
        env:
          GRADED_API_KEY: ${{ secrets.GRADED_API_KEY }}
        run: |
          python scripts/check_results.py --min-triage-accuracy 0.80 --min-safety-score 0.90
```
Evaluation Script
scripts/run_evaluation.py
```python
#!/usr/bin/env python3
import os
import subprocess
import sys

from rubric import Rubric

client = Rubric(api_key=os.environ["GRADED_API_KEY"])

# Get current git commit
commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()[:8]

# Create evaluation with commit metadata
evaluation = client.evaluations.create(
    name=f"CI Evaluation - {commit}",
    project="patient-triage",
    dataset="ds_test_set",  # Golden test set
    evaluators=["triage_accuracy", "safety_score", "hallucination_detection"],
    metadata={
        "commit": commit,
        "branch": os.environ.get("GITHUB_REF_NAME", "unknown"),
        "triggered_by": "github_actions"
    }
)
print(f"Created evaluation: {evaluation.id}")

# Wait for completion
evaluation.wait_for_completion(timeout=1800)

# Save results for next step
with open("evaluation_id.txt", "w") as f:
    f.write(evaluation.id)
```
Quality Gate Script
scripts/check_results.py
```python
#!/usr/bin/env python3
import argparse
import os
import sys

from rubric import Rubric

parser = argparse.ArgumentParser()
parser.add_argument("--min-triage-accuracy", type=float, required=True)
parser.add_argument("--min-safety-score", type=float, required=True)
args = parser.parse_args()

client = Rubric(api_key=os.environ["GRADED_API_KEY"])

# Load evaluation ID from previous step
with open("evaluation_id.txt") as f:
    eval_id = f.read().strip()

evaluation = client.evaluations.get(eval_id)

# Check quality gates
triage_accuracy = evaluation.summary["triage_accuracy"]["mean"]
safety_score = evaluation.summary["safety_score"]["mean"]

print(f"Triage Accuracy: {triage_accuracy:.2%} (min: {args.min_triage_accuracy:.2%})")
print(f"Safety Score: {safety_score:.2%} (min: {args.min_safety_score:.2%})")

failed = False

if triage_accuracy < args.min_triage_accuracy:
    print("FAILED: Triage accuracy below threshold")
    failed = True
else:
    print("PASSED: Triage accuracy")

if safety_score < args.min_safety_score:
    print("FAILED: Safety score below threshold")
    failed = True
else:
    print("PASSED: Safety score")

# Compare to baseline (previous production version)
baseline = client.evaluations.get_baseline(project="patient-triage")
if baseline:
    baseline_triage = baseline.summary["triage_accuracy"]["mean"]
    regression = baseline_triage - triage_accuracy
    if regression > 0.05:  # More than 5% regression
        print(f"FAILED: {regression:.1%} regression vs baseline")
        failed = True
    else:
        print("PASSED: No significant regression")

if failed:
    sys.exit(1)
else:
    print("\nAll quality gates passed!")
    sys.exit(0)
```
Custom Evaluators
Implement domain-specific evaluation logic that runs alongside built-in evaluators:

custom_evaluator.py
```python
from rubric import Rubric
from rubric.evaluators import CustomEvaluator, EvaluationResult

client = Rubric(api_key="gr_live_xxxxxxxx")

class ChestPainProtocolEvaluator(CustomEvaluator):
    """
    Evaluates whether chest pain cases follow the HEART score protocol.
    """
    name = "chest_pain_protocol"
    description = "Checks HEART score protocol adherence for chest pain triage"

    # Required questions for HEART score
    REQUIRED_QUESTIONS = [
        "onset",          # When did the pain start?
        "character",      # What does the pain feel like?
        "radiation",      # Does it radiate anywhere?
        "associated",     # Any associated symptoms?
        "risk_factors",   # History, diabetes, smoking?
        "prior_cardiac"   # Previous cardiac events?
    ]

    def evaluate(self, sample) -> EvaluationResult:
        transcript = sample.input.get("transcript", "")
        ai_questions = sample.ai_output.get("questions_asked", [])

        # Check which required questions were asked
        asked = set()
        for question_type in self.REQUIRED_QUESTIONS:
            if self._question_was_asked(transcript, question_type):
                asked.add(question_type)

        coverage = len(asked) / len(self.REQUIRED_QUESTIONS)
        missing = set(self.REQUIRED_QUESTIONS) - asked

        # Determine if triage was appropriate based on symptoms
        symptoms = sample.ai_output.get("extracted_symptoms", [])
        triage_level = sample.ai_output.get("triage_level")
        appropriate_triage = self._check_triage_appropriateness(
            symptoms, triage_level
        )

        # Calculate final score: 60% protocol coverage, 40% triage appropriateness
        score = (coverage * 0.6) + (1.0 if appropriate_triage else 0.0) * 0.4

        return EvaluationResult(
            score=score,
            reasoning=self._generate_reasoning(coverage, missing, appropriate_triage),
            metadata={
                "protocol_coverage": coverage,
                "missing_questions": list(missing),
                "appropriate_triage": appropriate_triage
            }
        )

# Register the custom evaluator
client.evaluators.register(ChestPainProtocolEvaluator)

# Use in evaluations
evaluation = client.evaluations.create(
    project="patient-triage",
    dataset="ds_chest_pain_cases",
    evaluators=[
        "triage_accuracy",
        "chest_pain_protocol"  # Your custom evaluator
    ]
)
```
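The evaluator above calls three private helpers (`_question_was_asked`, `_check_triage_appropriateness`, `_generate_reasoning`) that are left out for brevity. A minimal keyword-matching sketch of those methods is shown below; the keyword lists and triage rule are illustrative assumptions, not clinical guidance, and a production implementation would more likely use an LLM or a clinical NLP model.

```python
# Illustrative helper methods for ChestPainProtocolEvaluator (place inside the class body).
# Keyword lists and the red-flag rule are assumptions for demonstration only.
QUESTION_KEYWORDS = {
    "onset": ["when did", "how long ago", "start"],
    "character": ["describe the pain", "sharp", "pressure", "feel like"],
    "radiation": ["radiate", "spread to", "arm", "jaw"],
    "associated": ["short of breath", "nausea", "sweating"],
    "risk_factors": ["diabetes", "smoke", "blood pressure", "cholesterol"],
    "prior_cardiac": ["heart attack", "cardiac history", "stent"],
}

def _question_was_asked(self, transcript: str, question_type: str) -> bool:
    text = transcript.lower()
    return any(keyword in text for keyword in self.QUESTION_KEYWORDS[question_type])

def _check_triage_appropriateness(self, symptoms, triage_level) -> bool:
    # Assumption: any red-flag symptom should map to the highest triage level.
    red_flags = {"chest pain", "shortness of breath", "syncope"}
    has_red_flag = any(s.strip().lower() in red_flags for s in symptoms)
    return triage_level == "emergency" if has_red_flag else True

def _generate_reasoning(self, coverage, missing, appropriate_triage) -> str:
    parts = [f"Protocol coverage: {coverage:.0%}"]
    if missing:
        parts.append(f"Missing questions: {', '.join(sorted(missing))}")
    parts.append(
        "Triage level appropriate" if appropriate_triage else "Triage level may be too low"
    )
    return ". ".join(parts)
```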
Orchestration with Airflow
Use Apache Airflow to manage complex evaluation workflows:

dags/rubric_evaluation_dag.py
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from rubric import Rubric

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["[email protected]"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "rubric_daily_evaluation",
    default_args=default_args,
    description="Daily AI model evaluation pipeline",
    schedule_interval="0 6 * * *",  # 6 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def fetch_new_samples(**context):
    """Fetch samples from production logs."""
    from your_data_pipeline import fetch_yesterdays_calls

    samples = fetch_yesterdays_calls()

    client = Rubric()
    dataset = client.datasets.create(
        name=f"Production Calls - {context['ds']}",
        project="patient-triage"
    )
    client.samples.batch_create(
        dataset=dataset.id,
        samples=samples
    )
    return dataset.id

def run_evaluation(**context):
    """Run evaluation on the dataset."""
    dataset_id = context["task_instance"].xcom_pull(task_ids="fetch_samples")

    client = Rubric()
    evaluation = client.evaluations.create(
        name=f"Daily Eval - {context['ds']}",
        project="patient-triage",
        dataset=dataset_id,
        evaluators=[
            "triage_accuracy",
            "safety_score",
            "guideline_adherence",
            "chest_pain_protocol"
        ]
    )
    evaluation.wait_for_completion(timeout=3600)
    return evaluation.id

def check_quality_gates(**context):
    """Check if evaluation meets quality thresholds."""
    eval_id = context["task_instance"].xcom_pull(task_ids="run_evaluation")

    client = Rubric()
    evaluation = client.evaluations.get(eval_id)

    thresholds = {
        "triage_accuracy": 0.80,
        "safety_score": 0.90,
        "guideline_adherence": 0.75
    }

    failures = []
    for metric, threshold in thresholds.items():
        actual = evaluation.summary.get(metric, {}).get("mean", 0)
        if actual < threshold:
            failures.append(f"{metric}: {actual:.2%} < {threshold:.2%}")

    if failures:
        raise ValueError(f"Quality gates failed: {', '.join(failures)}")
    return "All quality gates passed"

fetch_task = PythonOperator(
    task_id="fetch_samples",
    python_callable=fetch_new_samples,
    dag=dag,
)

eval_task = PythonOperator(
    task_id="run_evaluation",
    python_callable=run_evaluation,
    dag=dag,
)

gate_task = PythonOperator(
    task_id="check_quality_gates",
    python_callable=check_quality_gates,
    dag=dag,
)

fetch_task >> eval_task >> gate_task
```
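The intro also lists Prefect and Dagster; the same three-step pipeline translates directly. Below is a rough Prefect 2 equivalent that reuses the Rubric calls from the Airflow example. Flow, task, and dataset names are illustrative, and `fetch_yesterdays_calls` is the same placeholder helper as above.

```python
from prefect import flow, task

from rubric import Rubric

@task(retries=1, retry_delay_seconds=300)
def fetch_samples() -> str:
    from your_data_pipeline import fetch_yesterdays_calls

    client = Rubric()  # assumes GRADED_API_KEY is set in the environment
    dataset = client.datasets.create(name="Production Calls", project="patient-triage")
    client.samples.batch_create(dataset=dataset.id, samples=fetch_yesterdays_calls())
    return dataset.id

@task
def evaluate(dataset_id: str) -> str:
    client = Rubric()
    evaluation = client.evaluations.create(
        name="Daily Eval",
        project="patient-triage",
        dataset=dataset_id,
        evaluators=["triage_accuracy", "safety_score", "guideline_adherence"],
    )
    evaluation.wait_for_completion(timeout=3600)
    return evaluation.id

@flow(name="rubric-daily-evaluation")
def daily_evaluation():
    dataset_id = fetch_samples()
    evaluate(dataset_id)

if __name__ == "__main__":
    daily_evaluation()
```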
Best Practices
| Practice | Details |
|---|---|
| Use Checkpointing | Save progress during batch processing to enable resume on failure |
| Implement Idempotency | Ensure pipelines can be safely re-run without creating duplicates |
| Monitor Pipeline Health | Track metrics like throughput, error rates, and latency |
| Version Your Pipelines | Track pipeline code alongside model versions for reproducibility |
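
For pipeline health monitoring, even a thin wrapper around the `on_progress` callback from the batch example goes a long way. The sketch below computes throughput from the `completed`/`total` fields shown earlier; the logger name and log format are illustrative.

```python
import logging
import time

logger = logging.getLogger("rubric.batch")
start = time.monotonic()

def log_progress(p):
    # p.completed and p.total are the fields used in the batch example above.
    elapsed = max(time.monotonic() - start, 1e-6)
    throughput = p.completed / elapsed
    logger.info(
        "progress=%d/%d throughput=%.1f samples/s",
        p.completed, p.total, throughput,
    )

# Pass it to the batch pipeline instead of the inline lambda:
# results = pipeline.process(..., on_progress=log_progress)
```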
