Upload Actions
Upload actions handle file upload workflows with built-in step orchestration, progress tracking, and automatic rollback support.
Overview
BaseUploadAction is a specialized action base class designed for multi-step upload workflows. It enforces a step-based architecture where each phase of the upload process is defined as a separate step.
At a Glance
| Feature | Description |
|---|---|
| Step-based | Must override setup_steps() to register workflow steps |
| Automatic rollback | On failure, executes rollback() on completed steps in reverse order |
| Progress tracking | Aggregates progress across all steps using each step's progress_weight |
| Upload context | UploadContext carries state between steps |
Good to know: Unlike other action types, BaseUploadAction requires you to define workflow steps. Overriding execute() directly is not supported.
BaseUploadAction
```python
# P = TypeVar('P', bound=BaseModel) - Parameter model type
class BaseUploadAction(BaseAction[P]):
    """Base class for upload actions with workflow step support."""

    category = PluginCategory.UPLOAD

    @property
    def client(self) -> BackendClient:
        """Backend client from runtime context."""
        ...

    def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
        """Register workflow steps. Override this method."""
        pass

    def create_context(self) -> UploadContext:
        """Create upload context for the workflow."""
        ...

    def execute(self) -> dict[str, Any]:
        """Execute the upload workflow. Do not override."""
        ...
```
Class Attributes
| Attribute | Type | Description |
|---|---|---|
category | PluginCategory | Defaults to PluginCategory.UPLOAD |
Instance Properties
| Property | Type | Description |
|---|---|---|
client | BackendClient | Backend client from runtime context |
Methods to Override
| Method | Required | Description |
|---|---|---|
setup_steps(registry) | Yes | Register workflow steps to the registry |
create_context() | No | Customize upload context creation |
Warning: Do not override execute() directly. The orchestrator relies on it internally to run the step workflow.
UploadContext
UploadContext extends BaseStepContext with upload-specific state fields. Steps read and write to the context as the workflow progresses.
```python
@dataclass
class UploadContext(BaseStepContext):
    """Shared context passed between upload workflow steps."""

    # Upload parameters (from action params)
    params: dict[str, Any] = field(default_factory=dict)

    # Processing state (populated by steps)
    storage: Any | None = None
    pathlib_cwd: Any | None = None
    organized_files: list[dict[str, Any]] = field(default_factory=list)
    uploaded_files: list[dict[str, Any]] = field(default_factory=list)
    data_units: list[dict[str, Any]] = field(default_factory=list)
```
Fields
| Field | Type | Populated By | Description |
|---|---|---|---|
params | dict[str, Any] | Action | Upload parameters from action params |
storage | Any \| None | Init step | Storage configuration |
pathlib_cwd | Any \| None | Init step | Working directory path |
organized_files | list[dict] | Organize step | Files prepared for upload |
uploaded_files | list[dict] | Upload step | Successfully uploaded files |
data_units | list[dict] | Generate step | Created data units |
Inherited from BaseStepContext
| Field | Type | Description |
|---|---|---|
runtime_ctx | RuntimeContext | Parent runtime context |
step_results | list[StepResult] | Results from each executed step |
errors | list[str] | Accumulated error messages |
current_step | str \| None | Name of currently executing step |
Context Methods (Inherited from BaseStepContext)
| Method | Description |
|---|---|
log(event, data, file) | Log an event via runtime context |
set_progress(current, total, step) | Set progress; uses current_step when step is omitted |
set_metrics(value: dict, step) | Set metrics; uses current_step when step is omitted |
UploadContext Properties
| Property | Type | Description |
|---|---|---|
client | BackendClient | Backend client from runtime context (raises RuntimeError if not available) |
DefaultUploadAction
DefaultUploadAction extends BaseUploadAction with a pre-configured 8-step workflow. Use this when you need the standard upload pipeline without customization.
```python
from synapse_sdk.plugins.actions.upload import DefaultUploadAction, UploadParams


class MyUploadAction(DefaultUploadAction[UploadParams]):
    action_name = 'upload'
    params_model = UploadParams

    # All 8 steps are automatically registered:
    # 1. InitializeStep (5%) - Storage and path setup
    # 2. ProcessMetadataStep (10%) - Excel/CSV metadata loading
    # 3. AnalyzeCollectionStep (5%) - File specifications loading
    # 4. OrganizeFilesStep (15%) - File grouping by stem
    # 5. ValidateFilesStep (10%) - Validation against specs
    # 6. UploadFilesStep (30%) - File upload with deduplication
    # 7. GenerateDataUnitsStep (20%) - Data unit creation
    # 8. CleanupStep (5%) - Final cleanup
```
For custom workflows, extend BaseUploadAction instead and override setup_steps().
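The default weights add up to 100%, so overall progress can be read as a weighted sum. The worked sketch below rests on an assumption the text does not confirm: each finished step counts in full, and the running step counts in proportion to its own progress (for example, as reported through context.set_progress). overall_progress is a hypothetical helper, not an SDK function.

```python
# DefaultUploadAction weights from the list above, in percent, in execution order.
DEFAULT_WEIGHTS = {
    'InitializeStep': 5,
    'ProcessMetadataStep': 10,
    'AnalyzeCollectionStep': 5,
    'OrganizeFilesStep': 15,
    'ValidateFilesStep': 10,
    'UploadFilesStep': 30,
    'GenerateDataUnitsStep': 20,
    'CleanupStep': 5,
}


def overall_progress(weights: dict[str, float], current: str, fraction: float) -> float:
    """Hypothetical: overall percent complete while step `current` is `fraction` done.

    Assumes steps run in dict insertion order and each finished step counts in full.
    """
    total = sum(weights.values())
    done = 0.0
    for name, weight in weights.items():
        if name == current:
            return 100 * (done + weight * fraction) / total
        done += weight
    raise KeyError(current)


print(sum(DEFAULT_WEIGHTS.values()))                              # 100
print(overall_progress(DEFAULT_WEIGHTS, 'UploadFilesStep', 0.5))  # 60.0
```

Halfway through UploadFilesStep, the five earlier steps contribute 45% and half of the upload step adds 15%, for 60% overall.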
Upload Configuration
UploadConfig controls file upload behavior including chunked uploads, presigned URLs, and duplicate detection.
```python
@dataclass
class UploadConfig:
    chunked_threshold_mb: int = 50  # Chunked upload threshold (MB)
    batch_size: int = 1  # Data unit creation batch size
    max_workers: int = 10  # Concurrent upload workers
    use_presigned: bool = True  # Use presigned URL uploads
    skip_local_duplicates: bool = True  # Remove local duplicate files by checksum
    skip_existing_checksums: bool = False  # Skip files already on server
    checksum_batch_size: int = 1000  # Server checksum verification batch size
```
Configuration Parameters
| Parameter | Default | Description |
|---|---|---|
chunked_threshold_mb | 50 | Files larger than this are uploaded in chunks |
batch_size | 1 | Number of data units created per API call |
max_workers | 10 | Number of concurrent upload threads |
use_presigned | True | Use presigned URL uploads (S3, MinIO, GCS, Azure) |
skip_local_duplicates | True | Calculate MD5 checksums and remove local duplicates before upload |
skip_existing_checksums | False | Query server for existing checksums and skip already-uploaded files |
checksum_batch_size | 1000 | Number of checksums per server verification API call |
These values, along with other upload options such as max_file_size_mb and upload_batch_size, can be overridden via context.params:
```python
# In your plugin's params or action configuration
params = {
    'skip_local_duplicates': True,  # default: True
    'skip_existing_checksums': True,  # default: False
    'checksum_batch_size': 500,
    'max_file_size_mb': 100,
    'upload_batch_size': 10,
    'use_presigned': True,
}
```
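The mechanism that merges these overrides is not described here. The minimal sketch below shows one way it could work, under two assumptions: keys that match an UploadConfig field replace the default, and all other keys (such as max_file_size_mb) are left for other steps to read. The same sketch also shows how chunked_threshold_mb could drive the chunked-or-single decision, treating 1 MB as 1024 × 1024 bytes (also an assumption). config_from_params and upload_mode are hypothetical helpers, and the local UploadConfig only mirrors the documented defaults.

```python
from dataclasses import dataclass, fields, replace
from typing import Any


@dataclass
class UploadConfig:
    # Local mirror of the documented defaults, for illustration only.
    chunked_threshold_mb: int = 50
    batch_size: int = 1
    max_workers: int = 10
    use_presigned: bool = True
    skip_local_duplicates: bool = True
    skip_existing_checksums: bool = False
    checksum_batch_size: int = 1000


def config_from_params(params: dict[str, Any]) -> UploadConfig:
    """Hypothetical: overlay params whose keys match UploadConfig fields."""
    known = {f.name for f in fields(UploadConfig)}
    overrides = {key: value for key, value in params.items() if key in known}
    return replace(UploadConfig(), **overrides)


def upload_mode(size_bytes: int, config: UploadConfig) -> str:
    """Hypothetical: 'chunked' only for files strictly larger than the threshold."""
    threshold_bytes = config.chunked_threshold_mb * 1024 * 1024  # assumes MB = MiB
    return 'chunked' if size_bytes > threshold_bytes else 'single'


params = {
    'skip_existing_checksums': True,
    'checksum_batch_size': 500,
    'max_file_size_mb': 100,  # not an UploadConfig field, so ignored here
}
config = config_from_params(params)
print(config.skip_existing_checksums, config.checksum_batch_size)  # True 500
print(upload_mode(80 * 1024 * 1024, config))  # chunked
```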
Duplicate Detection
The upload step supports two levels of duplicate detection, both executed before any files are uploaded:
Local Duplicate Detection (skip_local_duplicates)
Enabled by default. Calculates MD5 checksums for all files using parallel workers, then removes files with duplicate checksums: the first occurrence is kept and subsequent duplicates are skipped.
```
30 files (15 originals + 15 copies with same content)
    │
    ├─ Parallel MD5 calculation (ThreadPoolExecutor, 10 workers)
    ├─ deduplicate_by_checksum() → set-based O(1) lookup
    │   └─ 15 unique, 15 duplicates skipped
    │
    ▼ Only 15 files proceed to upload
```
This ensures that the number of created data units matches the joblog count.
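The local pass needs nothing beyond the standard library. The sketch below illustrates the technique and is not the SDK's deduplicate_by_checksum(); md5_of and dedupe_local are hypothetical names. It hashes files in a thread pool, which pays off mainly when reading files is the bottleneck, and it keeps the first path seen for each checksum.

```python
import hashlib
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def md5_of(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """MD5 hex digest of a file, read in chunks to bound memory use."""
    digest = hashlib.md5()
    with path.open('rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def dedupe_local(paths: list[Path], max_workers: int = 10) -> tuple[list[Path], list[Path]]:
    """Keep the first file for each checksum; return (unique, skipped)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, so "first occurrence" is stable.
        checksums = list(pool.map(md5_of, paths))
    seen: set[str] = set()
    unique: list[Path] = []
    skipped: list[Path] = []
    for path, checksum in zip(paths, checksums):
        if checksum in seen:
            skipped.append(path)  # same content as an earlier file
        else:
            seen.add(checksum)  # average O(1) membership test
            unique.append(path)
    return unique, skipped


# Usage: three files, the third a copy of the first.
with tempfile.TemporaryDirectory() as tmp:
    files = []
    for i, body in enumerate([b'alpha', b'beta', b'alpha']):
        path = Path(tmp) / f'f{i}.txt'
        path.write_bytes(body)
        files.append(path)
    unique, skipped = dedupe_local(files)
    unique_names = [p.name for p in unique]
    skipped_names = [p.name for p in skipped]
print(unique_names, skipped_names)  # ['f0.txt', 'f1.txt'] ['f2.txt']
```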
Server-Side Duplicate Detection (skip_existing_checksums)
Disabled by default. Queries the server in batches to find checksums that already exist, then filters out those files. Useful for resuming interrupted uploads or re-importing folders.
```
15 files after local dedup
    │
    ├─ Batch API call: verify_data_files_checksums (1000 per batch)
    ├─ Server returns existing checksums
    ├─ filter_existing_checksums() → remove already-uploaded files
    │
    ▼ Only truly new files proceed to upload
```
Note: When both options are enabled, local deduplication runs first, reducing the number of checksums sent to the server.
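The server-side pass is essentially batched set membership. In the sketch below, the callable query_existing stands in for the verify_data_files_checksums call, whose real signature is not shown here. filter_new_files is a hypothetical helper, not the SDK's filter_existing_checksums().

```python
from typing import Callable, Iterable


def filter_new_files(
    checksums: dict[str, str],
    query_existing: Callable[[list[str]], Iterable[str]],
    batch_size: int = 1000,
) -> dict[str, str]:
    """Drop files whose checksum the server already has.

    `checksums` maps file path -> checksum. `query_existing` stands in for the
    server call and returns the subset of the given checksums that already exist.
    """
    unique_sums = list(dict.fromkeys(checksums.values()))  # dedupe, keep order
    existing: set[str] = set()
    for start in range(0, len(unique_sums), batch_size):
        existing.update(query_existing(unique_sums[start:start + batch_size]))
    return {path: s for path, s in checksums.items() if s not in existing}


# Fake server that already stores 'bbb'; record the size of each batch it gets.
server_has = {'bbb'}
calls: list[int] = []


def fake_query(batch: list[str]) -> list[str]:
    calls.append(len(batch))
    return [s for s in batch if s in server_has]


new = filter_new_files({'a.jpg': 'aaa', 'b.jpg': 'bbb', 'c.jpg': 'ccc'}, fake_query, batch_size=2)
print(new)    # {'a.jpg': 'aaa', 'c.jpg': 'ccc'}
print(calls)  # [2, 1]
```

Running the local pass first shrinks unique_sums, which is why fewer batches reach the server when both options are enabled.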
Step-Based Workflow
Upload actions must define their workflow through steps. Override setup_steps() to register steps in execution order.
Creating a Step
```python
from synapse_sdk.plugins.steps import BaseStep, StepResult
from synapse_sdk.plugins.actions.upload import UploadContext


class ValidateFilesStep(BaseStep[UploadContext]):
    """Validate files before upload."""

    @property
    def name(self) -> str:
        return 'validate'

    @property
    def progress_weight(self) -> float:
        return 0.1  # 10% of total progress

    def execute(self, context: UploadContext) -> StepResult:
        files = context.organized_files
        if not files:
            return StepResult(success=False, error='No files to upload')

        # Validate each file
        for file in files:
            if not self._is_valid(file):
                return StepResult(success=False, error=f"Invalid file: {file['path']}")

        return StepResult(success=True, data={'validated_count': len(files)})

    def _is_valid(self, file: dict) -> bool:
        # Validation logic
        return True
```
Registering Steps
```python
from pydantic import BaseModel

from synapse_sdk.plugins.actions.upload import BaseUploadAction, UploadContext
from synapse_sdk.plugins.steps import StepRegistry

from .steps import InitStep, ValidateFilesStep, UploadFilesStep, CleanupStep


class UploadParams(BaseModel):
    storage_id: int
    path: str


class MyUploadAction(BaseUploadAction[UploadParams]):
    action_name = 'upload'

    def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
        registry.register(InitStep())
        registry.register(ValidateFilesStep())
        registry.register(UploadFilesStep())
        registry.register(CleanupStep())
```
Step Execution Order
```
┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
│    Init    │───▶│  Validate  │───▶│   Upload   │───▶│  Cleanup   │
└────────────┘    └────────────┘    └────────────┘    └────────────┘
     10%               10%               60%               20%
```
The orchestrator executes steps in registration order:
- Init: Initialize storage connection and paths
- Validate: Check files before upload
- Upload: Transfer files to storage
- Cleanup: Post-upload cleanup tasks
Automatic Rollback
When a step fails, the orchestrator automatically rolls back all previously executed steps in reverse order.
Implementing Rollback
```python
from synapse_sdk.plugins.steps import BaseStep, StepResult
from synapse_sdk.plugins.actions.upload import UploadContext


class UploadFilesStep(BaseStep[UploadContext]):
    @property
    def name(self) -> str:
        return 'upload'

    @property
    def progress_weight(self) -> float:
        return 0.6

    def execute(self, context: UploadContext) -> StepResult:
        uploaded = []
        for file in context.organized_files:
            result = self._upload_file(file, context)
            uploaded.append(result)
            context.uploaded_files.append(result)

        return StepResult(
            success=True,
            data={'uploaded_count': len(uploaded)},
            rollback_data={'uploaded_files': uploaded},  # Store for rollback
        )

    def rollback(self, context: UploadContext, result: StepResult) -> None:
        """Delete uploaded files on failure."""
        uploaded = result.rollback_data.get('uploaded_files', [])
        for file in uploaded:
            self._delete_file(file, context)

    def _upload_file(self, file: dict, context: UploadContext) -> dict:
        # Upload logic
        return {'path': file['path'], 'storage_id': context.storage}

    def _delete_file(self, file: dict, context: UploadContext) -> None:
        # Delete logic for rollback
        pass
```
Rollback Flow
```
┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
│    Init    │───▶│  Validate  │───▶│   Upload   │──X─│  Cleanup   │
│ (success)  │    │ (success)  │    │ (success)  │    │  (failed)  │
└────────────┘    └────────────┘    └────────────┘    └────────────┘
      ▲                 ▲                 ▲
      │                 │                 │
      └─────────────────┴─────────────────┘
                    Rollback
    (reverse order: Upload → Validate → Init)
```
Good to know: Rollback is best-effort. If a rollback fails, the error is logged but other rollbacks continue. Always design rollback logic to be idempotent.
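A stripped-down model of this behavior helps when reasoning about rollback ordering. It is not the SDK's orchestrator. Step and run_workflow are hypothetical stand-ins that return booleans instead of StepResult. The model undoes completed steps in reverse order and logs and skips any rollback that raises. The failing step itself is not rolled back, which matches the diagram above.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable

log = logging.getLogger('upload-sketch')


@dataclass
class Step:
    """Hypothetical stand-in for a workflow step."""
    name: str
    run: Callable[[dict], bool]                      # True means success
    undo: Callable[[dict], None] = lambda ctx: None  # rollback hook


def run_workflow(steps: list[Step], ctx: dict[str, Any]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse."""
    completed: list[Step] = []
    for step in steps:
        if step.run(ctx):
            completed.append(step)
            continue
        # Only steps that finished are rolled back, never the failing one.
        for done in reversed(completed):
            try:
                done.undo(ctx)
            except Exception:
                # Best-effort: log the failure and keep rolling back the rest.
                log.exception('rollback of %s failed', done.name)
        return False
    return True


# Usage: cleanup fails, and validate's own rollback raises.
order: list[str] = []


def make_undo(name: str) -> Callable[[dict], None]:
    def _undo(ctx: dict) -> None:
        order.append(name)
        if name == 'validate':
            raise RuntimeError('rollback failed')
    return _undo


steps = [
    Step('init', lambda ctx: True, make_undo('init')),
    Step('validate', lambda ctx: True, make_undo('validate')),
    Step('upload', lambda ctx: True, make_undo('upload')),
    Step('cleanup', lambda ctx: False, make_undo('cleanup')),
]
ok = run_workflow(steps, {})
print(ok, order)  # False ['upload', 'validate', 'init']
```

Because a rollback can be attempted even after an earlier rollback failed, each undo hook should tolerate being run when its work is already partly reverted.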
Complete Example
A full file upload plugin implementation:
```python
from pathlib import Path

from pydantic import BaseModel, Field

from synapse_sdk.plugins.actions.upload import BaseUploadAction, UploadContext
from synapse_sdk.plugins.steps import BaseStep, StepRegistry, StepResult


# Parameters
class FileUploadParams(BaseModel):
    storage_id: int = Field(description='Target storage ID')
    source_path: str = Field(description='Local directory path')
    extensions: list[str] = Field(default=['.jpg', '.png'], description='File extensions to upload')


# Steps
class InitializeStep(BaseStep[UploadContext]):
    @property
    def name(self) -> str:
        return 'initialize'

    @property
    def progress_weight(self) -> float:
        return 0.1

    def execute(self, context: UploadContext) -> StepResult:
        storage_id = context.params['storage_id']
        storage = context.client.get_storage(storage_id)
        context.storage = storage
        context.pathlib_cwd = Path(context.params['source_path'])
        return StepResult(success=True)


class OrganizeFilesStep(BaseStep[UploadContext]):
    @property
    def name(self) -> str:
        return 'organize'

    @property
    def progress_weight(self) -> float:
        return 0.1

    def execute(self, context: UploadContext) -> StepResult:
        extensions = context.params.get('extensions', ['.jpg', '.png'])
        source_dir = context.pathlib_cwd

        files = []
        for ext in extensions:
            files.extend(source_dir.glob(f'**/*{ext}'))

        context.organized_files = [{'path': str(f), 'name': f.name} for f in files]
        context.log('files_organized', {'count': len(files)})
        return StepResult(success=True, data={'file_count': len(files)})


class UploadFilesStep(BaseStep[UploadContext]):
    @property
    def name(self) -> str:
        return 'upload'

    @property
    def progress_weight(self) -> float:
        return 0.6

    def execute(self, context: UploadContext) -> StepResult:
        files = context.organized_files
        total = len(files)

        for i, file in enumerate(files):
            # Upload file (implementation depends on storage type)
            result = self._upload_to_storage(file, context.storage)
            context.uploaded_files.append(result)
            # Update progress (step auto-inferred from current step name)
            context.set_progress(i + 1, total)

        return StepResult(
            success=True,
            data={'uploaded_count': total},
            rollback_data={'files': context.uploaded_files},
        )

    def rollback(self, context: UploadContext, result: StepResult) -> None:
        for file in result.rollback_data.get('files', []):
            self._delete_from_storage(file, context.storage)

    def _upload_to_storage(self, file: dict, storage) -> dict:
        # Upload implementation
        return {'path': file['path'], 'uploaded': True}

    def _delete_from_storage(self, file: dict, storage) -> None:
        # Rollback implementation
        pass


class FinalizeStep(BaseStep[UploadContext]):
    @property
    def name(self) -> str:
        return 'finalize'

    @property
    def progress_weight(self) -> float:
        return 0.2

    def execute(self, context: UploadContext) -> StepResult:
        # Create data units or finalize upload
        for file in context.uploaded_files:
            data_unit = {'file': file['path'], 'status': 'complete'}
            context.data_units.append(data_unit)

        context.log('upload_complete', {
            'uploaded': len(context.uploaded_files),
            'data_units': len(context.data_units),
        })
        return StepResult(success=True)


# Action
class FileUploadAction(BaseUploadAction[FileUploadParams]):
    """Upload files to storage with automatic rollback support."""

    action_name = 'upload'

    def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
        registry.register(InitializeStep())
        registry.register(OrganizeFilesStep())
        registry.register(UploadFilesStep())
        registry.register(FinalizeStep())
```
Best Practices
Step Design
- Keep steps focused: Each step should have a single responsibility
- Set appropriate weights: progress_weight should reflect actual execution time
- Implement rollback: Always implement rollback() for steps that modify state
Large File Handling
```python
def execute(self, context: UploadContext) -> StepResult:
    files = context.organized_files
    for i, file in enumerate(files):
        # Use chunked upload for large files
        if file['size'] > 100_000_000:  # 100MB
            self._chunked_upload(file, context)
        else:
            self._simple_upload(file, context)
        context.set_progress(i + 1, len(files))
    return StepResult(success=True)
```
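A _chunked_upload helper would usually stream the file in fixed-size parts instead of reading it whole. The sketch below covers only the reading side. How each part is sent (multipart upload, presigned part URLs, and so on) depends on the storage backend. iter_chunks and CHUNK_SIZE are hypothetical names, and the 8 MiB part size is an illustrative choice.

```python
import tempfile
from pathlib import Path
from typing import Iterator

CHUNK_SIZE = 8 * 1024 * 1024  # 8 MiB per part (illustrative value)


def iter_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> Iterator[tuple[int, bytes]]:
    """Yield (part_number, data) pairs, holding only one chunk in memory."""
    with path.open('rb') as fh:
        part = 1
        while chunk := fh.read(chunk_size):
            yield part, chunk
            part += 1


# Usage with a small file and a tiny chunk size.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / 'big.bin'
    sample.write_bytes(b'x' * 25)
    sizes = [(part, len(data)) for part, data in iter_chunks(sample, chunk_size=10)]
print(sizes)  # [(1, 10), (2, 10), (3, 5)]
```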
Retry Logic
```python
from tenacity import retry, stop_after_attempt, wait_exponential


class UploadFilesStep(BaseStep[UploadContext]):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True,  # surface the last error instead of tenacity.RetryError
    )
    def _upload_file(self, file: dict, storage) -> dict:
        # Upload with automatic retry on failure
        return storage.upload(file['path'])
```
Conditional Steps
```python
class ValidateStep(BaseStep[UploadContext]):
    def can_skip(self, context: UploadContext) -> bool:
        """Skip validation if skip_validation is set."""
        return context.params.get('skip_validation', False)

    def execute(self, context: UploadContext) -> StepResult:
        # Validation logic
        return StepResult(success=True)
```
Related
- Steps & Workflow - Step infrastructure details
- Defining Actions - Action definition patterns
- RuntimeContext - Context API reference