Pre-Annotation Actions
Pre-annotation actions prepare task data before human annotation. Use ToTaskAction to add or transform task data with either file-based inputs or inference outputs.
Overview
ToTaskAction handles:
- Task discovery with filters
- Data collection and file specification validation
- File-based annotation ingestion
- Inference-based annotation via pre-processor plugins
- Progress and metrics tracking
ToTaskAction
Subclassing
Override the conversion hooks to map your data format into task data.
```python
from synapse_sdk.plugins.actions.to_task import ToTaskAction


class ToTask(ToTaskAction):
    action_name = 'to_task'

    def convert_data_from_file(self, primary_file_url, primary_file_name,
                               data_file_url, data_file_name, task_data=None):
        # TODO: Parse data_file_url and return task data payload.
        return {}

    def convert_data_from_inference(self, inference_data, task_data=None):
        # TODO: Convert inference output to task data payload.
        return inference_data
```
Conversion Methods
Override these methods to transform your data into task data format:
| Method | Description | Parameters | Returns |
|---|---|---|---|
| `convert_data_from_file()` | Parse file-based annotation data | `primary_file_url`, `primary_file_name`, `data_file_url`, `data_file_name`, `task_data` (optional) | `dict[str, Any]`: task data payload |
| `convert_data_from_inference()` | Convert inference output to task data | `inference_data`, `task_data` (optional) | `dict[str, Any]`: task data payload |
Both methods receive an optional task_data parameter containing the full task payload for additional context.
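The sketch below is a standalone function with the same parameters as `convert_data_from_inference()`, showing one way to use the optional `task_data` argument. It is hypothetical throughout: the `predictions` input shape, the 0.5 score threshold, and the `pre_annotations` / `source_task_id` output keys are illustrative assumptions, not the task data schema of any real project.

```python
from typing import Any, Optional


def convert_data_from_inference(
    inference_data: dict[str, Any],
    task_data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Map a hypothetical inference payload into a task data payload.

    Assumes inference_data looks like
    {'predictions': [{'label': str, 'score': float}, ...]} (hypothetical).
    """
    # Keep only predictions at or above a hypothetical confidence threshold.
    predictions = [
        p for p in inference_data.get('predictions', [])
        if p.get('score', 0.0) >= 0.5
    ]
    payload: dict[str, Any] = {'pre_annotations': predictions}
    # Use the existing task payload for context, e.g. to carry over its id.
    if task_data and 'id' in task_data:
        payload['source_task_id'] = task_data['id']
    return payload
```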
Config.yaml
Define the action entrypoint in your plugin config.yaml:
```yaml
actions:
  to_task:
    entrypoint: plugin.to_task.ToTask
    method: job
```
Parameters
The base params model supports both file and inference workflows:
- `name`: Action job name (required)
- `description`: Optional description for the action job
- `project`: Project ID
- `agent`: Agent ID
- `task_filters`: Task query filters (optional, default: `{}`)
- `method`: Annotation method, `file` or `inference` (default: `file`)
- `target_specification_name`: File spec name (required for the `file` method)
- `pre_processor`: Pre-processor release ID (required for the `inference` method)
- `model`: Model ID (required for the `inference` method)
- `pre_processor_params`: Extra parameters for inference (optional, default: `{}`)
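Assuming the rules above are the only cross-field constraints, they can be sketched as a plain dataclass. `ToTaskParamsSketch` is a hypothetical name, the integer ID types are assumptions, and the SDK's actual params model may validate differently.

```python
from dataclasses import dataclass, field
from typing import Any, Literal, Optional


@dataclass
class ToTaskParamsSketch:
    """Hypothetical mirror of the documented params; not the SDK's model."""

    name: str
    description: Optional[str] = None
    project: Optional[int] = None  # ID type is an assumption
    agent: Optional[int] = None  # ID type is an assumption
    task_filters: dict[str, Any] = field(default_factory=dict)
    method: Literal['file', 'inference'] = 'file'
    target_specification_name: Optional[str] = None
    pre_processor: Optional[int] = None
    model: Optional[int] = None
    pre_processor_params: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if not self.name:
            raise ValueError('name is required')
        # The file method needs a file specification to read from.
        if self.method == 'file' and not self.target_specification_name:
            raise ValueError('target_specification_name is required for method=file')
        if self.method == 'inference':
            # The inference method needs both a pre-processor release and a model.
            missing = [k for k in ('pre_processor', 'model') if getattr(self, k) is None]
            if missing:
                raise ValueError(f"missing for method=inference: {', '.join(missing)}")
        elif self.method != 'file':
            # Literal is not enforced at runtime, so reject unknown methods here.
            raise ValueError(f'unknown method: {self.method!r}')
```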
Result
The action returns a ToTaskResult with the following fields:
- `status`: Job completion status (`SUCCEEDED`, `FAILED`, etc.)
- `message`: Summary message describing the outcome
- `total_tasks`: Total number of tasks processed
- `success_count`: Number of successfully annotated tasks
- `failed_count`: Number of tasks that failed to annotate
- `failures`: List of failure records, each containing:
  - `task_id`: ID of the failed task
  - `error`: Error message describing what went wrong
Pre-Processor Lifecycle (INFERENCE method)
When method=inference, PrepareStep ensures the pre-processor's Ray Serve application is running before annotation begins. The lifecycle has three phases:
- Detection: `client.list_serve_applications(plugin_code=code, job__agent=agent)` polls for an existing `RUNNING` app.
- Dispatch: if no running app exists, `client.run_plugin(code, {action: 'deployment', params: {...}})` is called.
- Wait: polls every 10s (default) until the status reaches `RUNNING`, fails terminally, or `timeout_seconds` (default 180s) elapses.
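The three phases can be sketched as a small orchestration function. The callables passed in are hypothetical stand-ins for `client.list_serve_applications(...)`, `client.run_plugin(...)`, and the polling wait, and treating each listed app as a dict with a `status` key is an illustrative assumption; this is not the PrepareStep implementation.

```python
from typing import Any, Callable


def ensure_pre_processor_running(
    list_apps: Callable[[], list[dict[str, Any]]],
    dispatch: Callable[[], None],
    wait: Callable[[], None],
) -> bool:
    """Detection -> Dispatch -> Wait. Returns True if a deployment was dispatched.

    list_apps, dispatch and wait are hypothetical stand-ins for the client
    calls and the polling loop described above.
    """
    # Detection: reuse an application that is already RUNNING.
    if any(app.get('status') == 'RUNNING' for app in list_apps()):
        return False
    # Dispatch: no running app exists, so request a new deployment.
    dispatch()
    # Wait: block until RUNNING; the wait callable raises on failure or timeout.
    wait()
    return True
```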
Required Resources Payload (SYN-7005 / FR-8)
The dispatch params payload forwards resource requirements declared on the pre-processor plugin release under actions.inference.required_resources. This keeps the dispatch shape aligned with the agent-side capacity gate (see Inference Actions — Capacity Gate) so cluster-full conditions surface at dispatch instead of after Ray Serve has accepted a pending replica.
| Payload key | Source on `required_resources` | Required | Notes |
|---|---|---|---|
| `num_cpus` | `required_cpu_count` (preferred) or `num_cpus` | Yes | Default `1` |
| `num_gpus` | `required_gpu_count` (preferred) or `num_gpus` | Yes | Default `0.1` |
| `num_replicas` | `num_replicas` | No | Omitted from the payload when not declared |
| `memory` | `memory` (preferred) or `memory_bytes` (alias) | No | Sent on the wire as `memory`; omitted when not declared |
Optional keys are conditionally added — they are omitted (not sent as null) so backend request models with extra='forbid' continue to accept the payload.
```yaml
actions:
  inference:
    required_resources:
      required_cpu_count: 2
      required_gpu_count: 1
      num_replicas: 2
      memory: 4294967296  # 4 GiB. `memory_bytes` is also accepted as an alias.
```
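A minimal sketch of the key mapping in the payload table, assuming `required_resources` arrives as a plain mapping and that a key set to `None` counts as undeclared. The function name `build_resource_params` is hypothetical.

```python
from typing import Any, Mapping


def build_resource_params(required_resources: Mapping[str, Any]) -> dict[str, Any]:
    """Hypothetical sketch of the dispatch resource mapping (not the SDK code)."""

    def first(*keys: str) -> Any:
        # Return the value of the first key that is present and not None.
        for key in keys:
            if required_resources.get(key) is not None:
                return required_resources[key]
        return None

    cpus = first('required_cpu_count', 'num_cpus')
    gpus = first('required_gpu_count', 'num_gpus')
    params: dict[str, Any] = {
        # Required keys always appear, falling back to the documented defaults.
        'num_cpus': cpus if cpus is not None else 1,
        'num_gpus': gpus if gpus is not None else 0.1,
    }
    # Optional keys are added only when declared, never sent as None.
    replicas = first('num_replicas')
    if replicas is not None:
        params['num_replicas'] = replicas
    memory = first('memory', 'memory_bytes')  # memory_bytes is an alias
    if memory is not None:
        params['memory'] = memory
    return params
```

With the YAML example above, this yields `{'num_cpus': 2, 'num_gpus': 1, 'num_replicas': 2, 'memory': 4294967296}`; with an empty mapping, only the two required keys with their defaults.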
Error Handling (SYN-7005 E-Series)
PrepareStep (INFERENCE method) raises structured RuntimeError instances on dispatch and wait failures. Error messages expose only the error_type (or terminal status); the original cause is preserved on __cause__ for diagnostics, but no token, URL, or stack trace is interpolated into the user-facing string.
Fail-Fast Matrix
| Phase | Trigger | Exception |
|---|---|---|
| Dispatch (`_ensure_pre_processor_running`) | `client.run_plugin(...)` raises `ClientError` (HTTP 4xx/5xx) | `RuntimeError('Pre-processor deployment dispatch failed ({error_type})')` |
| Wait (`_wait_for_pre_processor`) | Serve app status reaches `DEPLOY_FAILED` / `UNHEALTHY` / `UNAVAILABLE` | `RuntimeError('Pre-processor deployment failed: serve application reached terminal state ({status})')` |
| Wait (`_wait_for_pre_processor`) | Timeout elapses without reaching `RUNNING` | `RuntimeError('Pre-processor did not become ready within timeout')` |
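The dispatch row of the matrix can be sketched with Python's exception chaining (`raise ... from exc`), which is what keeps the original error on `__cause__`. `ClientError` here is a hypothetical stand-in class, and using the exception's class name as `error_type` is an assumption.

```python
from typing import Callable


class ClientError(Exception):
    """Hypothetical stand-in for an HTTP 4xx/5xx client error."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code


def dispatch_or_raise(run_plugin: Callable[[], object]) -> None:
    """Call the dispatch function and wrap client errors per the fail-fast matrix."""
    try:
        run_plugin()
    except ClientError as exc:
        # Only the error type reaches the user-facing message; the original
        # exception (which may carry URLs or tokens) stays on __cause__.
        raise RuntimeError(
            f'Pre-processor deployment dispatch failed ({type(exc).__name__})'
        ) from exc
```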
Serve Application Status Classification (FR-6)
_classify_serve_app folds the backend's 7-value ServeApplication.Status enum into three actionable buckets so polling can short-circuit on terminal failures (previously, a doomed deploy burned the full 180s timeout budget).
| Bucket | Statuses | Polling Behavior |
|---|---|---|
| `running` | `RUNNING` | Return immediately (success) |
| `failed` | `DEPLOY_FAILED`, `UNHEALTHY`, `UNAVAILABLE` | Raise `RuntimeError` immediately (fail-fast) |
| `pending` | `NOT_STARTED`, `DEPLOYING`, `DELETING`, empty list, malformed payload | Continue polling |
If any replica reports RUNNING, the classifier returns ('running', None) even when other replicas have failed. This avoids mis-classifying eventual-consistency rollouts during multi-replica deploys.
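A minimal sketch of these classification rules, assuming the listing returns a list of dicts with a `status` key and that the failed bucket carries the offending status. Treating unrecognized status strings as pending is also an assumption, and `classify_serve_app` is a hypothetical stand-in for `_classify_serve_app`.

```python
from typing import Any, Optional

RUNNING = {'RUNNING'}
FAILED = {'DEPLOY_FAILED', 'UNHEALTHY', 'UNAVAILABLE'}


def classify_serve_app(apps: Any) -> tuple[str, Optional[str]]:
    """Fold Serve application statuses into 'running' / 'failed' / 'pending'.

    Anything that is not a non-empty list (None, [], malformed payloads)
    is treated as pending.
    """
    if not isinstance(apps, list) or not apps:
        return ('pending', None)
    statuses = [a.get('status') for a in apps if isinstance(a, dict)]
    # Any RUNNING entry wins, even if other entries report a failure.
    if any(s in RUNNING for s in statuses):
        return ('running', None)
    for s in statuses:
        if s in FAILED:
            return ('failed', s)
    # NOT_STARTED, DEPLOYING, DELETING and (by assumption) unknown values.
    return ('pending', None)
```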
Pre-Processor Wait Flow
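The wait loop can be sketched as follows, assuming a `poll_status` callable that returns the `(bucket, status)` pair from the classification above. Injecting `sleep` and `clock` is a hypothetical design choice that makes the loop testable without real delays; the defaults (10s interval, 180s timeout) and error messages follow the lifecycle and fail-fast descriptions earlier on this page.

```python
import time
from typing import Callable, Optional


def wait_for_pre_processor(
    poll_status: Callable[[], tuple[str, Optional[str]]],
    timeout_seconds: float = 180,
    interval_seconds: float = 10,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll until running, fail fast on a terminal state, or time out."""
    deadline = clock() + timeout_seconds
    while True:
        bucket, status = poll_status()
        if bucket == 'running':
            return
        if bucket == 'failed':
            # Terminal states short-circuit instead of burning the whole timeout.
            raise RuntimeError(
                'Pre-processor deployment failed: serve application '
                f'reached terminal state ({status})'
            )
        if clock() >= deadline:
            raise RuntimeError('Pre-processor did not become ready within timeout')
        sleep(interval_seconds)
```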
Error Handling Example
```python
# `action` is a configured ToTaskAction instance; `log` is assumed to be a
# structured logger (e.g. structlog) that accepts keyword fields.
try:
    # Pre-processor errors surface indirectly via ToTaskAction.execute()
    result = action.execute()
except RuntimeError as exc:
    msg = str(exc)
    if 'dispatch failed' in msg:
        # HTTP 4xx/5xx from the agent: inspect exc.__cause__ for the original ClientError
        cause = exc.__cause__
        log.error('dispatch failed', error_type=type(cause).__name__, status=getattr(cause, 'status_code', None))
    elif 'reached terminal state' in msg:
        # DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE: check the Serve app logs on the agent
        log.error('pre-processor terminal failure', message=msg)
    elif 'did not become ready within timeout' in msg:
        # Still pending after timeout_seconds (default 180s): check Ray cluster capacity
        log.error('pre-processor wait timeout')
    raise
```