Plugin System

The Synapse SDK plugin system provides a type-safe, extensible framework for building ML pipelines. You can define actions using either function decorators or class inheritance, chain them into pipelines, and execute them locally or on distributed Ray clusters.

Overview

The plugin system centers around actions—reusable units of work with validated parameters and typed inputs/outputs. Actions can be composed into pipelines for complex workflows, orchestrated through steps for fine-grained control, and executed via executors that support both local development and distributed production environments.

At a Glance

| Concept | Description | Primary Use |
|---|---|---|
| `BaseAction` | Generic base class with typed parameters | Complex actions with workflows |
| `@action` decorator | Function-based action definition | Simple, stateless operations |
| `RuntimeContext` | Injected context with logger, client, env | Access runtime services |
| `DataType` | Semantic input/output types | Pipeline compatibility validation |
| `Step` | Workflow unit with progress tracking | Multi-stage operations |
| `Pipeline` | Action chaining with schema validation | End-to-end ML workflows |
| `Executor` | Local or Ray-based execution | Development and production |

Key Concepts

BaseAction

BaseAction is a generic abstract class that provides type-safe parameter handling. The generic parameter P must be a Pydantic BaseModel that defines the action's input schema.

```python
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

from pydantic import BaseModel

P = TypeVar('P', bound=BaseModel)

class BaseAction(ABC, Generic[P]):
    # Optional: injected from config.yaml during discovery if not set
    action_name: str | None = None          # Action name for invocation
    category: PluginCategory | None = None  # Category for grouping

    # Semantic types for pipeline compatibility validation
    input_type: type[DataType] | None = None   # Semantic input type
    output_type: type[DataType] | None = None  # Semantic output type
    params_model: type[P]                      # Auto-extracted from the generic parameter
    result_model: type[BaseModel] | type[NoResult] = NoResult  # Optional result validation

    def __init__(self, params: P, ctx: RuntimeContext) -> None:
        self.params = params
        self.ctx = ctx

    @abstractmethod
    def execute(self) -> Any:
        """Execute the action logic."""
        ...
```

Good to know:

  • The params_model is automatically extracted from BaseAction[MyParams]
  • Set result_model to enable output validation (warning-only mode)
  • Use input_type and output_type for pipeline compatibility checks

@action Decorator

For simple, stateless operations, use the @action decorator to define function-based actions. The decorator attaches metadata for discovery and execution.

Decorator parameters:

| Parameter | Type | Description |
|---|---|---|
| `name` | `str \| None` | Action name (defaults to function name) |
| `description` | `str` | Human-readable description |
| `params` | `type[BaseModel] \| None` | Pydantic model for parameter validation |
| `result` | `type[BaseModel] \| None` | Pydantic model for result validation |
| `category` | `PluginCategory \| None` | Category for grouping actions |

```python
from synapse_sdk.plugins import action, RuntimeContext
from synapse_sdk.plugins.enums import PluginCategory
from pydantic import BaseModel

class ConvertParams(BaseModel):
    source_path: str
    target_format: str = 'yolo'

class ConvertResult(BaseModel):
    output_path: str
    files_converted: int

@action(
    name='convert',
    description='Convert dataset format',
    params=ConvertParams,
    result=ConvertResult,
    category=PluginCategory.EXPORT,
)
def convert(params: ConvertParams, ctx: RuntimeContext) -> ConvertResult:
    ctx.log_message(f'Converting to {params.target_format}')
    # Conversion logic...
    return ConvertResult(output_path='/converted', files_converted=100)
```
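
Conceptually, a decorator like this only needs to attach metadata that a later discovery step can read. The sketch below is illustrative, not the SDK's actual code — the `__action__` attribute name is an assumption made for this example:

```python
def action(name=None, description='', params=None, result=None, category=None):
    """Illustrative stand-in for the SDK's @action decorator: it attaches
    discovery metadata to the function and returns it unchanged."""
    def wrap(fn):
        fn.__action__ = {
            'name': name or fn.__name__,  # default to the function name
            'description': description,
            'params': params,
            'result': result,
            'category': category,
        }
        return fn
    return wrap

@action(description='Convert dataset format')
def convert(params, ctx):
    return {'converted': True}
```

Because the function itself is returned unmodified, it stays directly callable in tests while a registry can enumerate decorated functions by looking for the metadata attribute.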

RuntimeContext

RuntimeContext is injected into every action, providing access to logging, environment variables, backend clients, and checkpoints.

```python
@dataclass
class RuntimeContext:
    logger: BaseLogger                        # Structured logging
    env: PluginEnvironment                    # Environment variables
    job_id: str | None = None                 # Current job identifier
    client: BackendClient | None = None       # Backend API client
    agent_client: AgentClient | None = None   # Ray operations client
    checkpoint: dict[str, Any] | None = None  # Pretrained model info
```

Available methods:

| Method | Description |
|---|---|
| `set_progress(current, total, step)` | Update progress for a step |
| `set_metrics(value, step)` | Record metrics (loss, accuracy) |
| `log(event, data, file)` | Log structured event |
| `log_message(message, context)` | Log user-facing message |
| `log_dev_event(message, data)` | Log development/debug event |
| `end_log()` | Signal plugin execution complete |
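
In an action body these methods are typically called once per unit of work. The sketch below uses a recording stand-in (`FakeContext` is invented for this example; the real `RuntimeContext` is constructed and injected by the SDK) to show the signatures in use:

```python
class FakeContext:
    """Stand-in that records calls, to illustrate the method signatures.
    The real RuntimeContext forwards these to the SDK's logging backend."""
    def __init__(self):
        self.events = []

    def set_progress(self, current, total, step):
        self.events.append(('progress', current, total, step))

    def set_metrics(self, value, step):
        self.events.append(('metrics', value, step))

    def log_message(self, message, context=None):
        self.events.append(('message', message))

ctx = FakeContext()
total = 3
for epoch in range(total):
    ctx.set_progress(epoch + 1, total, 'training')
    ctx.set_metrics({'loss': 1.0 / (epoch + 1)}, 'training')
ctx.log_message('training complete')
```

Injecting the context as a parameter (rather than reaching for a global) is what makes actions testable with a stand-in like this.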

Semantic Type System

Actions declare input_type and output_type using the DataType hierarchy. This lets pipeline compatibility be validated when a pipeline is assembled, before any action executes.

```python
class DataType:
    """Base class for semantic data types."""

    @classmethod
    def is_compatible_with(cls, other: type['DataType']) -> bool:
        return issubclass(cls, other) or issubclass(other, cls)

# Dataset types
class Dataset(DataType): ...
class DMDataset(Dataset): ...         # DataMaker format
class DMv1Dataset(DMDataset): ...     # DataMaker v1
class DMv2Dataset(DMDataset): ...     # DataMaker v2
class YOLODataset(Dataset): ...       # YOLO detection format
class COCODataset(Dataset): ...       # COCO instance segmentation
class PascalVOCDataset(Dataset): ...  # Pascal VOC format

# Model types
class Model(DataType): ...
class ModelWeights(Model): ...        # Trained weights
class ONNXModel(Model): ...           # ONNX export
class TensorRTModel(Model): ...       # TensorRT optimized

# Result types
class Result(DataType): ...
class TestResults(Result): ...        # Test/evaluation metrics
class InferenceResults(Result): ...   # Inference predictions
```
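
Compatibility is therefore plain subclass checking in either direction: a more specific type can flow where a more general one is expected, and vice versa, while sibling formats are rejected. A self-contained illustration (redefining a slice of the hierarchy so the example runs on its own):

```python
class DataType:
    @classmethod
    def is_compatible_with(cls, other: type) -> bool:
        # Compatible if either type is a subclass of the other
        return issubclass(cls, other) or issubclass(other, cls)

class Dataset(DataType): ...
class DMDataset(Dataset): ...
class YOLODataset(Dataset): ...
class Model(DataType): ...

# A DMDataset output satisfies an action expecting any Dataset
assert DMDataset.is_compatible_with(Dataset)
# The general type also passes where the specific one is expected
assert Dataset.is_compatible_with(DMDataset)
# Sibling formats are not interchangeable
assert not YOLODataset.is_compatible_with(DMDataset)
# Datasets never feed model inputs
assert not Dataset.is_compatible_with(Model)
```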

Key Features

The plugin system provides:

  • Type-safe parameters — Pydantic validation with auto-generated UI schemas
  • Step-based workflows — Orchestrated execution with progress tracking and rollback
  • Action pipelines — Chain actions with automatic result-to-params mapping
  • Semantic type validation — Pipeline compatibility checked before execution
  • Multiple execution modes — Local, Ray Task, Ray Job, Ray Serve
  • Dataset format conversion — DM, YOLO, COCO, Pascal VOC interoperability
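
The Pipeline class itself is not shown on this page, but the result-to-params mapping idea can be sketched with plain callables (the function names below are illustrative, not SDK APIs):

```python
def run_pipeline(actions, params):
    """Feed the initial params dict to the first action, then pass each
    action's result dict as the next action's params."""
    result = params
    for act in actions:
        result = act(result)
    return result

# Hypothetical stand-ins for two chained actions
def convert(params):
    return {'output_path': params['source_path'] + '/converted'}

def upload(params):
    return {'uploaded': True, 'path': params['output_path']}
```

For example, `run_pipeline([convert, upload], {'source_path': '/data'})` threads `/data` through both stages; the SDK additionally validates that each result schema matches the next action's params model before running anything.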

Plugin Categories

Organize plugins by functionality using PluginCategory:

| Category | Value | Description | Example Actions |
|---|---|---|---|
| Neural Network | `neural_net` | Training, inference, deployment | `TrainAction`, `InferenceAction` |
| Export | `export` | Data export and format conversion | `ExportAction` |
| Upload | `upload` | File upload and validation | `UploadAction` |
| Smart Tool | `smart_tool` | AI-assisted annotation tools | `AutoLabelAction` |
| Pre-Annotation | `pre_annotation` | Annotation preprocessing | `ToTaskAction` |
| Post-Annotation | `post_annotation` | Annotation postprocessing | `ValidateAction` |
| Data Validation | `data_validation` | Data quality checks | `QualityCheckAction` |
| Custom | `custom` | User-defined functionality | Any custom action |

Execution Methods

Choose the appropriate execution method based on your workload:

| Method | Value | Characteristics | Best For |
|---|---|---|---|
| Local | `local` | In-process execution, no Ray dependency | Development, debugging |
| Task | `task` | Synchronous, fast startup (under 1s), reusable Actor | Interactive operations, medium workloads |
| Job | `job` | Asynchronous, isolated environment, status tracking | Long-running training (100s+), batch processing |
| Serve | `serve` | REST API endpoint, auto-scaling | Real-time inference, model serving |

Good to know:

  • Use local or task for development iteration and job for production training runs.
  • The run_plugin() function supports local, task, and job modes.
  • The serve mode requires Ray Serve deployment configuration and is not available via run_plugin().

Architecture

The plugin system follows a layered architecture: definitions sit on top, discovery and executors underneath, with the runtime injecting services at the bottom.

Data flow:

  1. Definition — Define actions via decorator or class inheritance
  2. Discovery — Scan modules or load from config.yaml
  3. Execution — Select executor based on workload (local/task/job)
  4. Runtime — Inject context, orchestrate steps, chain pipelines

Quick Start

Function-Based Action (Simple)

Create a simple action using the @action decorator:

```python
from synapse_sdk.plugins import action, RuntimeContext
from pydantic import BaseModel, Field

class GreetParams(BaseModel):
    name: str = Field(description='Name to greet')
    formal: bool = False

@action(name='greet', params=GreetParams)
def greet(params: GreetParams, ctx: RuntimeContext) -> dict:
    prefix = 'Dear' if params.formal else 'Hello'
    message = f'{prefix} {params.name}!'
    ctx.log_message(message, 'info')
    return {'message': message}
```

Class-Based Action (Complex Workflows)

For complex actions with progress tracking, use class inheritance:

```python
from synapse_sdk.plugins.action import BaseAction
from synapse_sdk.plugins.types import YOLODataset, ModelWeights
from synapse_sdk.plugins.enums import PluginCategory
from pydantic import BaseModel, Field

class TrainParams(BaseModel):
    dataset: int = Field(description='Dataset ID to train on')
    epochs: int = Field(default=100, ge=1)
    batch_size: int = Field(default=32, ge=1)

class TrainResult(BaseModel):
    model_path: str
    final_loss: float
    metrics: dict[str, float]

class TrainAction(BaseAction[TrainParams]):
    category = PluginCategory.NEURAL_NET
    input_type = YOLODataset
    output_type = ModelWeights
    result_model = TrainResult

    def execute(self) -> TrainResult:
        self.ctx.set_progress(0, self.params.epochs, 'training')

        for epoch in range(self.params.epochs):
            # Training logic...
            self.ctx.set_progress(epoch + 1, self.params.epochs, 'training')
            self.ctx.set_metrics({'loss': 0.5, 'mAP50': 0.85}, 'training')

        return TrainResult(
            model_path='/models/best.pt',
            final_loss=0.05,
            metrics={'mAP50': 0.92, 'mAP50-95': 0.78},
        )
```

Plugin Configuration

Define your plugin in config.yaml:

```yaml
name: My Training Plugin
code: my-training-plugin
version: 1.0.0
category: neural_net
description: Custom training plugin with YOLO support

package_manager: pip

actions:
  greet:
    entrypoint: my_plugin.actions:greet
    method: task
    description: Simple greeting action

  train:
    entrypoint: my_plugin.train:TrainAction
    method: job
    description: Train model on dataset
    input_type: yolo_dataset
    output_type: model_weights
```
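
The entrypoint strings follow the common `module:attribute` convention. A loader might resolve them roughly like this — an illustrative sketch, not the SDK's actual discovery code:

```python
import importlib

def resolve_entrypoint(spec: str):
    """Split 'package.module:attr' and import the named attribute."""
    module_path, _, attr = spec.partition(':')
    module = importlib.import_module(module_path)
    return getattr(module, attr)

# Demonstrate with a stdlib entrypoint; a plugin loader would pass
# e.g. 'my_plugin.train:TrainAction' read from config.yaml instead.
join = resolve_entrypoint('posixpath:join')
```

Resolving lazily like this means a plugin's heavy dependencies are only imported when its action actually runs.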

Running Actions

Execute actions using the runner:

```python
from synapse_sdk.plugins.runner import run_plugin

# Local execution (development)
result = run_plugin(
    plugin_code='my-training-plugin',
    action='train',
    params={'dataset': 123, 'epochs': 10},
    mode='local',
)

# Ray Task execution (production, fast startup)
result = run_plugin(
    plugin_code='my-training-plugin',
    action='train',
    params={'dataset': 123, 'epochs': 10},
    mode='task',
    ray_address='ray://cluster:10001',
)

# Ray Job execution (production, long-running)
job_id = run_plugin(
    plugin_code='my-training-plugin',
    action='train',
    params={'dataset': 123, 'epochs': 100},
    mode='job',
    ray_address='ray://cluster:10001',
)
```