Skip to main content

PipelineServiceClient

Client for the Pipeline Service API. Manages pipelines, runs, progress tracking, checkpoints, and logs.

Overview

The PipelineServiceClient communicates with the pipeline orchestration backend for:

  • Creating and managing pipeline definitions
  • Creating and monitoring pipeline runs
  • Real-time progress reporting and streaming
  • Checkpoint management for fault tolerance
  • Log collection and retrieval
from synapse_sdk.clients.pipeline import PipelineServiceClient

client = PipelineServiceClient("http://localhost:8100")

Initialization

client = PipelineServiceClient(
base_url="http://localhost:8100", # Pipeline service URL
timeout=30.0, # Request timeout in seconds
)

Parameters:

ParameterTypeDefaultDescription
base_urlstr"http://localhost:8100"Pipeline service base URL
timeoutfloat30.0Request timeout in seconds

Context Manager

Use as context manager for automatic cleanup:

with PipelineServiceClient("http://localhost:8100") as client:
pipeline = client.create_pipeline(...)
run = client.create_run(pipeline["id"])

Pipeline Management

create_pipeline()

Create a new pipeline definition.

pipeline = client.create_pipeline(
name="YOLO Training Pipeline",
actions=[
{"name": "download", "entrypoint": "plugin.download.DownloadAction"},
{"name": "convert", "entrypoint": "plugin.convert.ConvertAction"},
{"name": "train", "entrypoint": "plugin.train.TrainAction"},
],
description="End-to-end YOLO training pipeline",
)

print(f"Created pipeline: {pipeline['id']}")

Parameters:

ParameterTypeRequiredDescription
namestrYesPipeline name
actionslist[dict]YesList of action definitions
descriptionstrNoPipeline description

get_pipeline()

Get pipeline details by ID.

pipeline = client.get_pipeline("pipeline-123")

list_pipelines()

List all pipelines with pagination.

pipelines = client.list_pipelines(skip=0, limit=100)
for p in pipelines:
print(f"{p['id']}: {p['name']}")

delete_pipeline()

Delete a pipeline.

client.delete_pipeline("pipeline-123")

Run Management

create_run()

Create a new run for a pipeline.

run = client.create_run(
pipeline_id="pipeline-123",
params={"dataset": 456, "epochs": 100},
work_dir="/workspace/run-001",
)

print(f"Run ID: {run['id']}")
print(f"Status: {run['status']}")

Parameters:

ParameterTypeRequiredDescription
pipeline_idstrYesPipeline to run
paramsdictNoInitial parameters
work_dirstrNoWorking directory path

get_run()

Get run details by ID.

run = client.get_run("run-456")
print(f"Status: {run['status']}")
print(f"Progress: {run.get('progress')}")

list_runs()

List runs with optional status filter.

# All runs
runs = client.list_runs()

# Filter by status
running = client.list_runs(status="running")
completed = client.list_runs(status="completed")

update_run()

Update run status or result.

# Mark as completed
client.update_run(
run_id="run-456",
status="completed",
result={"accuracy": 0.95, "model_path": "/models/best.pt"},
)

# Mark as failed
client.update_run(
run_id="run-456",
status="failed",
error="Out of memory during training",
)

delete_run()

Delete a run.

client.delete_run("run-456")

Progress Reporting

report_progress()

Report progress update for a run.

from synapse_sdk.plugins.models.logger import ActionProgress

# Basic progress update
client.report_progress(
run_id="run-456",
current_action="train",
current_action_index=2,
status="running",
)

# With detailed action progress
client.report_progress(
run_id="run-456",
current_action="train",
action_progress=ActionProgress(
name="train",
status="running",
progress=50,
total=100,
metrics={"loss": 0.25, "accuracy": 0.87},
),
)

Parameters:

ParameterTypeDescription
run_idstrRun identifier
current_actionstrName of current action
current_action_indexintIndex of current action
statusstrOverall run status
action_progressActionProgress | dictDetailed action progress
errorstrError message if any

get_progress()

Get current progress for a run.

from synapse_sdk.plugins.models.logger import PipelineProgress

progress: PipelineProgress = client.get_progress("run-456")

print(f"Status: {progress.status}")
print(f"Current action: {progress.current_action}")

for action in progress.actions:
print(f" {action.name}: {action.progress}/{action.total}")

stream_progress()

Stream progress updates via Server-Sent Events (SSE).

# Synchronous streaming
for progress in client.stream_progress("run-456"):
print(f"Status: {progress.status}")
print(f"Action: {progress.current_action}")

if progress.status in ("completed", "failed"):
break

stream_progress_async()

Async version for streaming progress.

async for progress in client.stream_progress_async("run-456"):
print(f"Status: {progress.status}")

if progress.status in ("completed", "failed"):
break

Checkpoint Management

create_checkpoint()

Create a checkpoint for fault tolerance.

checkpoint = client.create_checkpoint(
run_id="run-456",
action_name="train",
action_index=2,
status="completed",
params_snapshot={"epochs": 100, "batch_size": 16},
result={"best_accuracy": 0.95},
artifacts_path="/workspace/checkpoints/epoch_50",
)

get_checkpoints()

Get all checkpoints for a run.

checkpoints = client.get_checkpoints("run-456")
for cp in checkpoints:
print(f"{cp['action_name']}: {cp['status']}")

get_latest_checkpoint()

Get the most recent checkpoint.

latest = client.get_latest_checkpoint("run-456")
if latest:
print(f"Resume from: {latest['action_name']}")

get_checkpoint_by_action()

Get checkpoint for a specific action.

checkpoint = client.get_checkpoint_by_action("run-456", "train")
if checkpoint:
print(f"Train checkpoint: {checkpoint['artifacts_path']}")

Log Management

append_logs()

Append log entries to a run.

from synapse_sdk.plugins.models.logger import LogEntry, LogLevel

client.append_logs("run-456", [
LogEntry(message="Starting training", level=LogLevel.INFO, action_name="train"),
LogEntry(message="Loaded dataset", level=LogLevel.INFO, action_name="train"),
])

get_logs()

Get logs with optional filters.

# All logs
logs = client.get_logs("run-456")

# Filter by action
train_logs = client.get_logs("run-456", action_name="train")

# Filter by level
errors = client.get_logs("run-456", level="error")

# Logs since timestamp
from datetime import datetime, timedelta
recent = client.get_logs(
"run-456",
since=datetime.now() - timedelta(hours=1),
)

Health Check

if client.health_check():
print("Pipeline service is healthy")
else:
print("Pipeline service is unavailable")

Complete Example

from synapse_sdk.clients.pipeline import PipelineServiceClient
from synapse_sdk.plugins.models.logger import ActionProgress

with PipelineServiceClient("http://localhost:8100") as client:
# Create pipeline
pipeline = client.create_pipeline(
name="Training Pipeline",
actions=[
{"name": "download", "entrypoint": "plugin.download.DownloadAction"},
{"name": "train", "entrypoint": "plugin.train.TrainAction"},
],
)

# Start run
run = client.create_run(
pipeline["id"],
params={"dataset": 123, "epochs": 50},
)

# Monitor progress
for progress in client.stream_progress(run["id"], timeout=3600):
print(f"[{progress.status}] {progress.current_action}")

if progress.current_action:
for action in progress.actions:
if action.name == progress.current_action:
print(f" Progress: {action.progress}/{action.total}")

if progress.status in ("completed", "failed", "cancelled"):
break

# Get final result
final_run = client.get_run(run["id"])
if final_run["status"] == "completed":
print(f"Result: {final_run.get('result')}")
else:
print(f"Error: {final_run.get('error')}")