Skip to main content
This guide covers how to use the Relay API with Python using the requests library.
An official Python SDK may be available in the future. For now, use requests or httpx for HTTP requests.

Installation

pip install requests

Setup

Python
import requests
import os

# Read the API key from the environment and fail fast when it is missing or empty.
API_KEY = os.getenv("RELAY_API_KEY")
if API_KEY is None or API_KEY == "":
    raise ValueError("RELAY_API_KEY environment variable not set")

# Root URL that every endpoint path is appended to.
BASE_URL = "https://api.relayai.dev"

# Headers shared by all API requests: authentication plus a JSON body type.
headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"}

Helper class

For convenience, create a simple client class:
Python
import requests
from typing import Optional, Dict, Any, List

class RelayClient:
    """Small synchronous client for the Relay HTTP API.

    All calls go through one ``requests.Session`` that carries the
    ``X-API-Key`` and ``Content-Type: application/json`` headers. Methods
    return the decoded JSON body of the response; HTTP error statuses raise
    ``requests.exceptions.HTTPError``.

    The client can be used as a context manager so the session's pooled
    connections are released when the block exits.
    """

    # Seconds to wait on the server before giving up. requests has no timeout
    # by default, so without this a stalled connection would block forever.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, api_key: str, base_url: str = "https://api.relayai.dev",
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """Create a client.

        Args:
            api_key: Value sent in the ``X-API-Key`` header.
            base_url: Root URL of the API; a trailing slash is tolerated.
            timeout: Default per-request timeout in seconds, or ``None`` to
                wait indefinitely. Individual calls to ``_request`` may
                override it with their own ``timeout`` keyword.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        })

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "RelayClient":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def _request(self, method: str, path: str, **kwargs) -> Any:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            path: Endpoint path appended to ``base_url``.
            **kwargs: Passed through to ``Session.request`` (``json``,
                ``params``, ``timeout``, ...).

        Returns:
            The parsed JSON body, or ``{}`` when the response has no body.

        Raises:
            requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        """
        # Join with exactly one slash so a base URL ending in "/" does not
        # produce "//api/..." in the final URL.
        url = f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        # Any successful response without a body (204 or otherwise) has
        # nothing to decode; calling .json() on it would raise.
        if response.status_code == 204 or not response.content:
            return {}
        return response.json()

    # Datasets
    def list_datasets(self) -> List[Dict]:
        """Return the datasets visible to this API key."""
        return self._request("GET", "/api/v1/datasets")

    def create_dataset(self, name: str, artifact_types: List[Dict],
                       description: Optional[str] = None) -> Dict:
        """Create a dataset.

        ``description`` is only included in the request when it is non-empty.
        """
        data = {"name": name, "artifact_types": artifact_types}
        if description:
            data["description"] = description
        return self._request("POST", "/api/v1/datasets", json=data)

    def get_dataset(self, dataset_id: str) -> Dict:
        """Fetch a single dataset by ID."""
        return self._request("GET", f"/api/v1/datasets/{dataset_id}")

    def delete_dataset(self, dataset_id: str) -> None:
        """Delete a dataset by ID."""
        self._request("DELETE", f"/api/v1/datasets/{dataset_id}")

    # Audio
    def get_upload_url(self, dataset_id: str, filename: str, content_type: str,
                       file_size: int) -> Dict:
        """Request upload details for a new audio file in a dataset."""
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/audio/upload-url",
            json={"filename": filename, "content_type": content_type, "file_size": file_size}
        )

    def confirm_upload(self, dataset_id: str, audio_id: str) -> Dict:
        """Tell the API that the upload for ``audio_id`` has finished."""
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/audio/confirm",
            json={"audio_id": audio_id}
        )

    def list_audio(self, dataset_id: str, status: Optional[str] = None) -> Dict:
        """List audio files in a dataset, optionally filtered by ``status``."""
        params = {}
        if status:
            params["status"] = status
        return self._request("GET", f"/api/v1/datasets/{dataset_id}/audio", params=params)

    # Annotations
    def create_annotation_set(self, dataset_id: str) -> Dict:
        """Create a new annotation set on a dataset."""
        return self._request("POST", f"/api/v1/datasets/{dataset_id}/annotation-sets")

    def create_annotations_bulk(self, dataset_id: str, annotation_set_id: str,
                                annotations: List[Dict]) -> Dict:
        """Add several annotations to an annotation set in one request."""
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/annotation-sets/{annotation_set_id}/annotations/bulk",
            json={"annotations": annotations}
        )

    def publish_annotation_set(self, dataset_id: str, annotation_set_id: str) -> Dict:
        """Publish an annotation set."""
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/annotation-sets/{annotation_set_id}/publish"
        )

    # Training
    def create_training_job(self, dataset_id: str, annotation_set_id: str, config: Dict) -> Dict:
        """Start a training job for a dataset and annotation set."""
        return self._request(
            "POST",
            "/api/v1/training-jobs",
            json={"dataset_id": dataset_id, "annotation_set_id": annotation_set_id, "config": config}
        )

    def get_training_job(self, job_id: str) -> Dict:
        """Fetch a training job by ID."""
        return self._request("GET", f"/api/v1/training-jobs/{job_id}")

    def list_models(self, is_active: Optional[bool] = None) -> Dict:
        """List models, optionally filtered by ``is_active``."""
        params = {}
        if is_active is not None:
            # A Python bool would be URL-encoded as "True"/"False"; send the
            # lowercase spelling conventionally used in query strings.
            params["is_active"] = "true" if is_active else "false"
        return self._request("GET", "/api/v1/models", params=params)

    # Inference
    def create_inference_job(self, model_id: str, config: Optional[Dict] = None) -> Dict:
        """Create an inference job for a model.

        ``config`` is only included in the request when it is non-empty.
        """
        data = {"model_id": model_id}
        if config:
            data["config"] = config
        return self._request("POST", "/api/v1/inference-jobs", json=data)

    def get_inference_job(self, job_id: str) -> Dict:
        """Fetch an inference job by ID."""
        return self._request("GET", f"/api/v1/inference-jobs/{job_id}")

    def get_inference_upload_url(self, job_id: str, filename: str, content_type: str,
                                 file_size: int) -> Dict:
        """Request upload details for a file attached to an inference job.

        Note that this endpoint takes the size as ``file_size_bytes``.
        """
        return self._request(
            "POST",
            f"/api/v1/inference-jobs/{job_id}/files/upload-url",
            json={"filename": filename, "content_type": content_type, "file_size_bytes": file_size}
        )

    def confirm_inference_upload(self, job_id: str, file_id: str) -> Dict:
        """Tell the API that the upload for ``file_id`` has finished."""
        return self._request(
            "POST",
            f"/api/v1/inference-jobs/{job_id}/files/confirm",
            json={"file_id": file_id}
        )

Usage examples

Create a dataset

Python
# Build a client from the API key stored in the environment.
client = RelayClient(os.environ["RELAY_API_KEY"])

# Artifact types the new dataset should track, each with a short description.
artifact_types = [
    {"name": "glitch", "description": "Audio pop or click"},
    {"name": "long_pause", "description": "Silence > 500ms"},
]

dataset = client.create_dataset(name="TTS Glitch Detection", artifact_types=artifact_types)
print(f"Created dataset: {dataset['id']}")

Upload audio

Python
import os

def upload_audio(client, dataset_id: str, file_path: str, timeout: float = 300.0) -> str:
    """Upload an audio file to a dataset and return its audio ID.

    The upload is a three-step flow: ask the API for upload details, POST the
    file to the returned URL as a multipart form, then confirm the upload
    with the API.

    Args:
        client: Object providing ``get_upload_url`` and ``confirm_upload``
            (e.g. a ``RelayClient``).
        dataset_id: ID of the dataset that receives the file.
        file_path: Path of the local audio file.
        timeout: Seconds to wait on the storage server during the file
            upload before giving up.

    Returns:
        The ``audio_id`` assigned to the uploaded file.

    Raises:
        OSError: If the file cannot be read.
        requests.exceptions.HTTPError: If the file upload is rejected.
    """
    filename = os.path.basename(file_path)
    file_size = os.path.getsize(file_path)

    # Map the file extension to a MIME type; unknown extensions are sent as WAV.
    ext = os.path.splitext(filename)[1].lower()
    content_types = {".wav": "audio/wav", ".mp3": "audio/mpeg", ".flac": "audio/flac"}
    content_type = content_types.get(ext, "audio/wav")

    # Get upload URL
    upload_info = client.get_upload_url(dataset_id, filename, content_type, file_size)
    audio_id = upload_info["audio_id"]

    # Upload to S3. The form fields from the API go in `data` and the file
    # itself in `files`. requests has no default timeout, so pass one
    # explicitly to keep a stalled upload from blocking forever.
    with open(file_path, "rb") as f:
        response = requests.post(
            upload_info["upload_url"],
            data=upload_info["fields"],
            files={"file": f},
            timeout=timeout,
        )
    response.raise_for_status()

    # Confirm upload
    client.confirm_upload(dataset_id, audio_id)

    return audio_id

# Upload a local sample file to the dataset and show the resulting audio ID.
audio_id = upload_audio(client, dataset_id=dataset["id"], file_path="sample.wav")
print(f"Uploaded: {audio_id}")

Add annotations and train

Python
import time

# Start a fresh annotation set on the dataset.
dataset_id = dataset["id"]
annotation_set = client.create_annotation_set(dataset_id)
annotation_set_id = annotation_set["id"]

# Label two artifacts in the uploaded audio file.
annotations = [
    {"audio_file_id": audio_id, "artifact_type": "glitch", "start_ms": 1200, "end_ms": 1450},
    {"audio_file_id": audio_id, "artifact_type": "long_pause", "start_ms": 3500, "end_ms": 4200},
]
client.create_annotations_bulk(dataset_id, annotation_set_id, annotations)

# Publish the annotation set before training on it.
client.publish_annotation_set(dataset_id, annotation_set_id)

# Kick off training on both artifact types.
training_config = {"artifact_types": ["glitch", "long_pause"], "epochs": 20}
job = client.create_training_job(dataset_id, annotation_set_id, training_config)

# Poll every 30 seconds until the job reports one of the statuses treated as final.
final_statuses = ("completed", "failed")
finished = False
while not finished:
    job = client.get_training_job(job["id"])
    print(f"Status: {job['status']} - {job['progress_percent']}%")
    finished = job["status"] in final_statuses
    if not finished:
        time.sleep(30)

Run inference

Python
# Get model: pick the first active model, failing clearly when there is none.
models = client.list_models(is_active=True)
if not models["items"]:
    raise RuntimeError("No active models available for inference")
model_id = models["items"][0]["id"]

# Create inference job
job = client.create_inference_job(model_id, {"threshold": 0.5})

# Upload audio for inference
audio_path = "test.wav"
upload_info = client.get_inference_upload_url(
    job["id"], os.path.basename(audio_path), "audio/wav", os.path.getsize(audio_path)
)

# POST the file to the returned URL. Check the response so a rejected upload
# is reported here instead of being confirmed as if it had succeeded, and pass
# a timeout because requests would otherwise wait forever on a stalled server.
with open(audio_path, "rb") as f:
    upload_response = requests.post(
        upload_info["upload_url"],
        data=upload_info["upload_fields"],
        files={"file": f},
        timeout=300,
    )
upload_response.raise_for_status()

client.confirm_inference_upload(job["id"], upload_info["file_id"])

# Wait for results: poll every 2 seconds until every file has been processed.
# NOTE(review): this loop has no upper bound; if the API can leave a job with
# unprocessed files (e.g. on failure), add a deadline or a status check.
while True:
    job = client.get_inference_job(job["id"])
    if job["processed_files"] == job["total_files"]:
        break
    time.sleep(2)

# Print detections, grouped by input file.
for result_file in job["files"]:
    print(f"{result_file['original_filename']}:")
    for d in result_file["detections"]:
        print(f"  {d['artifact_type']}: {d['start_ms']}-{d['end_ms']}ms ({d['confidence']:.2f})")

Error handling

Python
from requests.exceptions import HTTPError

# HTTP error statuses surface as HTTPError; map the common ones to a short
# message and fall back to printing the JSON error body for anything else.
try:
    dataset = client.get_dataset("invalid-id")
except HTTPError as e:
    status = e.response.status_code
    known_messages = {404: "Dataset not found", 401: "Invalid API key"}
    if status in known_messages:
        print(known_messages[status])
    else:
        print(f"Error: {e.response.json()}")

Async with httpx

For async applications, use httpx (install it separately with pip install httpx):
Python
import httpx
import asyncio

async def main() -> None:
    """Fetch the dataset list asynchronously and print how many were returned.

    Raises:
        KeyError: If ``RELAY_API_KEY`` is not set in the environment.
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.relayai.dev/api/v1/datasets",
            headers={"X-API-Key": os.environ["RELAY_API_KEY"]},
        )
        # Without this check an error payload would be parsed and counted as
        # if it were the dataset list.
        response.raise_for_status()
        datasets = response.json()
        print(f"Found {len(datasets)} datasets")

asyncio.run(main())