
Pipeline Patterns

The Synapse SDK provides pipeline patterns for orchestrating multi-step workflows. They let you break a complex operation into discrete steps with built-in progress tracking, error handling, and automatic rollback.

Available Pipeline Patterns

Step Orchestration

A sequential step-based workflow system with:

  • Ordered step execution - Steps run in sequence with dependencies
  • Automatic progress tracking - Weighted progress calculation across all steps
  • Rollback on failure - Automatic cleanup when steps fail
  • Step composition - Combine and reorder steps easily
  • Utility wrappers - Built-in logging, timing, and validation steps
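
To make ordered execution and rollback concrete, here is a minimal, self-contained sketch that does not use the SDK. `ToyStep` and `run_steps` are hypothetical names, and rolling back only the steps that completed, in reverse order, is an assumption made for illustration rather than a description of how `Orchestrator` works internally.

```python
from typing import Callable, NamedTuple


class ToyStep(NamedTuple):
    """Hypothetical stand-in for a workflow step; not the SDK's BaseStep."""
    name: str
    execute: Callable[[dict], bool]   # returns True on success
    rollback: Callable[[dict], None]  # undoes the step's side effects


def run_steps(steps: list[ToyStep], ctx: dict) -> dict:
    """Run steps in order; on failure, roll back completed steps in reverse."""
    completed: list[ToyStep] = []
    for step in steps:
        if step.execute(ctx):
            completed.append(step)
            continue
        # Assumption: only steps that finished successfully are rolled back.
        for done in reversed(completed):
            done.rollback(ctx)
        return {'success': False, 'failed_step': step.name,
                'steps_executed': len(completed), 'steps_total': len(steps)}
    return {'success': True, 'steps_executed': len(completed),
            'steps_total': len(steps)}


def make_dir(ctx: dict) -> bool:
    ctx['log'].append('make_dir')
    return True


def remove_dir(ctx: dict) -> None:
    ctx['log'].append('undo make_dir')


def failing_upload(ctx: dict) -> bool:
    ctx['log'].append('upload')
    return False  # simulate a failure in the second step


ctx = {'log': []}
steps = [
    ToyStep('make_dir', make_dir, remove_dir),
    ToyStep('upload', failing_upload, lambda c: None),
]
result = run_steps(steps, ctx)
print(result['success'], result['failed_step'])  # False upload
print(ctx['log'])  # ['make_dir', 'upload', 'undo make_dir']
```

The log shows the first step being undone after the second one fails, which is the cleanup behavior the list above describes.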

Use Cases

Pipeline patterns are ideal for:

Scenario                     | Example
Multi-phase workflows        | Upload: initialize -> validate -> upload -> cleanup
Operations requiring cleanup | File processing with cleanup on failure
Progress-tracked operations  | Training: dataset (20%) -> train (60%) -> upload (20%)
Composable workflows         | Reusable steps shared across actions
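
The training scenario's weights (20% / 60% / 20%) suggest how weighted progress could be combined into one overall figure. The sketch below is a plain-Python illustration under stated assumptions: `overall_progress` is a hypothetical helper, and dividing by the sum of the weights is a choice made here so the result stays in the 0 to 1 range; this page does not say how the SDK computes or normalizes progress.

```python
def overall_progress(weights, done, current=None, fraction=0.0):
    """Combine per-step weights into a single 0..1 progress value.

    weights:  mapping of step name -> weight
    done:     names of steps that have finished
    current:  name of the step in flight, if any
    fraction: how far the current step has progressed (0..1)
    """
    total = sum(weights.values())
    finished = sum(weights[name] for name in done)
    partial = weights[current] * fraction if current else 0.0
    # Assumption: normalize by the total weight.
    return (finished + partial) / total


weights = {'dataset': 0.2, 'train': 0.6, 'upload': 0.2}

# Dataset step finished, training halfway through.
progress = overall_progress(weights, ['dataset'], current='train', fraction=0.5)
print(f'{progress:.0%}')  # 50%
```

With the dataset step complete (20%) and training half done (half of 60%), the overall value comes out at about 50%.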

Quick Example

workflow_example.py
from synapse_sdk.plugins.steps import (
    BaseStep, StepResult, StepRegistry, Orchestrator, BaseStepContext
)
from synapse_sdk.plugins import RuntimeContext
from dataclasses import dataclass, field

@dataclass
class MyContext(BaseStepContext):
    """Custom context for my workflow."""
    data: list[str] = field(default_factory=list)
    processed_count: int = 0

class LoadDataStep(BaseStep[MyContext]):
    @property
    def name(self) -> str:
        return 'load_data'

    @property
    def progress_weight(self) -> float:
        return 0.2  # 20% of total progress

    def execute(self, context: MyContext) -> StepResult:
        context.data = ['item1', 'item2', 'item3']
        return StepResult(success=True)

class ProcessDataStep(BaseStep[MyContext]):
    @property
    def name(self) -> str:
        return 'process_data'

    @property
    def progress_weight(self) -> float:
        return 0.7  # 70% of total progress

    def execute(self, context: MyContext) -> StepResult:
        for item in context.data:
            # Process each item
            context.processed_count += 1
        return StepResult(success=True)

    def rollback(self, context: MyContext, result: StepResult) -> None:
        # Cleanup on failure
        context.processed_count = 0

class FinalizeStep(BaseStep[MyContext]):
    @property
    def name(self) -> str:
        return 'finalize'

    @property
    def progress_weight(self) -> float:
        return 0.1  # 10% of total progress

    def execute(self, context: MyContext) -> StepResult:
        return StepResult(
            success=True,
            data={'processed': context.processed_count}
        )

# Execute the workflow
runtime_ctx = RuntimeContext()  # Create runtime context

registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(ProcessDataStep())
registry.register(FinalizeStep())

context = MyContext(runtime_ctx=runtime_ctx)
orchestrator = Orchestrator(registry, context)
result = orchestrator.execute()
# {'success': True, 'steps_executed': 3, 'steps_total': 3}

Core Components

Component       | Description
BaseStep[C]     | Abstract base class for workflow steps
StepResult      | Dataclass containing step execution results
StepRegistry[C] | Manages ordered list of steps
Orchestrator[C] | Executes steps with progress and rollback
BaseStepContext | Base context for sharing state between steps

Utility Steps

Utility        | Description
LoggingStep    | Wraps a step with start/end logging
TimingStep     | Measures step execution duration
ValidationStep | Validates context state before proceeding
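
The utilities above follow a wrapper pattern: one step wraps another and adds behavior around it. Since this page does not show the constructors of `LoggingStep`, `TimingStep`, or `ValidationStep`, the sketch below illustrates the idea with plain functions instead. `StepFn`, `with_timing`, and `with_validation` are hypothetical names, not SDK APIs.

```python
import time
from typing import Callable

StepFn = Callable[[dict], bool]  # hypothetical step signature: True means success


def with_timing(name: str, step: StepFn) -> StepFn:
    """Wrap a step so its wall-clock duration is stored in ctx['timings']."""
    def wrapped(ctx: dict) -> bool:
        start = time.perf_counter()
        try:
            return step(ctx)
        finally:
            # Record the duration whether the step succeeds or raises.
            ctx.setdefault('timings', {})[name] = time.perf_counter() - start
    return wrapped


def with_validation(check: Callable[[dict], bool], step: StepFn) -> StepFn:
    """Wrap a step so it only runs when check(ctx) passes."""
    def wrapped(ctx: dict) -> bool:
        return step(ctx) if check(ctx) else False
    return wrapped


def process(ctx: dict) -> bool:
    ctx['processed'] = len(ctx['data'])
    return True


# Compose wrappers: validate first, then time the inner step.
step = with_validation(lambda ctx: bool(ctx.get('data')),
                       with_timing('process', process))

ok_ctx = {'data': ['item1', 'item2']}
empty_ctx = {'data': []}
print(step(ok_ctx), ok_ctx['processed'])        # True 2
print(step(empty_ctx), 'timings' in empty_ctx)  # False False
```

Because each wrapper returns something with the same shape as the step it wraps, wrappers can be stacked in any order without changing the inner step.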

Integration with Actions

All base action classes (Train, Export, Upload) support optional step-based execution:

upload_action.py
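from synapse_sdk.plugins import BaseUploadAction
from synapse_sdk.plugins.steps import BaseStep, StepRegistry

# MyParams, UploadContext, and the four step classes registered below are
# not defined in this snippet; they are assumed to be defined or imported elsewhere.

class MyUploadAction(BaseUploadAction[MyParams]):
    def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
        # Steps run in the order they are registered.
        registry.register(InitializeStep())
        registry.register(ValidateStep())
        registry.register(UploadFilesStep())
        registry.register(CleanupStep())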

See the Step Orchestration guide for complete documentation.