Pipeline Architecture

Custom pipelines let you orchestrate complex evaluation workflows that go beyond simple API calls. Common patterns include:

Batch Processing: Process large datasets with parallel execution and checkpointing.
CI/CD Integration: Run evaluations as part of your model deployment pipeline.
Custom Evaluators: Implement domain-specific evaluation logic with custom code.
Orchestration: Use Airflow, Prefect, or Dagster to manage complex workflows.

Batch Processing Pipeline

Process large datasets efficiently with parallel execution:
batch_pipeline.py
from rubric import Rubric
from rubric.pipeline import BatchPipeline
import pandas as pd

client = Rubric(api_key="gr_live_xxxxxxxx")

# Load your data
df = pd.read_csv("patient_calls.csv")

# Create a batch pipeline
pipeline = BatchPipeline(
    client=client,
    project="patient-triage",
    dataset="ds_batch_jan2024"
)

# Define data transformation
def transform_row(row):
    return {
        "input": {
            "transcript": row["transcript"],
            "call_duration": row["duration_seconds"]
        },
        "ai_output": {
            "triage_level": row["predicted_triage"],
            "symptoms": row["extracted_symptoms"].split(",")
        },
        "expected": {
            "triage_level": row["actual_triage"]
        },
        "metadata": {
            "call_id": row["call_id"],
            "agent_id": row["agent_id"]
        }
    }

# Process in parallel batches
results = pipeline.process(
    data=df.to_dict("records"),
    transform=transform_row,

    # Execution settings
    batch_size=100,
    max_workers=4,

    # Checkpointing for resume on failure
    checkpoint_dir="./checkpoints",

    # Progress callback
    on_progress=lambda p: print(f"Progress: {p.completed}/{p.total}")
)

print(f"Processed {results.total_samples} samples")
print(f"Failed: {results.failed_samples}")

# Run evaluation on the dataset
evaluation = client.evaluations.create(
    name="Batch Evaluation - Jan 2024",
    project="patient-triage",
    dataset="ds_batch_jan2024",
    evaluators=["triage_accuracy", "safety_score"]
)

# Wait for completion
evaluation.wait_for_completion(timeout=3600)
print(f"Evaluation complete: {evaluation.summary}")

CI/CD Integration

Run evaluations as part of your model deployment pipeline to catch regressions before production.

GitHub Actions

.github/workflows/model-eval.yml
name: Model Evaluation

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install rubric

      - name: Run Evaluation
        env:
          GRADED_API_KEY: ${{ secrets.GRADED_API_KEY }}
        run: |
          python scripts/run_evaluation.py

      - name: Check Results
        env:
          GRADED_API_KEY: ${{ secrets.GRADED_API_KEY }}
        run: |
          python scripts/check_results.py --min-triage-accuracy 0.80 --min-safety-score 0.90

Evaluation Script

scripts/run_evaluation.py
#!/usr/bin/env python3
import os
import subprocess
from rubric import Rubric

client = Rubric(api_key=os.environ["GRADED_API_KEY"])

# Get the current git commit for traceability
commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()[:8]

# Create evaluation with commit metadata
evaluation = client.evaluations.create(
    name=f"CI Evaluation - {commit}",
    project="patient-triage",
    dataset="ds_test_set",  # Golden test set
    evaluators=["triage_accuracy", "safety_score", "hallucination_detection"],
    metadata={
        "commit": commit,
        "branch": os.environ.get("GITHUB_REF_NAME", "unknown"),
        "triggered_by": "github_actions"
    }
)

print(f"Created evaluation: {evaluation.id}")

# Wait for completion
evaluation.wait_for_completion(timeout=1800)

# Save results for next step
with open("evaluation_id.txt", "w") as f:
    f.write(evaluation.id)
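
When this script runs inside GitHub Actions, the commit SHA is already exposed as the GITHUB_SHA environment variable, so the git subprocess call is only needed as a fallback for local runs:
commit = os.environ.get("GITHUB_SHA", "")[:8] or (
    subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()[:8]
)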

Quality Gate Script

scripts/check_results.py
#!/usr/bin/env python3
import argparse
import os
import sys
from rubric import Rubric

parser = argparse.ArgumentParser()
parser.add_argument("--min-triage-accuracy", type=float, required=True)
parser.add_argument("--min-safety-score", type=float, required=True)
args = parser.parse_args()

client = Rubric(api_key=os.environ["GRADED_API_KEY"])

# Load evaluation ID from previous step
with open("evaluation_id.txt") as f:
    eval_id = f.read().strip()

evaluation = client.evaluations.get(eval_id)

# Check quality gates
triage_accuracy = evaluation.summary["triage_accuracy"]["mean"]
safety_score = evaluation.summary["safety_score"]["mean"]

print(f"Triage Accuracy: {triage_accuracy:.2%} (min: {args.min_triage_accuracy:.2%})")
print(f"Safety Score: {safety_score:.2%} (min: {args.min_safety_score:.2%})")

failed = False

if triage_accuracy < args.min_triage_accuracy:
    print("FAILED: Triage accuracy below threshold")
    failed = True
else:
    print("PASSED: Triage accuracy")

if safety_score < args.min_safety_score:
    print("FAILED: Safety score below threshold")
    failed = True
else:
    print("PASSED: Safety score")

# Compare to baseline (previous production version)
baseline = client.evaluations.get_baseline(project="patient-triage")
if baseline:
    baseline_triage = baseline.summary["triage_accuracy"]["mean"]
    regression = baseline_triage - triage_accuracy

    if regression > 0.05:  # More than 5% regression
        print(f"FAILED: {regression:.1%} regression vs baseline")
        failed = True
    else:
        print(f"PASSED: No significant regression")

if failed:
    sys.exit(1)
else:
    print("\nAll quality gates passed!")
    sys.exit(0)

Custom Evaluators

Implement domain-specific evaluation logic that runs alongside built-in evaluators:
custom_evaluator.py
from rubric import Rubric
from rubric.evaluators import CustomEvaluator, EvaluationResult

class ChestPainProtocolEvaluator(CustomEvaluator):
    """
    Evaluates whether chest pain cases follow the HEART score protocol.
    """

    name = "chest_pain_protocol"
    description = "Checks HEART score protocol adherence for chest pain triage"

    # Required questions for HEART score
    REQUIRED_QUESTIONS = [
        "onset",           # When did the pain start?
        "character",       # What does the pain feel like?
        "radiation",       # Does it radiate anywhere?
        "associated",      # Any associated symptoms?
        "risk_factors",    # History, diabetes, smoking?
        "prior_cardiac"    # Previous cardiac events?
    ]

    def evaluate(self, sample) -> EvaluationResult:
        transcript = sample.input.get("transcript", "")

        # Check which required questions were asked
        asked = set()
        for question_type in self.REQUIRED_QUESTIONS:
            if self._question_was_asked(transcript, question_type):
                asked.add(question_type)

        coverage = len(asked) / len(self.REQUIRED_QUESTIONS)
        missing = set(self.REQUIRED_QUESTIONS) - asked

        # Determine if triage was appropriate based on symptoms
        symptoms = sample.ai_output.get("extracted_symptoms", [])
        triage_level = sample.ai_output.get("triage_level")

        appropriate_triage = self._check_triage_appropriateness(
            symptoms, triage_level
        )

        # Calculate final score
        score = (coverage * 0.6) + (1.0 if appropriate_triage else 0.0) * 0.4

        return EvaluationResult(
            score=score,
            reasoning=self._generate_reasoning(coverage, missing, appropriate_triage),
            metadata={
                "protocol_coverage": coverage,
                "missing_questions": list(missing),
                "appropriate_triage": appropriate_triage
            }
        )
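
    # --- Helper sketches ---
    # The three helpers referenced in evaluate() are not part of the Rubric SDK;
    # the implementations below are illustrative placeholders (simple keyword
    # matching and red-flag rules) that you would replace with your own logic.

    _QUESTION_KEYWORDS = {
        "onset": ["when did", "how long", "start"],
        "character": ["describe the pain", "sharp", "dull", "pressure"],
        "radiation": ["radiate", "spread", "arm", "jaw"],
        "associated": ["short of breath", "nausea", "sweating"],
        "risk_factors": ["diabetes", "smoke", "blood pressure", "cholesterol"],
        "prior_cardiac": ["heart attack", "stent", "cardiac history"],
    }

    def _question_was_asked(self, transcript: str, question_type: str) -> bool:
        """Return True if any keyword for this question type appears in the transcript."""
        text = transcript.lower()
        return any(keyword in text for keyword in self._QUESTION_KEYWORDS.get(question_type, []))

    def _check_triage_appropriateness(self, symptoms, triage_level) -> bool:
        """Placeholder rule: red-flag symptoms must map to an urgent or emergency triage level."""
        red_flags = {"chest pain", "shortness of breath", "syncope"}
        has_red_flag = any(str(s).strip().lower() in red_flags for s in symptoms)
        return triage_level in {"urgent", "emergency"} if has_red_flag else True

    def _generate_reasoning(self, coverage, missing, appropriate_triage) -> str:
        """Summarize protocol coverage and triage appropriateness for the evaluation result."""
        parts = [f"Protocol coverage: {coverage:.0%}"]
        if missing:
            parts.append(f"missing questions: {', '.join(sorted(missing))}")
        parts.append("triage appropriate" if appropriate_triage else "triage may be under-escalated")
        return "; ".join(parts)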


# Register the custom evaluator
client = Rubric(api_key="gr_live_xxxxxxxx")
client.evaluators.register(ChestPainProtocolEvaluator)

# Use in evaluations
evaluation = client.evaluations.create(
    project="patient-triage",
    dataset="ds_chest_pain_cases",
    evaluators=[
        "triage_accuracy",
        "chest_pain_protocol"  # Your custom evaluator
    ]
)
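
Because a custom evaluator is ordinary Python, it is easy to unit-test before registering it. A minimal sketch, assuming the class above lives in custom_evaluator.py and can be constructed without arguments; SimpleNamespace stands in for a Rubric sample, since evaluate() only reads the input and ai_output attributes:
test_chest_pain_protocol.py
from types import SimpleNamespace

from custom_evaluator import ChestPainProtocolEvaluator

def test_score_stays_in_range():
    # Minimal stand-in for a sample: only the attributes evaluate() reads
    sample = SimpleNamespace(
        input={"transcript": "When did the pain start? Does it radiate to your arm or jaw?"},
        ai_output={
            "extracted_symptoms": ["chest pain"],
            "triage_level": "emergency",
        },
    )
    result = ChestPainProtocolEvaluator().evaluate(sample)
    assert 0.0 <= result.score <= 1.0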

Orchestration with Airflow

Use Apache Airflow to manage complex evaluation workflows:
dags/rubric_evaluation_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from rubric import Rubric

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["[email protected]"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "rubric_daily_evaluation",
    default_args=default_args,
    description="Daily AI model evaluation pipeline",
    schedule_interval="0 6 * * *",  # 6 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def fetch_new_samples(**context):
    """Fetch samples from production logs."""
    from your_data_pipeline import fetch_yesterdays_calls

    samples = fetch_yesterdays_calls()

    client = Rubric()
    dataset = client.datasets.create(
        name=f"Production Calls - {context['ds']}",
        project="patient-triage"
    )

    client.samples.batch_create(
        dataset=dataset.id,
        samples=samples
    )

    return dataset.id

def run_evaluation(**context):
    """Run evaluation on the dataset."""
    dataset_id = context["task_instance"].xcom_pull(task_ids="fetch_samples")

    client = Rubric()
    evaluation = client.evaluations.create(
        name=f"Daily Eval - {context['ds']}",
        project="patient-triage",
        dataset=dataset_id,
        evaluators=[
            "triage_accuracy",
            "safety_score",
            "guideline_adherence",
            "chest_pain_protocol"
        ]
    )

    evaluation.wait_for_completion(timeout=3600)
    return evaluation.id

def check_quality_gates(**context):
    """Check if evaluation meets quality thresholds."""
    eval_id = context["task_instance"].xcom_pull(task_ids="run_evaluation")

    client = Rubric()
    evaluation = client.evaluations.get(eval_id)

    thresholds = {
        "triage_accuracy": 0.80,
        "safety_score": 0.90,
        "guideline_adherence": 0.75
    }

    failures = []
    for metric, threshold in thresholds.items():
        actual = evaluation.summary.get(metric, {}).get("mean", 0)
        if actual < threshold:
            failures.append(f"{metric}: {actual:.2%} < {threshold:.2%}")

    if failures:
        raise ValueError(f"Quality gates failed: {', '.join(failures)}")

    return "All quality gates passed"

fetch_task = PythonOperator(
    task_id="fetch_samples",
    python_callable=fetch_new_samples,
    dag=dag,
)

eval_task = PythonOperator(
    task_id="run_evaluation",
    python_callable=run_evaluation,
    dag=dag,
)

gate_task = PythonOperator(
    task_id="check_quality_gates",
    python_callable=check_quality_gates,
    dag=dag,
)

fetch_task >> eval_task >> gate_task
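
The same three steps translate naturally to Prefect (the intro above also lists Prefect and Dagster as options). Below is a minimal sketch, not a drop-in replacement: it reuses the Rubric calls from the Airflow DAG, the file path is illustrative, and it assumes the same hypothetical fetch_yesterdays_calls helper. Schedule it via a Prefect deployment (for example the same 0 6 * * * cron).
flows/daily_evaluation_flow.py
from datetime import date

from prefect import flow, task
from rubric import Rubric

@task(retries=1, retry_delay_seconds=300)
def fetch_samples() -> str:
    """Create a dataset from yesterday's production calls."""
    from your_data_pipeline import fetch_yesterdays_calls  # same hypothetical helper as above

    client = Rubric()
    dataset = client.datasets.create(
        name=f"Production Calls - {date.today().isoformat()}",
        project="patient-triage",
    )
    client.samples.batch_create(dataset=dataset.id, samples=fetch_yesterdays_calls())
    return dataset.id

@task
def run_evaluation(dataset_id: str) -> str:
    """Run the daily evaluation and block until it finishes."""
    client = Rubric()
    evaluation = client.evaluations.create(
        name=f"Daily Eval - {date.today().isoformat()}",
        project="patient-triage",
        dataset=dataset_id,
        evaluators=[
            "triage_accuracy",
            "safety_score",
            "guideline_adherence",
            "chest_pain_protocol",
        ],
    )
    evaluation.wait_for_completion(timeout=3600)
    return evaluation.id

@task
def check_quality_gates(eval_id: str) -> None:
    """Fail the flow run if any metric falls below its threshold."""
    client = Rubric()
    evaluation = client.evaluations.get(eval_id)

    thresholds = {"triage_accuracy": 0.80, "safety_score": 0.90, "guideline_adherence": 0.75}
    failures = []
    for metric, threshold in thresholds.items():
        mean = evaluation.summary.get(metric, {}).get("mean", 0)
        if mean < threshold:
            failures.append(f"{metric}: {mean:.2%} < {threshold:.2%}")

    if failures:
        raise ValueError(f"Quality gates failed: {', '.join(failures)}")

@flow(name="rubric-daily-evaluation")
def daily_evaluation():
    dataset_id = fetch_samples()
    eval_id = run_evaluation(dataset_id)
    check_quality_gates(eval_id)

if __name__ == "__main__":
    daily_evaluation()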

Best Practices

Use Checkpointing: Save progress during batch processing to enable resume on failure.
Implement Idempotency: Ensure pipelines can be safely re-run without creating duplicates (see the sketch below).
Monitor Pipeline Health: Track metrics like throughput, error rates, and latency.
Version Your Pipelines: Track pipeline code alongside model versions for reproducibility.
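
For the idempotency practice above, one simple approach is to key each sample on an identifier you already carry (the call_id stored in metadata in the batch example) and skip anything submitted by a previous run. A minimal sketch using only the standard library; the file path and helper names are illustrative:
idempotent_submit.py
import json
from pathlib import Path

SUBMITTED_KEYS_FILE = Path("./checkpoints/submitted_call_ids.json")

def load_submitted_keys() -> set:
    """Return call_ids recorded by earlier runs (empty set on the first run)."""
    if SUBMITTED_KEYS_FILE.exists():
        return set(json.loads(SUBMITTED_KEYS_FILE.read_text()))
    return set()

def save_submitted_keys(keys: set) -> None:
    """Persist the full set of submitted call_ids for the next run."""
    SUBMITTED_KEYS_FILE.parent.mkdir(parents=True, exist_ok=True)
    SUBMITTED_KEYS_FILE.write_text(json.dumps(sorted(keys)))

def deduplicate(rows: list, submitted: set) -> list:
    """Drop rows whose call_id was already submitted in a previous run."""
    return [row for row in rows if row["call_id"] not in submitted]

# Usage with the batch pipeline above:
#   submitted = load_submitted_keys()
#   new_rows = deduplicate(df.to_dict("records"), submitted)
#   results = pipeline.process(data=new_rows, transform=transform_row, ...)
#   save_submitted_keys(submitted | {row["call_id"] for row in new_rows})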