
Pre-Annotation Actions

Pre-annotation actions prepare task data before human annotation. Use ToTaskAction to add or transform task data with either file-based inputs or inference outputs.

Overview

ToTaskAction handles:

  • Task discovery with filters
  • Data collection and file specification validation
  • File-based annotation ingestion
  • Inference-based annotation via pre-processor plugins
  • Progress and metrics tracking

ToTaskAction

Subclassing

Override the conversion hooks to map your data format into task data.

from synapse_sdk.plugins.actions.to_task import ToTaskAction


class ToTask(ToTaskAction):
    action_name = 'to_task'

    def convert_data_from_file(self, primary_file_url, primary_file_name, data_file_url, data_file_name, task_data=None):
        # TODO: Parse data_file_url and return task data payload.
        return {}

    def convert_data_from_inference(self, inference_data, task_data=None):
        # TODO: Convert inference output to task data payload.
        return inference_data
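Here is one way to fill in the file hook. This minimal sketch assumes the data file is a JSON document reachable by URL and that the task data payload is a plain dict. The payload shape (the `annotations`, `source_file` and `primary_file` keys) is hypothetical. The standalone function mirrors the convert_data_from_file parameters; in a real plugin its body would go in the method on your ToTask subclass.

```python
from __future__ import annotations

import json
from typing import Any
from urllib.request import urlopen


def convert_data_from_file(
    primary_file_url: str,
    primary_file_name: str,
    data_file_url: str,
    data_file_name: str,
    task_data: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Hypothetical converter: read a JSON annotation file and wrap it as task data."""
    # urlopen handles http(s):// and file:// URLs alike.
    with urlopen(data_file_url) as response:
        raw = json.loads(response.read().decode('utf-8'))

    # Hypothetical payload shape: keep the annotations and record the source files.
    return {
        'annotations': raw.get('annotations', []),
        'source_file': data_file_name,
        'primary_file': primary_file_name,
    }
```

Adjust the returned keys to whatever your project's task data schema expects.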

Conversion Methods

Override these methods to transform your data into task data format:

| Method | Description | Parameters | Returns |
|---|---|---|---|
| convert_data_from_file() | Parse file-based annotation data | primary_file_url, primary_file_name, data_file_url, data_file_name, task_data (optional) | dict[str, Any] - Task data payload |
| convert_data_from_inference() | Convert inference output to task data | inference_data, task_data (optional) | dict[str, Any] - Task data payload |

Both methods receive an optional task_data parameter containing the full task payload for additional context.
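The inference hook can follow the same pattern. This sketch makes several assumptions:

- The pre-processor returns a dict with a `predictions` list of `{label, score}` items. This input shape is hypothetical.
- Low-confidence predictions are dropped using a hypothetical module-level MIN_SCORE threshold.
- The optional task_data is used only for context, here to skip labels already present in the task.

```python
from __future__ import annotations

from typing import Any

MIN_SCORE = 0.5  # hypothetical confidence threshold, not an SDK setting


def convert_data_from_inference(
    inference_data: dict[str, Any],
    task_data: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Hypothetical converter: keep confident predictions not already in the task."""
    # task_data is optional; treat a missing payload as "no existing labels".
    existing = {a.get('label') for a in (task_data or {}).get('annotations', [])}

    kept = [
        {'label': p['label'], 'score': p['score']}
        for p in inference_data.get('predictions', [])
        if p.get('score', 0.0) >= MIN_SCORE and p.get('label') not in existing
    ]
    return {'annotations': kept}
```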

Config.yaml

Define the action entrypoint in your plugin config.yaml:

actions:
  to_task:
    entrypoint: plugin.to_task.ToTask
    method: job
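The entrypoint is a dotted path to your action class. This page does not describe how the SDK loads it. As an assumption, a `module.attribute` path like `plugin.to_task.ToTask` is conventionally resolved with importlib, as in the sketch below. resolve_entrypoint is a hypothetical helper, not an SDK function. A check like this can catch a typo in config.yaml before the job runs.

```python
import importlib
from typing import Any


def resolve_entrypoint(entrypoint: str) -> Any:
    """Hypothetical helper: import 'package.module.Attr' and return Attr."""
    module_path, _, attr_name = entrypoint.rpartition('.')
    if not module_path:
        raise ValueError(f'entrypoint must be a dotted path, got {entrypoint!r}')
    module = importlib.import_module(module_path)
    # getattr raises AttributeError if the class is missing from the module.
    return getattr(module, attr_name)
```

Run from the plugin root, `resolve_entrypoint('plugin.to_task.ToTask')` should return your ToTask class.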

Parameters

The base params model supports both file and inference workflows:

  • name: Action job name (required)
  • description: Optional description for the action job
  • project: Project ID
  • agent: Agent ID
  • task_filters: Task query filters (optional, default: {})
  • method: Annotation method - file or inference (default: file)
  • target_specification_name: File spec name (required for file method)
  • pre_processor: Pre-processor release ID (required for inference method)
  • model: Model ID (required for inference method)
  • pre_processor_params: Extra parameters for inference (optional, default: {})
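The method-dependent requirements above can be expressed as a validator:

- The file method requires target_specification_name.
- The inference method requires pre_processor and model.

This page does not show the SDK's actual params model, so this is a minimal standard-library sketch. The class name ToTaskParamsSketch and the use of int for IDs are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Literal, Optional


@dataclass
class ToTaskParamsSketch:
    """Hypothetical mirror of the ToTaskAction params (not the SDK class)."""

    name: str
    description: Optional[str] = None
    project: Optional[int] = None  # assumed integer IDs
    agent: Optional[int] = None
    task_filters: dict[str, Any] = field(default_factory=dict)
    method: Literal['file', 'inference'] = 'file'
    target_specification_name: Optional[str] = None
    pre_processor: Optional[int] = None
    model: Optional[int] = None
    pre_processor_params: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Enforce the per-method required fields listed above.
        if self.method == 'file' and not self.target_specification_name:
            raise ValueError('target_specification_name is required for method=file')
        if self.method == 'inference':
            missing = [f for f in ('pre_processor', 'model') if getattr(self, f) is None]
            if missing:
                raise ValueError(f'{", ".join(missing)} required for method=inference')
```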

Result

The action returns a ToTaskResult with the following fields:

  • status: Job completion status (SUCCEEDED, FAILED, etc.)
  • message: Summary message describing the outcome
  • total_tasks: Total number of tasks processed
  • success_count: Number of successfully annotated tasks
  • failed_count: Number of tasks that failed to annotate
  • failures: List of failure records, each containing:
    • task_id: ID of the failed task
    • error: Error message describing what went wrong
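This sketch shows how the count fields relate to each other and to failures. It assumes each task's outcome is recorded as either success (None) or an error string. ResultSketch and summarize are hypothetical names. The rule that any failed task makes the whole job FAILED is also an assumption for illustration; the real status logic is not documented here.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in for ToTaskResult."""

    status: str
    message: str
    total_tasks: int
    success_count: int
    failed_count: int
    failures: list[dict[str, object]] = field(default_factory=list)


def summarize(outcomes: dict[int, Optional[str]]) -> ResultSketch:
    """Tally per-task outcomes: None means success, a string is the error message."""
    failures = [
        {'task_id': task_id, 'error': error}
        for task_id, error in outcomes.items()
        if error is not None
    ]
    total = len(outcomes)
    failed = len(failures)
    # Assumption for illustration: any failed task marks the job FAILED.
    status = 'FAILED' if failed else 'SUCCEEDED'
    return ResultSketch(
        status=status,
        message=f'{total - failed}/{total} tasks annotated',
        total_tasks=total,
        success_count=total - failed,
        failed_count=failed,
        failures=failures,
    )
```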

Pre-Processor Lifecycle (INFERENCE method)

When method=inference, PrepareStep ensures the pre-processor's Ray Serve application is running before annotation begins. The lifecycle has three phases:

  1. Detection: client.list_serve_applications(plugin_code=code, job__agent=agent) checks for an existing RUNNING app.
  2. Dispatch: if no running app exists, client.run_plugin(code, {'action': 'deployment', 'params': {...}}) is called.
  3. Wait: polls every 10s (default) until the status reaches RUNNING, a terminal failure occurs, or timeout_seconds (default 180s) elapses.
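The detection and dispatch phases can be sketched against any object that exposes the two client calls named above. The following are assumptions for illustration:

- The response shape of list_serve_applications: a list of dicts with a `status` key.
- The Protocol, which describes those two calls.
- The helper name ensure_dispatched.

The real PrepareStep method is _ensure_pre_processor_running, whose internals are not shown here.

```python
from __future__ import annotations

from typing import Any, Protocol


class ServeClient(Protocol):
    """The two client calls used by the lifecycle (response shapes assumed)."""

    def list_serve_applications(self, **filters: Any) -> list[dict[str, Any]]: ...

    def run_plugin(self, code: str, data: dict[str, Any]) -> Any: ...


def ensure_dispatched(client: ServeClient, code: str, agent: int, params: dict[str, Any]) -> bool:
    """Return True if a deployment was dispatched, False if one is already RUNNING."""
    # 1. Detection: look for an existing RUNNING serve app for this plugin and agent.
    apps = client.list_serve_applications(plugin_code=code, job__agent=agent)
    if any(app.get('status') == 'RUNNING' for app in apps):
        return False
    # 2. Dispatch: ask the agent to run the plugin's deployment action.
    client.run_plugin(code, {'action': 'deployment', 'params': params})
    # 3. Wait: the caller then polls until RUNNING, a terminal failure, or timeout.
    return True
```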

Required Resources Payload (SYN-7005 / FR-8)

The dispatch params payload forwards resource requirements declared on the pre-processor plugin release under actions.inference.required_resources. This keeps the dispatch shape aligned with the agent-side capacity gate (see Inference Actions — Capacity Gate) so cluster-full conditions surface at dispatch instead of after Ray Serve has accepted a pending replica.

| Payload key | Source on required_resources | Required | Notes |
|---|---|---|---|
| num_cpus | required_cpu_count (preferred) or num_cpus | Yes | Default 1 |
| num_gpus | required_gpu_count (preferred) or num_gpus | Yes | Default 0.1 |
| num_replicas | num_replicas | No | Omitted from payload when not declared |
| memory | memory (preferred) or memory_bytes (alias) | No | Sent on the wire as memory. Omitted when not declared |

Optional keys are conditionally added — they are omitted (not sent as null) so backend request models with extra='forbid' continue to accept the payload.

actions:
  inference:
    required_resources:
      required_cpu_count: 2
      required_gpu_count: 1
      num_replicas: 2
      memory: 4294967296  # 4 GiB. `memory_bytes` is also accepted as an alias.
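The key mapping in the table can be sketched as a small function. build_resources_payload is a hypothetical name, not the SDK's implementation. It follows the table's rules:

- The preferred key is used first and the alias second.
- The required keys fall back to the documented defaults.
- Optional keys are left out entirely rather than sent as null.

```python
from __future__ import annotations

from typing import Any


def _first(resources: dict[str, Any], *keys: str) -> Any:
    """Return the value of the first declared key (preferred key listed first)."""
    for key in keys:
        if resources.get(key) is not None:
            return resources[key]
    return None


def build_resources_payload(required_resources: dict[str, Any] | None) -> dict[str, Any]:
    """Hypothetical mapping from required_resources to the dispatch params payload."""
    res = required_resources or {}
    cpus = _first(res, 'required_cpu_count', 'num_cpus')
    gpus = _first(res, 'required_gpu_count', 'num_gpus')
    payload: dict[str, Any] = {
        'num_cpus': 1 if cpus is None else cpus,    # required, default 1
        'num_gpus': 0.1 if gpus is None else gpus,  # required, default 0.1
    }
    # Optional keys are added only when declared, never sent as None.
    replicas = _first(res, 'num_replicas')
    if replicas is not None:
        payload['num_replicas'] = replicas
    memory = _first(res, 'memory', 'memory_bytes')
    if memory is not None:
        payload['memory'] = memory  # always sent on the wire as `memory`
    return payload
```

Applied to the YAML example above, this sketch yields `{'num_cpus': 2, 'num_gpus': 1, 'num_replicas': 2, 'memory': 4294967296}`. An empty declaration yields only the two required keys with their defaults.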

Error Handling (SYN-7005 E-Series)

PrepareStep (INFERENCE method) raises structured RuntimeError instances on dispatch and wait failures. Error messages expose only the error_type (or the terminal status). The original exception is preserved on __cause__ for diagnostics, but no token, URL, or stack trace is interpolated into the user-facing message.
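Python's `raise ... from ...` fits this sanitized-message pattern because it sets __cause__. The minimal sketch below makes three assumptions:

- ClientError is a stand-in class; the real client exception type and its attributes may differ.
- error_type is rendered as the exception's class name.
- dispatch_or_raise is a hypothetical wrapper.

```python
from __future__ import annotations

from typing import Any, Callable


class ClientError(Exception):
    """Stand-in for the client's HTTP error; the real class may differ."""

    def __init__(self, message: str, status_code: int) -> None:
        super().__init__(message)
        self.status_code = status_code


def dispatch_or_raise(run_plugin: Callable[[], Any]) -> Any:
    """Hypothetical wrapper around the dispatch call."""
    try:
        return run_plugin()
    except ClientError as exc:
        # Only the error type reaches the user-facing message; the original
        # exception (which may carry URLs or tokens) stays on __cause__.
        raise RuntimeError(
            f'Pre-processor deployment dispatch failed ({type(exc).__name__})'
        ) from exc
```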

Fail-Fast Matrix

| Phase | Trigger | Exception |
|---|---|---|
| Dispatch (_ensure_pre_processor_running) | client.run_plugin(...) raises ClientError (HTTP 4xx/5xx) | RuntimeError('Pre-processor deployment dispatch failed ({error_type})') |
| Wait (_wait_for_pre_processor) | Serve app status reaches DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE | RuntimeError('Pre-processor deployment failed: serve application reached terminal state ({status})') |
| Wait (_wait_for_pre_processor) | Timeout elapses without RUNNING | RuntimeError('Pre-processor did not become ready within timeout') |

Serve Application Status Classification (FR-6)

_classify_serve_app folds the backend's 7-value ServeApplication.Status enum into three actionable buckets so polling can short-circuit on terminal failures (previously, a doomed deploy burned the full 180s timeout budget).

| Bucket | Statuses | Polling behavior |
|---|---|---|
| running | RUNNING | Return immediately (success) |
| failed | DEPLOY_FAILED, UNHEALTHY, UNAVAILABLE | Raise RuntimeError immediately (fail-fast) |
| pending | NOT_STARTED, DEPLOYING, DELETING, empty list, malformed payload | Continue polling |
RUNNING Precedence

If any replica reports RUNNING, the classifier returns ('running', None) even when other replicas have failed. This avoids mis-classifying eventual-consistency rollouts during multi-replica deploys.
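The bucketing and the RUNNING-precedence rule can be sketched as follows. classify_serve_app here is a hypothetical re-implementation, not the SDK's _classify_serve_app. It assumes the serve-app listing is a list of dicts carrying a `status` string. Anything else is treated as a malformed payload and stays pending.

```python
from __future__ import annotations

from typing import Any, Optional

FAILED_STATUSES = frozenset({'DEPLOY_FAILED', 'UNHEALTHY', 'UNAVAILABLE'})


def classify_serve_app(apps: Any) -> tuple[str, Optional[str]]:
    """Fold serve-app statuses into ('running' | 'failed' | 'pending', status)."""
    if not isinstance(apps, list) or not apps:
        return ('pending', None)  # empty list or malformed payload
    statuses = [app.get('status') for app in apps if isinstance(app, dict)]
    # RUNNING wins even if other entries have failed (eventual consistency).
    if 'RUNNING' in statuses:
        return ('running', None)
    for status in statuses:
        if status in FAILED_STATUSES:
            return ('failed', status)
    # NOT_STARTED, DEPLOYING, DELETING, or unrecognized values keep polling.
    return ('pending', None)
```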

Pre-Processor Wait Flow
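The wait phase polls every 10s, gives up after the 180s default timeout, and fails fast on terminal statuses. The loop below is a hypothetical wait_for_running, not the SDK's _wait_for_pre_processor. It takes a fetch_status callable, assumed to return an already-classified (bucket, status) pair. Sleep and clock functions are injectable so the loop can run without a cluster.

```python
from __future__ import annotations

import time
from typing import Callable, Optional, Tuple

Classified = Tuple[str, Optional[str]]  # ('running' | 'failed' | 'pending', status)


def wait_for_running(
    fetch_status: Callable[[], Classified],
    timeout_seconds: float = 180.0,
    poll_interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Hypothetical poll loop: return on RUNNING, raise on failure or timeout."""
    deadline = clock() + timeout_seconds
    while True:
        bucket, status = fetch_status()
        if bucket == 'running':
            return
        if bucket == 'failed':
            # Fail fast instead of burning the rest of the timeout budget.
            raise RuntimeError(
                'Pre-processor deployment failed: serve application '
                f'reached terminal state ({status})'
            )
        if clock() >= deadline:
            raise RuntimeError('Pre-processor did not become ready within timeout')
        sleep(poll_interval)
```

With a fake clock, two pending polls followed by RUNNING return after 20 simulated seconds. A status that stays pending raises the timeout error once 180 simulated seconds have passed.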

Error Handling Example

import logging

logger = logging.getLogger(__name__)

try:
    # `action` is a configured ToTaskAction instance; PrepareStep runs inside execute()
    result = action.execute()
except RuntimeError as exc:
    msg = str(exc)
    if 'dispatch failed' in msg:
        # HTTP 4xx/5xx from the agent; the original ClientError is on exc.__cause__
        cause = exc.__cause__
        logger.error(
            'dispatch failed: error_type=%s status=%s',
            type(cause).__name__,
            getattr(cause, 'status_code', None),
        )
    elif 'reached terminal state' in msg:
        # DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE: check the serve app logs on the agent
        logger.error('pre-processor terminal failure: %s', msg)
    elif 'did not become ready within timeout' in msg:
        # Still pending after timeout_seconds (default 180s): check Ray cluster capacity
        logger.error('pre-processor wait timeout')
    raise