
Supported Platforms

Rubric exports evaluation data to major cloud data warehouses for advanced analytics, ML workflows, and business intelligence.
| Platform | Export Method | Best For |
| --- | --- | --- |
| Snowflake | Direct write, S3 staging | Enterprise analytics |
| Google BigQuery | Direct write, GCS staging | Google Cloud environments |
| Amazon Redshift | Direct write, S3 staging | AWS environments |
| Databricks | Delta Lake, Unity Catalog | ML workflows |
| PostgreSQL | Direct write | Self-hosted analytics |

Snowflake Integration

snowflake_integration.py
from rubric import Rubric

client = Rubric()

# Configure Snowflake export
snowflake = client.integrations.data_warehouse.configure(
    provider="snowflake",
    name="analytics_warehouse",

    # Connection details
    account="your-account.snowflakecomputing.com",
    warehouse="GRADED_WH",
    database="ANALYTICS",
    schema="RUBRIC_DATA",

    # Authentication
    auth_type="key_pair",  # or "password", "oauth"
    user="GRADED_SERVICE",
    private_key_path="/path/to/private_key.p8",

    # Export settings
    export_settings={
        "format": "parquet",
        "compression": "snappy",
        "staging": "s3://your-bucket/snowflake-staging/"
    }
)

# Set up scheduled export
client.integrations.data_warehouse.schedule_export(
    integration_id=snowflake.id,

    # What to export
    data_sources=[
        "evaluations",
        "samples",
        "metrics",
        "human_reviews"
    ],

    # Schedule
    schedule="0 2 * * *",  # Daily at 2 AM

    # Incremental export
    mode="incremental",
    watermark_column="updated_at"
)

Snowflake Schema

snowflake_schema.sql
-- Rubric evaluation data lands in these tables

-- Evaluations table
CREATE TABLE rubric_data.evaluations (
    evaluation_id VARCHAR PRIMARY KEY,
    project_id VARCHAR,
    name VARCHAR,
    dataset_id VARCHAR,
    model_version VARCHAR,
    status VARCHAR,
    created_at TIMESTAMP_NTZ,
    completed_at TIMESTAMP_NTZ,
    config VARIANT,
    summary VARIANT
);

-- Samples table
CREATE TABLE rubric_data.samples (
    sample_id VARCHAR PRIMARY KEY,
    evaluation_id VARCHAR,
    input VARIANT,
    ai_output VARIANT,
    expected_output VARIANT,
    metadata VARIANT,
    created_at TIMESTAMP_NTZ
);

-- Scores table
CREATE TABLE rubric_data.scores (
    score_id VARCHAR PRIMARY KEY,
    sample_id VARCHAR,
    evaluator_type VARCHAR,
    score FLOAT,
    reasoning TEXT,
    metadata VARIANT,
    created_at TIMESTAMP_NTZ
);
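
With the schema in place, a typical analysis joins evaluations, samples, and scores. The sketch below uses the snowflake-connector-python package (installed separately from the Rubric SDK); the connection values mirror the export configuration above, while the SNOWFLAKE_PASSWORD environment variable and the aggregate query itself are illustrative.

query_snowflake_scores.py
import os

import snowflake.connector

# Connect to the same warehouse/database/schema the export targets.
# Key-pair auth (as configured above) is also supported by this connector;
# password auth is shown here only to keep the sketch short.
conn = snowflake.connector.connect(
    account="your-account",
    user="GRADED_SERVICE",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="GRADED_WH",
    database="ANALYTICS",
    schema="RUBRIC_DATA",
)

# Average score per evaluation and evaluator type
query = """
    SELECT e.evaluation_id,
           s.evaluator_type,
           AVG(s.score) AS avg_score,
           COUNT(*)     AS n_scores
    FROM evaluations e
    JOIN samples sa ON sa.evaluation_id = e.evaluation_id
    JOIN scores s   ON s.sample_id = sa.sample_id
    GROUP BY e.evaluation_id, s.evaluator_type
    ORDER BY avg_score DESC
"""

cur = conn.cursor()
try:
    cur.execute(query)
    for evaluation_id, evaluator_type, avg_score, n_scores in cur.fetchall():
        print(evaluation_id, evaluator_type, round(avg_score, 3), n_scores)
finally:
    cur.close()
    conn.close()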

BigQuery Integration

bigquery_integration.py
from rubric import Rubric

client = Rubric()

# Configure BigQuery export
bigquery = client.integrations.data_warehouse.configure(
    provider="bigquery",
    name="gcp_analytics",

    # GCP settings
    project_id="your-gcp-project",
    dataset_id="rubric_analytics",
    location="US",

    # Authentication
    auth_type="service_account",
    credentials_path="/path/to/service-account.json",

    # Export settings
    export_settings={
        "write_disposition": "WRITE_APPEND",
        "partitioning": {
            "field": "created_at",
            "type": "DAY"
        },
        "clustering": ["project_id", "model_version"]
    }
)

# Export evaluation data to BigQuery
export_result = client.integrations.data_warehouse.export(
    integration_id=bigquery.id,

    evaluation_id="eval_abc123",

    # Tables to populate
    tables={
        "evaluations": "rubric_analytics.evaluations",
        "samples": "rubric_analytics.samples",
        "scores": "rubric_analytics.scores"
    }
)

print(f"Exported {export_result.rows_exported} rows")
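
To confirm an export landed as expected, you can query the destination dataset with the google-cloud-bigquery client library (separate from the Rubric SDK). The sketch below reuses the service account from the configuration above; the row-count query is illustrative.

verify_bigquery_export.py
from google.cloud import bigquery
from google.oauth2 import service_account

# Reuse the service account configured for the export
credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service-account.json"
)
bq = bigquery.Client(project="your-gcp-project", credentials=credentials)

# Samples per evaluation in the exported table
query = """
    SELECT evaluation_id, COUNT(*) AS n_samples
    FROM `your-gcp-project.rubric_analytics.samples`
    GROUP BY evaluation_id
    ORDER BY n_samples DESC
    LIMIT 10
"""

for row in bq.query(query).result():
    print(row.evaluation_id, row.n_samples)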

Redshift Integration

redshift_integration.py
from rubric import Rubric

client = Rubric()

# Configure Redshift export
redshift = client.integrations.data_warehouse.configure(
    provider="redshift",
    name="aws_analytics",

    # Cluster details
    host="your-cluster.redshift.amazonaws.com",
    port=5439,
    database="analytics",
    schema="rubric",

    # Authentication
    auth_type="iam",  # or "password"
    iam_role="arn:aws:iam::123456789012:role/RedshiftLoadRole",

    # Staging (required for bulk loads)
    staging_bucket="s3://your-bucket/redshift-staging/",

    # Export settings
    export_settings={
        "distribution_style": "KEY",
        "distribution_key": "evaluation_id",
        "sort_keys": ["created_at"]
    }
)
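
Once a bulk load completes, the exported tables behave like any other Redshift schema. The sketch below queries them with psycopg2 over the standard PostgreSQL protocol; it uses password auth, a hypothetical read-only user, and a REDSHIFT_PASSWORD environment variable for brevity, whereas the export configuration above authenticates via IAM.

query_redshift_scores.py
import os

import psycopg2

conn = psycopg2.connect(
    host="your-cluster.redshift.amazonaws.com",
    port=5439,
    dbname="analytics",
    user="rubric_reader",  # hypothetical read-only user
    password=os.environ["REDSHIFT_PASSWORD"],
)

# Score distribution per evaluator type in the exported scores table
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT evaluator_type, AVG(score) AS avg_score, COUNT(*) AS n_scores
        FROM rubric.scores
        GROUP BY evaluator_type
        ORDER BY avg_score DESC
    """)
    for evaluator_type, avg_score, n_scores in cur.fetchall():
        print(evaluator_type, round(avg_score, 3), n_scores)

conn.close()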

Databricks Integration

databricks_integration.py
from rubric import Rubric

client = Rubric()

# Configure Databricks export
databricks = client.integrations.data_warehouse.configure(
    provider="databricks",
    name="ml_platform",

    # Workspace details
    workspace_url="https://your-workspace.cloud.databricks.com",

    # Authentication
    auth_type="service_principal",
    client_id="your-client-id",
    client_secret="your-client-secret",

    # Unity Catalog settings
    catalog="ml_analytics",
    schema="rubric",

    # Delta Lake settings
    export_settings={
        "format": "delta",
        "mode": "merge",  # Upsert capability
        "merge_keys": ["sample_id"],
        "partition_by": ["date(created_at)"]
    }
)

# Export with Delta Lake merge
client.integrations.data_warehouse.export(
    integration_id=databricks.id,

    # Export all recent evaluations
    filters={
        "created_after": "2024-01-01"
    },

    # Target tables
    tables={
        "evaluations": "ml_analytics.rubric.evaluations",
        "samples": "ml_analytics.rubric.samples",
        "scores": "ml_analytics.rubric.scores"
    }
)
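
Inside a Databricks notebook, the merged Delta tables are available through Unity Catalog and can be read directly with PySpark. A minimal sketch, assuming the notebook-provided spark session; the daily aggregation is illustrative.

databricks_notebook_example.py
from pyspark.sql import functions as F

# `spark` is provided by the Databricks notebook runtime
scores = spark.table("ml_analytics.rubric.scores")

# Daily average score per evaluator type, e.g. as a model-quality signal
daily_quality = (
    scores
    .groupBy(F.to_date("created_at").alias("day"), "evaluator_type")
    .agg(F.avg("score").alias("avg_score"), F.count("*").alias("n_scores"))
    .orderBy("day")
)
daily_quality.show()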

Real-Time Streaming

Stream evaluation data to your warehouse in real time:
streaming_export.py
from rubric import Rubric

client = Rubric()

# Configure streaming export
stream = client.integrations.data_warehouse.configure_stream(
    integration_id="snowflake_prod",

    # Events to stream
    events=[
        "evaluation.completed",
        "sample.scored",
        "review.submitted"
    ],

    # Streaming settings
    streaming_config={
        "buffer_size": 100,
        "flush_interval_seconds": 30,
        "format": "json"
    },

    # Target
    target_table="ANALYTICS.GRADED.EVENTS_STREAM"
)
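
Streamed events become queryable as soon as each buffer is flushed. The sketch below polls the landing table with snowflake-connector-python; it assumes the JSON payloads land in a single VARIANT column named EVENT with a `type` field, which may differ from your actual landing schema.

query_event_stream.py
import os

import snowflake.connector

conn = snowflake.connector.connect(
    account="your-account",
    user="GRADED_SERVICE",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="GRADED_WH",
)

# Event counts by type; the EVENT VARIANT column and its `type` field are
# assumptions about how the JSON payloads land, adjust to your schema.
query = """
    SELECT event:type::string AS event_type, COUNT(*) AS n_events
    FROM ANALYTICS.GRADED.EVENTS_STREAM
    GROUP BY 1
    ORDER BY n_events DESC
"""

cur = conn.cursor()
try:
    cur.execute(query)
    for event_type, n_events in cur.fetchall():
        print(event_type, n_events)
finally:
    cur.close()
    conn.close()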

Best Practices

| Practice | Rationale |
| --- | --- |
| Use incremental exports | Reduce data transfer and costs |
| Partition by date | Improve query performance |
| Cluster by common filters | Optimize for typical queries |
| Set up monitoring | Alert on export failures (see the sketch below this table) |
| Use staging buckets | Required for bulk loads |
| Schedule off-peak | Avoid impacting production queries |
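
For the monitoring practice, a simple proxy is a data-freshness check run from any scheduler: alert when the newest exported evaluation is older than expected. The sketch below uses snowflake-connector-python against the Snowflake tables from earlier; the 26-hour threshold, the SNOWFLAKE_PASSWORD environment variable, and the assumption that exported timestamps are UTC are all illustrative.

export_freshness_check.py
import datetime
import os

import snowflake.connector

# Alert if no new evaluations have landed since the last expected daily run.
# 26 hours = the daily 2 AM schedule plus some slack; tune to your schedule.
STALENESS_THRESHOLD = datetime.timedelta(hours=26)

conn = snowflake.connector.connect(
    account="your-account",
    user="GRADED_SERVICE",
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse="GRADED_WH",
    database="ANALYTICS",
    schema="RUBRIC_DATA",
)

cur = conn.cursor()
try:
    cur.execute("SELECT MAX(created_at) FROM evaluations")
    (latest,) = cur.fetchone()
    # TIMESTAMP_NTZ comes back as a naive datetime; assumed to be UTC here
    if latest is None or datetime.datetime.utcnow() - latest > STALENESS_THRESHOLD:
        # Hook this into your alerting channel (Slack, PagerDuty, email, ...)
        print(f"ALERT: last exported evaluation was at {latest}")
    else:
        print(f"OK: last exported evaluation was at {latest}")
finally:
    cur.close()
    conn.close()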