requests library.
An official Python SDK may be available in the future. For now, use
requests or httpx for HTTP requests.Installation
Copy
pip install requests
Setup
Python
Copy
import requests
import os
# Load API key from environment
API_KEY = os.environ.get("RELAY_API_KEY")
if not API_KEY:
raise ValueError("RELAY_API_KEY environment variable not set")
BASE_URL = "https://api.relayai.dev"
# Common headers
headers = {
"X-API-Key": API_KEY,
"Content-Type": "application/json"
}
Helper class
For convenience, create a simple client class:Python
Copy
import requests
from typing import Optional, Dict, Any, List
class RelayClient:
def __init__(self, api_key: str, base_url: str = "https://api.relayai.dev"):
self.api_key = api_key
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": api_key,
"Content-Type": "application/json"
})
def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
if response.status_code == 204:
return {}
return response.json()
# Datasets
def list_datasets(self) -> List[Dict]:
return self._request("GET", "/api/v1/datasets")
def create_dataset(self, name: str, artifact_types: List[Dict], description: str = None) -> Dict:
data = {"name": name, "artifact_types": artifact_types}
if description:
data["description"] = description
return self._request("POST", "/api/v1/datasets", json=data)
def get_dataset(self, dataset_id: str) -> Dict:
return self._request("GET", f"/api/v1/datasets/{dataset_id}")
def delete_dataset(self, dataset_id: str) -> None:
self._request("DELETE", f"/api/v1/datasets/{dataset_id}")
# Audio
def get_upload_url(self, dataset_id: str, filename: str, content_type: str, file_size: int) -> Dict:
return self._request(
"POST",
f"/api/v1/datasets/{dataset_id}/audio/upload-url",
json={"filename": filename, "content_type": content_type, "file_size": file_size}
)
def confirm_upload(self, dataset_id: str, audio_id: str) -> Dict:
return self._request(
"POST",
f"/api/v1/datasets/{dataset_id}/audio/confirm",
json={"audio_id": audio_id}
)
def list_audio(self, dataset_id: str, status: str = None) -> Dict:
params = {}
if status:
params["status"] = status
return self._request("GET", f"/api/v1/datasets/{dataset_id}/audio", params=params)
# Annotations
def create_annotation_set(self, dataset_id: str) -> Dict:
return self._request("POST", f"/api/v1/datasets/{dataset_id}/annotation-sets")
def create_annotations_bulk(self, dataset_id: str, annotation_set_id: str, annotations: List[Dict]) -> Dict:
return self._request(
"POST",
f"/api/v1/datasets/{dataset_id}/annotation-sets/{annotation_set_id}/annotations/bulk",
json={"annotations": annotations}
)
def publish_annotation_set(self, dataset_id: str, annotation_set_id: str) -> Dict:
return self._request(
"POST",
f"/api/v1/datasets/{dataset_id}/annotation-sets/{annotation_set_id}/publish"
)
# Training
def create_training_job(self, dataset_id: str, annotation_set_id: str, config: Dict) -> Dict:
return self._request(
"POST",
"/api/v1/training-jobs",
json={"dataset_id": dataset_id, "annotation_set_id": annotation_set_id, "config": config}
)
def get_training_job(self, job_id: str) -> Dict:
return self._request("GET", f"/api/v1/training-jobs/{job_id}")
def list_models(self, is_active: bool = None) -> Dict:
params = {}
if is_active is not None:
params["is_active"] = is_active
return self._request("GET", "/api/v1/models", params=params)
# Inference
def create_inference_job(self, model_id: str, config: Dict = None) -> Dict:
data = {"model_id": model_id}
if config:
data["config"] = config
return self._request("POST", "/api/v1/inference-jobs", json=data)
def get_inference_job(self, job_id: str) -> Dict:
return self._request("GET", f"/api/v1/inference-jobs/{job_id}")
def get_inference_upload_url(self, job_id: str, filename: str, content_type: str, file_size: int) -> Dict:
return self._request(
"POST",
f"/api/v1/inference-jobs/{job_id}/files/upload-url",
json={"filename": filename, "content_type": content_type, "file_size_bytes": file_size}
)
def confirm_inference_upload(self, job_id: str, file_id: str) -> Dict:
return self._request(
"POST",
f"/api/v1/inference-jobs/{job_id}/files/confirm",
json={"file_id": file_id}
)
Usage examples
Create a dataset
Python
Copy
client = RelayClient(os.environ["RELAY_API_KEY"])
dataset = client.create_dataset(
name="TTS Glitch Detection",
artifact_types=[
{"name": "glitch", "description": "Audio pop or click"},
{"name": "long_pause", "description": "Silence > 500ms"}
]
)
print(f"Created dataset: {dataset['id']}")
Upload audio
Python
Copy
import os
def upload_audio(client, dataset_id: str, file_path: str) -> str:
"""Upload an audio file and return its ID."""
filename = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
# Get content type
ext = os.path.splitext(filename)[1].lower()
content_types = {".wav": "audio/wav", ".mp3": "audio/mpeg", ".flac": "audio/flac"}
content_type = content_types.get(ext, "audio/wav")
# Get upload URL
upload_info = client.get_upload_url(dataset_id, filename, content_type, file_size)
# Upload to S3
with open(file_path, "rb") as f:
response = requests.post(
upload_info["upload_url"],
data=upload_info["fields"],
files={"file": f}
)
response.raise_for_status()
# Confirm upload
client.confirm_upload(dataset_id, upload_info["audio_id"])
return upload_info["audio_id"]
audio_id = upload_audio(client, dataset["id"], "sample.wav")
print(f"Uploaded: {audio_id}")
Add annotations and train
Python
Copy
import time
# Create annotation set
annotation_set = client.create_annotation_set(dataset["id"])
# Add annotations
client.create_annotations_bulk(
dataset["id"],
annotation_set["id"],
[
{"audio_file_id": audio_id, "artifact_type": "glitch", "start_ms": 1200, "end_ms": 1450},
{"audio_file_id": audio_id, "artifact_type": "long_pause", "start_ms": 3500, "end_ms": 4200}
]
)
# Publish
client.publish_annotation_set(dataset["id"], annotation_set["id"])
# Train
job = client.create_training_job(
dataset["id"],
annotation_set["id"],
{"artifact_types": ["glitch", "long_pause"], "epochs": 20}
)
# Wait for completion
while True:
job = client.get_training_job(job["id"])
print(f"Status: {job['status']} - {job['progress_percent']}%")
if job["status"] in ["completed", "failed"]:
break
time.sleep(30)
Run inference
Python
Copy
# Get model
models = client.list_models(is_active=True)
model_id = models["items"][0]["id"]
# Create inference job
job = client.create_inference_job(model_id, {"threshold": 0.5})
# Upload audio for inference
upload_info = client.get_inference_upload_url(
job["id"], "test.wav", "audio/wav", os.path.getsize("test.wav")
)
with open("test.wav", "rb") as f:
requests.post(
upload_info["upload_url"],
data=upload_info["upload_fields"],
files={"file": f}
)
client.confirm_inference_upload(job["id"], upload_info["file_id"])
# Wait for results
while True:
job = client.get_inference_job(job["id"])
if job["processed_files"] == job["total_files"]:
break
time.sleep(2)
# Print detections
for file in job["files"]:
print(f"{file['original_filename']}:")
for d in file["detections"]:
print(f" {d['artifact_type']}: {d['start_ms']}-{d['end_ms']}ms ({d['confidence']:.2f})")
Error handling
Python
Copy
from requests.exceptions import HTTPError
try:
dataset = client.get_dataset("invalid-id")
except HTTPError as e:
if e.response.status_code == 404:
print("Dataset not found")
elif e.response.status_code == 401:
print("Invalid API key")
else:
print(f"Error: {e.response.json()}")
Async with httpx
For async applications, usehttpx:
Python
Copy
import httpx
import asyncio
async def main():
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.relayai.dev/api/v1/datasets",
headers={"X-API-Key": os.environ["RELAY_API_KEY"]}
)
datasets = response.json()
print(f"Found {len(datasets)} datasets")
asyncio.run(main())
