Ray Execution

Ray executors enable distributed execution of plugin actions on Ray clusters, providing horizontal scaling, resource management, and fault tolerance for production workloads.

Overview

The Synapse SDK provides four Ray-based executors, each optimized for different execution patterns:

At a Glance

| Executor | Execution | Actor Reuse | Log Streaming | Checkpoint | Best For |
|---|---|---|---|---|---|
| RayActorExecutor | Synchronous | Yes | No | No | Fast iteration, development |
| RayJobExecutor | Asynchronous | No | No | No | Batch processing, fire-and-forget |
| RayJobsApiExecutor | Asynchronous | No | Yes | No | Long jobs with log visibility |
| RayPipelineExecutor | Asynchronous | No | SSE | Yes | Multi-action workflows |

All executors inherit from BaseRayExecutor and share common configuration for runtime environments, package management, and credential injection.
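Because the four executors share BaseRayExecutor's configuration surface, a workflow can build the common keyword arguments once and pass them to whichever executor it needs. A minimal sketch (the executor names come from this page; the `common` dict itself is only an illustration):

```python
# Shared constructor keywords accepted by all four Ray executors.
# Keeping them in one place avoids drift between dev and prod configs.
common = {
    'working_dir': '/path/to/plugin',
    'requirements_file': '/path/to/plugin/requirements.txt',
    'package_manager': 'uv',
    'num_gpus': 1,
}

# Hypothetical usage -- any executor on this page accepts these kwargs:
# executor = RayActorExecutor(ray_address='auto', **common)
# executor = RayJobExecutor(ray_address='auto', **common)
```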

Executor Selection Guide

Use the recommendations below to choose the right executor:

Use Case Recommendations

| Use Case | Recommended Executor | Reason |
|---|---|---|
| Development/testing | RayActorExecutor | Fast startup, actor reuse |
| Single training job | RayJobsApiExecutor | Runtime logs, resource tracking |
| Batch inference (multiple inputs) | RayJobExecutor | Concurrent submission |
| Full ML pipeline | RayPipelineExecutor | Checkpoints, progress streaming |
| Real-time inference | RayActorExecutor | Pre-warmed workers |
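The table above can be condensed into a small helper. This is an illustrative sketch, not an SDK API; it just returns the recommended executor name for a given set of needs:

```python
def pick_executor(*, multi_action: bool = False, needs_logs: bool = False,
                  interactive: bool = False) -> str:
    """Return the recommended executor name for a workload (illustrative only)."""
    if multi_action:
        return 'RayPipelineExecutor'   # checkpoints + SSE progress streaming
    if needs_logs:
        return 'RayJobsApiExecutor'    # runtime-env and job log visibility
    if interactive:
        return 'RayActorExecutor'      # pre-warmed, reusable actor
    return 'RayJobExecutor'            # fire-and-forget batch submission
```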

RayActorExecutor

Executes actions synchronously using a persistent Ray Actor. The actor maintains state across executions, enabling fast consecutive calls without re-initialization overhead.

Quick Start

from synapse_sdk.plugins.executors.ray import RayActorExecutor

executor = RayActorExecutor(
    ray_address='auto',
    working_dir='/path/to/plugin',
    num_gpus=1,
)

# Execute action
result = executor.execute(TrainAction, {'epochs': 10})

# Reuse the same actor for subsequent executions (fast)
result2 = executor.execute(InferAction, {'batch_size': 32})

# Clean up when done
executor.shutdown()

Constructor Parameters

ParameterTypeDefaultDescription
envPluginEnvironment | dict | NoneNoneEnvironment config. Loads from os.environ if None
job_idstr | NoneNoneJob identifier for tracking
ray_addressstr'auto'Ray cluster address
runtime_envdict | NoneNoneRay runtime environment config
working_dirstr | Path | NoneNonePlugin working directory
requirements_filestr | Path | NoneNonePath to requirements.txt
package_managerPackageManager | 'pip' | 'uv''pip'Package manager to use
package_manager_optionslist[str] | NoneNoneAdditional package manager options
wheels_dirstr'wheels'Directory for .whl files (relative to working_dir)
num_cpusint | NoneNoneCPU cores to request
num_gpusint | NoneNoneGPUs to request
include_sdkboolFalseBundle local SDK (for development)

Methods

execute()

def execute(
    self,
    action_cls: type[BaseAction] | str,
    params: dict[str, Any],
    **kwargs: Any,
) -> Any:
    """Execute action using the Ray actor.

    Args:
        action_cls: BaseAction subclass or entrypoint string
            (e.g., 'plugin.train.TrainAction').
        params: Parameters dict to validate and pass.
        **kwargs: Ignored (for protocol compatibility).

    Returns:
        Action result from execute().

    Raises:
        ValidationError: If params fail validation.
        ExecutionError: If action execution fails.
    """

shutdown()

def shutdown(self) -> None:
    """Shutdown the actor and release resources."""

Actor Reuse Pattern

executor = RayActorExecutor(working_dir='/plugin')

# Same actor handles all executions - no re-initialization
result1 = executor.execute(PreprocessAction, {'input': 'data.csv'})
result2 = executor.execute(TrainAction, {'epochs': 100})
result3 = executor.execute(ExportAction, {'format': 'onnx'})

# Always shutdown when done
executor.shutdown()

Good to know: RayTaskExecutor is an alias for RayActorExecutor (backwards compatibility).

RayJobExecutor

Submits actions as asynchronous Ray tasks. Each submission creates a new actor instance. Best for batch processing where you submit multiple jobs and collect results later.

Quick Start

from synapse_sdk.plugins.executors.ray import RayJobExecutor

executor = RayJobExecutor(
    ray_address='auto',
    working_dir='/path/to/plugin',
)

# Submit job (non-blocking)
job_id = executor.submit(TrainAction, {'epochs': 100})

# Wait for completion
status = executor.wait(job_id, timeout_seconds=3600)

# Get result
result = executor.get_result(job_id)

Constructor Parameters

ParameterTypeDefaultDescription
envPluginEnvironment | dict | NoneNoneEnvironment config
ray_addressstr'auto'Ray cluster address
runtime_envdict | NoneNoneRay runtime environment
working_dirstr | Path | NoneNonePlugin working directory
requirements_filestr | Path | NoneNonePath to requirements.txt
package_managerPackageManager | 'pip' | 'uv''pip'Package manager
package_manager_optionslist[str] | NoneNonePackage manager options
wheels_dirstr'wheels'Wheel files directory
num_cpusint | NoneNoneCPU cores to request
num_gpusint | NoneNoneGPUs to request
include_sdkboolFalseBundle local SDK
job_idstr | NoneNoneDefault job ID

Methods

submit()

def submit(
    self,
    action_cls: type[BaseAction] | str,
    params: dict[str, Any],
    *,
    job_id: str | None = None,
) -> str:
    """Submit action as a Ray task (non-blocking).

    Args:
        action_cls: BaseAction subclass or entrypoint string.
        params: Parameters dict for the action.
        job_id: Optional job identifier. Auto-generated if None.

    Returns:
        Job ID for tracking.
    """

get_status()

def get_status(self, job_id: str) -> str:
    """Get job status.

    Returns:
        'RUNNING', 'SUCCEEDED', 'FAILED', or 'UNKNOWN'.
    """

get_result()

def get_result(self, job_id: str, timeout: float | None = None) -> Any:
    """Get job result (blocks until complete).

    Args:
        job_id: Job ID from submit().
        timeout: Optional timeout in seconds.

    Returns:
        Action result.

    Raises:
        ExecutionError: If job failed or not found.
    """

wait()

def wait(self, job_id: str, timeout_seconds: float = 300) -> str:
    """Wait for job to complete.

    Returns:
        Final job status ('SUCCEEDED' or raises exception).

    Raises:
        ExecutionError: If job fails or times out.
    """

Multiple Concurrent Jobs

executor = RayJobExecutor(working_dir='/plugin', num_gpus=1)

# Submit multiple jobs in parallel
job_ids = []
for dataset in [1, 2, 3, 4, 5]:
    job_id = executor.submit(TrainAction, {'dataset': dataset})
    job_ids.append(job_id)

# Collect results
results = {}
for job_id in job_ids:
    executor.wait(job_id)
    results[job_id] = executor.get_result(job_id)

Status Polling

import time

job_id = executor.submit(TrainAction, {'epochs': 100})

while True:
    status = executor.get_status(job_id)
    print(f'Status: {status}')
    if status in ('SUCCEEDED', 'FAILED'):
        break
    time.sleep(5)

if status == 'SUCCEEDED':
    result = executor.get_result(job_id)
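The polling loop above can be factored into a reusable helper. A sketch with an injected status callable (`get_status` here is any function returning the status strings documented for RayJobExecutor; the helper itself is not part of the SDK):

```python
import time

def poll_until_terminal(get_status, job_id, *, poll_interval=5.0, timeout=3600.0):
    """Poll get_status(job_id) until it returns 'SUCCEEDED' or 'FAILED'."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status(job_id)
        if status in ('SUCCEEDED', 'FAILED'):
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f'job {job_id} still running after {timeout}s')

# Usage sketch:
# status = poll_until_terminal(executor.get_status, job_id)
```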

RayJobsApiExecutor

Uses Ray's Jobs API (JobSubmissionClient) for job management. Provides access to runtime environment setup logs and real-time log streaming - essential for debugging dependency issues and monitoring long-running jobs.

Quick Start

from synapse_sdk.plugins.executors.ray import RayJobsApiExecutor

executor = RayJobsApiExecutor(
    dashboard_address='http://localhost:8265',
    working_dir='/path/to/plugin',
    num_gpus=1,
)

# Submit job
job_id = executor.submit(TrainAction, {'epochs': 100})

# Stream logs in real-time (includes runtime env setup)
for log_line in executor.stream_logs(job_id):
    print(log_line, end='')

# Get result
result = executor.get_result(job_id)

Constructor Parameters

ParameterTypeDefaultDescription
envPluginEnvironment | dict | NoneNoneEnvironment config
dashboard_addressstr'http://localhost:8265'Ray Dashboard HTTP address
runtime_envdict | NoneNoneRay runtime environment
working_dirstr | Path | NoneNonePlugin working directory
requirements_filestr | Path | NoneNonePath to requirements.txt
package_managerPackageManager | 'pip' | 'uv''pip'Package manager
package_manager_optionslist[str] | NoneNonePackage manager options
wheels_dirstr'wheels'Wheel files directory
num_cpusint | float | NoneNoneCPU cores to request
num_gpusint | float | NoneNoneGPUs to request
memoryint | NoneNoneMemory in bytes
include_sdkboolFalseBundle local SDK

Methods

submit()

def submit(
    self,
    action_cls: type[BaseAction] | str,
    params: dict[str, Any],
    *,
    job_id: str | None = None,
) -> str:
    """Submit action as a Ray Job (non-blocking).

    Returns:
        Job ID for tracking.
    """

get_status()

def get_status(self, job_id: str) -> JobStatus:
    """Get job status.

    Returns:
        JobStatus enum (PENDING, RUNNING, SUCCEEDED, FAILED, STOPPED).
    """

get_logs()

def get_logs(self, job_id: str) -> str:
    """Get all job logs (includes runtime env setup logs).

    Returns:
        Full job logs as a string.
    """

stream_logs()

def stream_logs(
    self,
    job_id: str,
    *,
    timeout: float = 3600.0,
) -> Iterator[str]:
    """Stream job logs synchronously (includes runtime env setup logs).

    Yields:
        Log lines as they become available.
    """

stream_logs_async()

async def stream_logs_async(self, job_id: str) -> AsyncIterator[str]:
    """Stream job logs asynchronously.

    Yields:
        Log lines as they become available.
    """

get_result()

def get_result(self, job_id: str, timeout: float | None = None) -> Any:
    """Get job result (blocks until complete).

    Parses result from job output logs using markers.

    Raises:
        ExecutionError: If job failed or result cannot be parsed.
    """

wait()

def wait(
    self,
    job_id: str,
    timeout_seconds: float = 300,
    poll_interval: float = 1.0,
) -> str:
    """Wait for job to complete.

    Returns:
        Final job status as string.
    """

stop() / delete()

def stop(self, job_id: str) -> bool:
    """Stop a running job."""

def delete(self, job_id: str) -> bool:
    """Delete job info (only for terminal jobs)."""

Async Log Streaming

import asyncio

async def run_with_logs():
    executor = RayJobsApiExecutor(
        dashboard_address='http://localhost:8265',
        working_dir='/plugin',
    )

    job_id = executor.submit(TrainAction, {'epochs': 100})

    async for log_line in executor.stream_logs_async(job_id):
        print(log_line, end='')

    return executor.get_result(job_id)

result = asyncio.run(run_with_logs())

Resource Configuration

executor = RayJobsApiExecutor(
    dashboard_address='http://ray-head:8265',
    working_dir='/plugin',
    num_cpus=4,
    num_gpus=2,
    memory=16 * 1024 * 1024 * 1024,  # 16GB
    package_manager='uv',
)

Good to know: Log streaming includes runtime environment setup progress (pip/uv install), making it invaluable for debugging dependency issues.
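Since the stream interleaves runtime-env setup output with job output, a simple filter helps when hunting for install failures. The marker strings below are heuristics, not a documented log format; adjust them to whatever your Ray and pip/uv versions actually print:

```python
def dependency_errors(log_lines):
    """Yield log lines that look like package-install failures (heuristic filter)."""
    markers = ('ERROR:', 'error:', 'No matching distribution', 'ResolutionImpossible')
    for line in log_lines:
        if any(marker in line for marker in markers):
            yield line

# Usage sketch:
# for line in dependency_errors(executor.stream_logs(job_id)):
#     print(line, end='')
```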

RayPipelineExecutor

Executes multi-action pipelines with automatic checkpointing and real-time progress streaming via SSE. Integrates with the pipeline service API for state management.

Quick Start

from synapse_sdk.plugins.executors.ray import RayPipelineExecutor, PipelineDefinition

executor = RayPipelineExecutor(
    ray_address='auto',
    working_dir='/path/to/plugin',
    pipeline_service_url='http://localhost:8100',
)

pipeline = PipelineDefinition(
    name='YOLO Training Pipeline',
    actions=[DownloadAction, ConvertAction, TrainAction, ExportAction],
    description='End-to-end training workflow',
)

# Submit pipeline (non-blocking)
run_id = executor.submit(pipeline, {'dataset': 123, 'epochs': 100})

# Wait for completion
progress = executor.wait(run_id, timeout_seconds=7200)
result = executor.get_result(run_id)

Constructor Parameters

ParameterTypeDefaultDescription
envPluginEnvironment | dict | NoneNoneEnvironment config
ray_addressstr'auto'Ray cluster address
runtime_envdict | NoneNoneRay runtime environment
working_dirstr | Path | NoneNonePlugin working directory
requirements_filestr | Path | NoneNonePath to requirements.txt
package_managerPackageManager | 'pip' | 'uv''pip'Package manager
package_manager_optionslist[str] | NoneNonePackage manager options
wheels_dirstr'wheels'Wheel files directory
num_cpusint | NoneNoneCPU cores to request
num_gpusint | NoneNoneGPUs to request
include_sdkboolFalseBundle local SDK
pipeline_service_urlstr'http://localhost:8100'Pipeline service API URL
actor_pipeline_service_urlstr | NoneNoneURL for Ray actor (use when cluster needs different URL)

PipelineDefinition

@dataclass
class PipelineDefinition:
    """Definition of a pipeline to execute.

    Attributes:
        name: Pipeline name.
        actions: List of action classes or entrypoint strings.
        description: Optional description.
    """
    name: str
    actions: list[type[BaseAction] | str]
    description: str | None = None

Methods

submit()

def submit(
    self,
    pipeline: PipelineDefinition | list[type[BaseAction] | str],
    params: dict[str, Any],
    *,
    name: str | None = None,
    resume_from: str | None = None,
) -> str:
    """Submit a pipeline for execution (non-blocking).

    Args:
        pipeline: PipelineDefinition or list of action classes/entrypoints.
        params: Initial parameters for the pipeline.
        name: Pipeline name (required if pipeline is a list).
        resume_from: Run ID to resume from. Skips completed actions
            and restores accumulated params from latest checkpoint.

    Returns:
        Run ID for tracking.
    """

get_status()

def get_status(self, run_id: str) -> RunStatus:
    """Get run status.

    Returns:
        RunStatus enum (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED).
    """

get_progress()

def get_progress(self, run_id: str) -> PipelineProgress:
    """Get detailed progress for a run.

    Returns:
        PipelineProgress with current action, overall progress, and per-action status.
    """

stream_progress()

def stream_progress(
    self,
    run_id: str,
    timeout: float = 3600.0,
) -> Iterator[PipelineProgress]:
    """Stream progress updates via SSE.

    More efficient than polling for long-running pipelines.

    Yields:
        PipelineProgress objects with current state.
    """

stream_progress_async()

async def stream_progress_async(
    self,
    run_id: str,
    timeout: float = 3600.0,
) -> AsyncIterator[PipelineProgress]:
    """Stream progress updates via SSE (async version)."""

wait()

def wait(
    self,
    run_id: str,
    timeout_seconds: float = 3600,
    poll_interval: float = 5.0,
) -> PipelineProgress:
    """Wait for pipeline to complete.

    Returns:
        Final PipelineProgress.

    Raises:
        ExecutionError: If pipeline fails or times out.
    """

cancel() / close()

def cancel(self, run_id: str) -> None:
    """Cancel a running pipeline."""

def close(self) -> None:
    """Close the executor and clean up resources."""

Progress Streaming

from synapse_sdk.plugins.models.pipeline import RunStatus

run_id = executor.submit(pipeline, params)

for progress in executor.stream_progress(run_id):
    print(f'Action: {progress.current_action}')
    print(f'Status: {progress.status}')
    print(f'Progress: {progress.overall_progress:.1%}')

    if progress.status == RunStatus.COMPLETED:
        print('Pipeline completed!')
        break
    elif progress.status == RunStatus.FAILED:
        print(f'Pipeline failed: {progress.error}')
        break

Resume from Checkpoint

# First run - fails at action index 2 (TrainAction)
run_id_1 = executor.submit(pipeline, params)
# ... TrainAction fails due to OOM ...

# Resume from checkpoint - skips completed actions
run_id_2 = executor.submit(
    pipeline,
    params,
    resume_from=run_id_1,  # Resumes from TrainAction
)

# Actions 0 (Download) and 1 (Convert) are marked as SKIPPED
# Execution continues from action 2 (TrainAction)

Simple List Syntax

# Use a list directly instead of PipelineDefinition
run_id = executor.submit(
    [DownloadAction, ConvertAction, TrainAction],
    params={'dataset': 123},
    name='quick-pipeline',
)

Good to know: Checkpoints store params_snapshot with accumulated parameters from all completed actions, enabling seamless resume without data loss.
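The resume behavior described above can be illustrated with plain data: given the list of completed actions and the checkpoint's params_snapshot, the executor effectively skips completed actions and merges the snapshot over the initial params. This sketch mimics that logic locally and is not the SDK's implementation (all names in it are hypothetical):

```python
def plan_resume(actions, completed, initial_params, params_snapshot):
    """Return (remaining_actions, merged_params) for a resumed run (illustrative)."""
    remaining = [a for a in actions if a not in completed]  # completed -> SKIPPED
    merged = {**initial_params, **params_snapshot}          # checkpoint values win
    return remaining, merged

remaining, params = plan_resume(
    ['Download', 'Convert', 'Train', 'Export'],
    completed={'Download', 'Convert'},
    initial_params={'dataset': 123, 'epochs': 100},
    params_snapshot={'dataset_path': '/tmp/data'},
)
# remaining == ['Train', 'Export']; params carries both the initial
# parameters and the accumulated 'dataset_path' from the checkpoint.
```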

Common Configuration

All Ray executors share common configuration through BaseRayExecutor.

Runtime Environment

The runtime environment is automatically configured with:

  1. Working directory: Plugin code uploaded to cluster
  2. Dependencies: Requirements from requirements.txt
  3. Wheel files: Local packages from wheels_dir
  4. Environment variables: Plugin environment + Synapse credentials

# Custom runtime_env
executor = RayActorExecutor(
    runtime_env={
        'pip': {'packages': ['torch==2.0.0', 'ultralytics']},
        'env_vars': {'CUDA_VISIBLE_DEVICES': '0,1'},
    },
)

Package Management

# Using UV (faster)
executor = RayJobExecutor(
    working_dir='/plugin',
    package_manager='uv',
    package_manager_options=['--no-cache', '--system'],
)

# Default pip options include --upgrade
# UV defaults include --no-cache
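The defaults noted in the comments above can be made explicit in one place. A sketch (the per-manager defaults come from this page; the helper itself is illustrative, not an SDK function):

```python
def resolve_options(package_manager, extra=None):
    """Combine the documented default options with user-supplied ones (illustrative)."""
    defaults = {
        'pip': ['--upgrade'],    # default pip options include --upgrade
        'uv': ['--no-cache'],    # UV defaults include --no-cache
    }
    return defaults[package_manager] + list(extra or [])
```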

Working Directory with Wheels

plugin/
├── config.yaml
├── requirements.txt
├── wheels/
│   └── custom_package-1.0.0-py3-none-any.whl
└── plugin/
    └── train.py

executor = RayActorExecutor(
    working_dir='/plugin',
    wheels_dir='wheels',  # Wheels auto-included in runtime_env
)

Resource Allocation

# CPU/GPU allocation
executor = RayActorExecutor(
    num_cpus=4,
    num_gpus=1,
)

# Memory (RayJobsApiExecutor only)
executor = RayJobsApiExecutor(
    num_cpus=4,
    num_gpus=2,
    memory=16 * 1024 * 1024 * 1024,  # 16GB
)
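Raw byte counts are easy to get wrong, so a small helper keeps the arithmetic above readable (the `memory` parameter takes bytes, per the constructor table; `gib` is just a local convenience, not part of the SDK):

```python
def gib(n):
    """Convert gibibytes to bytes for the `memory` parameter."""
    return n * 1024 ** 3

# executor = RayJobsApiExecutor(num_cpus=4, num_gpus=2, memory=gib(16))
```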

Credential Injection

Synapse credentials are automatically injected into the runtime environment:

# Automatically included in runtime_env['env_vars']:
# - SYNAPSE_HOST
# - SYNAPSE_ACCESS_TOKEN

This enables BackendClient creation on Ray workers without manual configuration.
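On a worker, those credentials are plain environment variables. A sketch of reading them (the variable names come from the list above; the helper and its error handling are illustrative):

```python
import os

def load_synapse_credentials(environ=os.environ):
    """Read the injected Synapse credentials from the worker environment."""
    try:
        return {
            'host': environ['SYNAPSE_HOST'],
            'access_token': environ['SYNAPSE_ACCESS_TOKEN'],
        }
    except KeyError as exc:
        raise RuntimeError(f'missing credential env var: {exc}') from exc
```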

Remote Cluster Connection

Local vs Remote Mode

# Local mode - connects to local Ray instance
executor = RayActorExecutor(ray_address='auto')

# Remote mode - connects via Ray client protocol
executor = RayActorExecutor(ray_address='ray://cluster-head:10001')

GCS Packaging

For remote clusters, the working directory is automatically uploaded to Ray's Global Control Store (GCS):

  • Content-addressed URIs prevent duplicate uploads
  • Cached within executor instance for reuse
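Content addressing means the upload name is derived from a hash of the archive bytes, so re-submitting unchanged code maps to the same URI and the upload is skipped. A toy illustration of the idea (the URI scheme shown is an assumption, not Ray's documented format):

```python
import hashlib

def package_uri(archive_bytes):
    """Derive a stable, content-addressed name for a working-dir archive (toy example)."""
    digest = hashlib.sha256(archive_bytes).hexdigest()[:16]
    return f'gcs://_pkg_{digest}.zip'

# Identical content always hashes to the same URI, so the cluster
# can detect it already holds the package and skip the upload.
```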

Pipeline Service URL Configuration

For remote clusters that need different URLs:

executor = RayPipelineExecutor(
    ray_address='ray://cluster:10001',
    pipeline_service_url='http://localhost:8100',  # Local SDK uses this
    actor_pipeline_service_url='http://internal-api:8100',  # Ray actor uses this
)

Monitoring & Logs

Ray Dashboard

Access the Ray Dashboard at http://<ray-head>:8265 to:

  • View job status and resource usage
  • Monitor worker health
  • Access detailed logs

Log Streaming (RayJobsApiExecutor)

executor = RayJobsApiExecutor(dashboard_address='http://localhost:8265')
job_id = executor.submit(TrainAction, params)

# Full logs after completion
logs = executor.get_logs(job_id)

# Real-time streaming
for line in executor.stream_logs(job_id):
    print(line, end='')

Progress Monitoring (RayPipelineExecutor)

import time

# Polling approach
while True:
    progress = executor.get_progress(run_id)
    print(f'{progress.current_action}: {progress.overall_progress:.1%}')
    if progress.status.is_terminal():
        break
    time.sleep(5)

# SSE streaming (more efficient)
for progress in executor.stream_progress(run_id):
    print(f'{progress.current_action}: {progress.overall_progress:.1%}')

Best Practices

Choose the Right Executor

Good to know: Use RayActorExecutor for development and fast iteration. Switch to RayJobsApiExecutor when you need log visibility, or RayPipelineExecutor for multi-action workflows.

Explicitly Declare Resources

# Always specify resources for production workloads
executor = RayJobsApiExecutor(
    num_cpus=4,
    num_gpus=1,
    memory=8 * 1024 * 1024 * 1024,  # 8GB
)

Handle Errors Gracefully

from synapse_sdk.plugins.errors import ExecutionError

try:
    result = executor.get_result(job_id, timeout=3600)
except ExecutionError as e:
    print(f'Job failed: {e}')
    logs = executor.get_logs(job_id)  # Get logs for debugging

Use Checkpoints for Long Pipelines

# If pipeline fails, resume from checkpoint
try:
    progress = executor.wait(run_id)
except ExecutionError:
    # Retry from last checkpoint
    new_run_id = executor.submit(pipeline, params, resume_from=run_id)

Clean Up Resources

# RayActorExecutor - shutdown the actor
executor.shutdown()

# RayPipelineExecutor - close the client
executor.close()

Use UV for Faster Dependency Installation

# UV is significantly faster than pip
executor = RayJobExecutor(
    working_dir='/plugin',
    package_manager='uv',
)