Code examples for common workflows using the requests library.
An official Python SDK may be available in the future. For now, use requests or httpx for HTTP requests.
Installation
pip install requests
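For the async example at the end of this page, also install httpx:
pip install httpx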
Setup
Python
import requests
import os

# Load API key from environment
API_KEY = os.environ.get("RELAY_API_KEY")
if not API_KEY:
    raise ValueError("RELAY_API_KEY environment variable not set")

BASE_URL = "https://api.relayai.dev"

# Common headers
headers = {
    "X-API-Key": API_KEY,
    "Content-Type": "application/json"
}
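Before going further, a quick request to the datasets endpoint (the same one used throughout this page) confirms the key and base URL work; anything other than a 2xx response raises immediately:
Python
# Smoke test: list datasets to confirm the key is valid
response = requests.get(f"{BASE_URL}/api/v1/datasets", headers=headers)
response.raise_for_status()
print("Authenticated OK")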
Helper class
For convenience, create a simple client class:
Python
import requests
from typing import Any, Dict, List, Optional

class RelayClient:
    """Thin wrapper around the Relay API using a shared requests.Session."""

    def __init__(self, api_key: str, base_url: str = "https://api.relayai.dev"):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        })

    def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}{path}"
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        if response.status_code == 204:
            return {}
        return response.json()

    # Datasets
    def list_datasets(self) -> List[Dict]:
        return self._request("GET", "/api/v1/datasets")

    def create_dataset(self, name: str, artifact_types: List[Dict],
                       description: Optional[str] = None) -> Dict:
        data = {"name": name, "artifact_types": artifact_types}
        if description:
            data["description"] = description
        return self._request("POST", "/api/v1/datasets", json=data)

    def get_dataset(self, dataset_id: str) -> Dict:
        return self._request("GET", f"/api/v1/datasets/{dataset_id}")

    def delete_dataset(self, dataset_id: str) -> None:
        self._request("DELETE", f"/api/v1/datasets/{dataset_id}")

    # Audio
    def get_upload_url(self, dataset_id: str, filename: str,
                       content_type: str, file_size: int) -> Dict:
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/audio/upload-url",
            json={"filename": filename, "content_type": content_type, "file_size": file_size}
        )

    def confirm_upload(self, dataset_id: str, audio_id: str) -> Dict:
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/audio/confirm",
            json={"audio_id": audio_id}
        )

    def list_audio(self, dataset_id: str, status: Optional[str] = None) -> Dict:
        params = {}
        if status:
            params["status"] = status
        return self._request("GET", f"/api/v1/datasets/{dataset_id}/audio", params=params)

    # Annotations
    def create_annotation_set(self, dataset_id: str) -> Dict:
        return self._request("POST", f"/api/v1/datasets/{dataset_id}/annotation-sets")

    def create_annotations_bulk(self, dataset_id: str, annotation_set_id: str,
                                annotations: List[Dict]) -> Dict:
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/annotation-sets/{annotation_set_id}/annotations/bulk",
            json={"annotations": annotations}
        )

    def publish_annotation_set(self, dataset_id: str, annotation_set_id: str) -> Dict:
        return self._request(
            "POST",
            f"/api/v1/datasets/{dataset_id}/annotation-sets/{annotation_set_id}/publish"
        )

    # Training
    def create_training_job(self, dataset_id: str, annotation_set_id: str, config: Dict) -> Dict:
        return self._request(
            "POST",
            "/api/v1/training-jobs",
            json={"dataset_id": dataset_id, "annotation_set_id": annotation_set_id, "config": config}
        )

    def get_training_job(self, job_id: str) -> Dict:
        return self._request("GET", f"/api/v1/training-jobs/{job_id}")

    def list_models(self, is_active: Optional[bool] = None) -> Dict:
        params = {}
        if is_active is not None:
            params["is_active"] = is_active
        return self._request("GET", "/api/v1/models", params=params)

    # Inference
    def create_inference_job(self, model_id: str, config: Optional[Dict] = None) -> Dict:
        data = {"model_id": model_id}
        if config:
            data["config"] = config
        return self._request("POST", "/api/v1/inference-jobs", json=data)

    def get_inference_job(self, job_id: str) -> Dict:
        return self._request("GET", f"/api/v1/inference-jobs/{job_id}")

    def get_inference_upload_url(self, job_id: str, filename: str,
                                 content_type: str, file_size: int) -> Dict:
        return self._request(
            "POST",
            f"/api/v1/inference-jobs/{job_id}/files/upload-url",
            json={"filename": filename, "content_type": content_type, "file_size_bytes": file_size}
        )

    def confirm_inference_upload(self, job_id: str, file_id: str) -> Dict:
        return self._request(
            "POST",
            f"/api/v1/inference-jobs/{job_id}/files/confirm",
            json={"file_id": file_id}
        )
Usage examples
Create a dataset
Python
client = RelayClient(os.environ["RELAY_API_KEY"])

dataset = client.create_dataset(
    name="TTS Glitch Detection",
    artifact_types=[
        {"name": "glitch", "description": "Audio pop or click"},
        {"name": "long_pause", "description": "Silence > 500ms"}
    ]
)
print(f"Created dataset: {dataset['id']}")
Upload audio
Python
import os

def upload_audio(client, dataset_id: str, file_path: str) -> str:
    """Upload an audio file and return its ID."""
    filename = os.path.basename(file_path)
    file_size = os.path.getsize(file_path)

    # Get content type from the file extension
    ext = os.path.splitext(filename)[1].lower()
    content_types = {".wav": "audio/wav", ".mp3": "audio/mpeg", ".flac": "audio/flac"}
    content_type = content_types.get(ext, "audio/wav")

    # Get upload URL
    upload_info = client.get_upload_url(dataset_id, filename, content_type, file_size)

    # Upload to S3
    with open(file_path, "rb") as f:
        response = requests.post(
            upload_info["upload_url"],
            data=upload_info["fields"],
            files={"file": f}
        )
        response.raise_for_status()

    # Confirm upload
    client.confirm_upload(dataset_id, upload_info["audio_id"])
    return upload_info["audio_id"]

audio_id = upload_audio(client, dataset["id"], "sample.wav")
print(f"Uploaded: {audio_id}")
Add annotations and train
Python
import time

# Create annotation set
annotation_set = client.create_annotation_set(dataset["id"])

# Add annotations
client.create_annotations_bulk(
    dataset["id"],
    annotation_set["id"],
    [
        {"audio_file_id": audio_id, "artifact_type": "glitch", "start_ms": 1200, "end_ms": 1450},
        {"audio_file_id": audio_id, "artifact_type": "long_pause", "start_ms": 3500, "end_ms": 4200}
    ]
)

# Publish
client.publish_annotation_set(dataset["id"], annotation_set["id"])

# Train
job = client.create_training_job(
    dataset["id"],
    annotation_set["id"],
    {"artifact_types": ["glitch", "long_pause"], "epochs": 20}
)

# Wait for completion
while True:
    job = client.get_training_job(job["id"])
    print(f"Status: {job['status']} - {job['progress_percent']}%")
    if job["status"] in ["completed", "failed"]:
        break
    time.sleep(30)
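The loop above polls forever if a job stalls. A small wrapper with a timeout keeps scripts from hanging; a sketch, assuming the same status and progress_percent fields used above:
Python
def wait_for_training(client, job_id: str, poll_s: int = 30, timeout_s: int = 3600):
    """Poll a training job until it completes or fails, or raise on timeout."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        job = client.get_training_job(job_id)
        print(f"Status: {job['status']} - {job['progress_percent']}%")
        if job["status"] in ["completed", "failed"]:
            return job
        time.sleep(poll_s)
    raise TimeoutError(f"Training job {job_id} did not finish in {timeout_s}s")

job = wait_for_training(client, job["id"])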
Run inference
Python
# Get model
models = client.list_models(is_active=True)
model_id = models["items"][0]["id"]

# Create inference job
job = client.create_inference_job(model_id, {"threshold": 0.5})

# Upload audio for inference
upload_info = client.get_inference_upload_url(
    job["id"], "test.wav", "audio/wav", os.path.getsize("test.wav")
)
with open("test.wav", "rb") as f:
    response = requests.post(
        upload_info["upload_url"],
        data=upload_info["upload_fields"],
        files={"file": f}
    )
    response.raise_for_status()
client.confirm_inference_upload(job["id"], upload_info["file_id"])

# Wait for results
while True:
    job = client.get_inference_job(job["id"])
    if job["processed_files"] == job["total_files"]:
        break
    time.sleep(2)

# Print detections
for file in job["files"]:
    print(f"{file['original_filename']}:")
    for d in file["detections"]:
        print(f"  {d['artifact_type']}: {d['start_ms']}-{d['end_ms']}ms ({d['confidence']:.2f})")
Error handling
Python
from requests.exceptions import HTTPError

try:
    dataset = client.get_dataset("invalid-id")
except HTTPError as e:
    if e.response.status_code == 404:
        print("Dataset not found")
    elif e.response.status_code == 401:
        print("Invalid API key")
    else:
        print(f"Error: {e.response.json()}")
Async with httpx
For async applications, use httpx:
Python
import os
import httpx
import asyncio

async def main():
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.relayai.dev/api/v1/datasets",
            headers={"X-API-Key": os.environ["RELAY_API_KEY"]}
        )
        datasets = response.json()
        print(f"Found {len(datasets)} datasets")

asyncio.run(main())
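Because httpx's AsyncClient shares a connection pool, several endpoints can be fetched concurrently with asyncio.gather. A sketch against the two list endpoints used earlier on this page:
Python
import os
import httpx
import asyncio

async def fetch(client, path):
    # One GET against the API, raising on any error status
    response = await client.get(f"https://api.relayai.dev{path}")
    response.raise_for_status()
    return response.json()

async def main():
    headers = {"X-API-Key": os.environ["RELAY_API_KEY"]}
    async with httpx.AsyncClient(headers=headers) as client:
        # Fetch datasets and models concurrently over one connection pool
        datasets, models = await asyncio.gather(
            fetch(client, "/api/v1/datasets"),
            fetch(client, "/api/v1/models"),
        )
        print(f"Found {len(datasets)} datasets")

asyncio.run(main())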
