Skip to main content

Export Actions

Export actions transform annotation data into various formats for external use. Use BaseExportAction to build custom exporters that convert assignments, ground truth datasets, or tasks into formats like COCO, YOLO, or custom schemas.

Overview

Export actions provide a structured way to:

  • Query filtered annotation results from the backend
  • Transform data into target formats (COCO, YOLO, Pascal VOC, CSV, etc.)
  • Track export progress with built-in progress reporting
  • Handle large datasets efficiently using generators

At a Glance

PropertyValue
Base ClassBaseExportAction[P]
CategoryPluginCategory.EXPORT
Progress StepDATASET_CONVERSION
ContextExportContext
Execution ModesSimple / Step-Based
Large-volume pathSingle fixed 9-step registry — in-process or server-side delegated acquisition head + shared transform tail (auto, threshold-gated — see 서버사이드 위임)

BaseExportAction

BaseExportAction extends BaseAction with export-specific functionality. It provides backend client access and progress tracking out of the box.

from synapse_sdk.plugins.action import BaseAction
from synapse_sdk.plugins.enums import PluginCategory

class BaseExportAction(BaseAction[P]):
category = PluginCategory.EXPORT

The generic type P represents your params model, which must inherit from BaseModel.

Progress Categories

Export actions support standardized progress tracking:

CategoryConstantDescription
Dataset ConversionDATASET_CONVERSIONData transformation and file generation phase
# Track conversion progress
self.set_progress(current, total, self.progress.DATASET_CONVERSION)

Key Methods

client Property

Access the backend client for API calls. Raises RuntimeError if no client exists in the context.

# Access backend client
assignments = self.client.get_assignments(filters)

Good to know: If you don't need the backend client, override get_filtered_results() to fetch data from alternative sources.

get_filtered_results()

Override this abstract method to fetch data for export. Returns a tuple of (results_iterator, total_count).

def get_filtered_results(self, filters: dict[str, Any]) -> tuple[Any, int]:
"""Fetch filtered results for export.

Args:
filters: Filter criteria dict.

Returns:
Tuple of (results_iterator, total_count).
"""
return self.client.get_assignments(filters)

setup_steps()

Register workflow steps for step-based execution. If steps are registered, step-based execution takes precedence over execute().

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())

create_context()

Create the export context for step-based workflows. Override to customize context initialization.

def create_context(self) -> ExportContext:
params_dict = self.params.model_dump()
return ExportContext(
runtime_ctx=self.ctx,
params=params_dict,
)

ExportContext

ExportContext carries shared state between workflow steps. It extends BaseStepContext with export-specific fields.

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from synapse_sdk.plugins.steps import BaseStepContext

if TYPE_CHECKING:
from synapse_sdk.clients.backend import BackendClient

@dataclass
class ExportContext(BaseStepContext):
# Input parameters
params: dict[str, Any] = field(default_factory=dict)

# Processing state (populated by steps)
results: Any | None = None
total_count: int = 0
exported_count: int = 0
output_path: str | None = None

@property
def client(self) -> BackendClient:
"""Backend client from runtime context."""
if self.runtime_ctx.client is None:
raise RuntimeError('No client in runtime context')
return self.runtime_ctx.client

Fields

FieldTypeDescription
paramsdict[str, Any]Export parameters from action
resultsAny | NoneFetched data (populated by steps)
total_countintTotal items to export
exported_countintSuccessfully exported items
output_pathstr | NoneOutput file/directory path

Properties

PropertyTypeDescription
clientBackendClientBackend client from runtime context. Raises RuntimeError if no client.

Inherited Fields

From BaseStepContext:

FieldTypeDescription
runtime_ctxRuntimeContextRuntime context with logger, client, env
step_resultslist[StepResult]Results from completed steps
errorslist[str]Accumulated error messages
current_stepstr | NoneCurrently executing step name

StepResult

StepResult is a dataclass representing the result of a workflow step execution.

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class StepResult:
success: bool = True
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
rollback_data: dict[str, Any] = field(default_factory=dict)
skipped: bool = False
timestamp: datetime = field(default_factory=datetime.now)
FieldTypeDescription
successboolWhether the step completed successfully (default: True)
datadict[str, Any]Output data from the step
errorstr | NoneError message if step failed
rollback_datadict[str, Any]Data needed for rollback on failure
skippedboolWhether the step was skipped (default: False)
timestampdatetimeWhen the step completed

Execution Modes

Choose between two execution modes based on complexity.

Simple Mode

Override execute() directly for straightforward export logic.

from typing import Any

from pydantic import BaseModel
from synapse_sdk.plugins.actions.export import BaseExportAction

class ExportParams(BaseModel):
project_id: int
output_format: str = "coco"

class SimpleExportAction(BaseExportAction[ExportParams]):
action_name = "simple_export"
params_model = ExportParams

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)

def execute(self) -> dict[str, Any]:
# Fetch data
results, total = self.get_filtered_results({
"project": self.params.project_id
})

# Initialize progress
self.set_progress(0, total, self.progress.DATASET_CONVERSION)

# Process items
exported = []
for i, item in enumerate(results, 1):
exported.append(self._convert_item(item))
self.set_progress(i, total, self.progress.DATASET_CONVERSION)

return {
"format": self.params.output_format,
"exported_count": len(exported),
"data": exported
}

def _convert_item(self, item: dict) -> dict:
# Conversion logic here
return item

Tip: Use Simple Mode when your export logic fits in a single method without complex error recovery needs.

Step-Based Mode

Use setup_steps() for complex workflows with multiple phases, progress tracking, and automatic rollback on failure.

from pathlib import Path

from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry

# Define workflow steps
class FetchDataStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch_data"

@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress

def execute(self, context: ExportContext) -> StepResult:
filters = context.params.get("filters", {})
context.results, context.total_count = context.client.get_assignments(filters)
return StepResult(success=True)

class ConvertFormatStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert_format"

@property
def progress_weight(self) -> float:
return 0.6 # 60% of total progress

def execute(self, context: ExportContext) -> StepResult:
converted = []
for item in context.results:
converted.append(self._convert(item))
context.exported_count = len(converted)
return StepResult(success=True, data={"converted": converted})

def _convert(self, item: dict) -> dict:
return item

class SaveOutputStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save_output"

@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress

def execute(self, context: ExportContext) -> StepResult:
context.output_path = "/tmp/export_output.json"
return StepResult(success=True)

def rollback(self, context: ExportContext, result: StepResult) -> None:
# Clean up output file on failure
if context.output_path:
Path(context.output_path).unlink(missing_ok=True)

# Register steps in action
class StepBasedExportAction(BaseExportAction[ExportParams]):
action_name = "step_export"
params_model = ExportParams

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())

Step-Based Mode Benefits:

  • Automatic progress: Progress calculated from step weights
  • Automatic rollback: On failure, rollback() called in reverse order
  • Reusable steps: Share steps across multiple export actions
  • Testable units: Test each step independently

Export Targets

Export actions support three data targets.

Assignment

Export annotation work (labeling results, reviews, etc.).

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)

Ground Truth

Export curated ground truth datasets for ML training.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
events = self.client.list_ground_truth_events(
params=filters,
list_all=True
)
return events, len(events)

Task

Export task metadata and configurations.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_tasks(filters)

대량 데이터 무중단 export (Large-volume resilient export)

Export 는 한 프로젝트/데이터셋의 전체 task / assignment / ground-truth 를 빠짐없이 한 번 회수해야 한다. 데이터가 커질수록 단순 REST 순회는 (1) HTTP 왕복 수 증가, (2) cursor 열화, (3) payload 의 API 티어 통과, (4) 일시적 네트워크/DB 장애 취약 — 한 번의 페이지 실패로 export 전체가 중단된다. v2 export 는 이를 4가지 기법으로 완화한다.

1. 2-phase: list(slim) → bulk-fetch

slim list 로 id 만 모은 뒤(_collect_then_bulk 헬퍼) chunk(≤200) 단위 bulk-fetch 로 heavy payload 를 회수한다. list 응답에 무거운 data 를 싣지 않아 page 비용이 작다.

2. 유니크 keyset cursor — offset 열화 제거

list 정렬을 비유니크 -created 가 아니라 유니크 PK keyset (sort='-id') 으로 한다. DRF CursorPagination 은 cursor position 이 단일 필드라, 비유니크 정렬에서 동일 값 군집을 offset 으로 처리 → 깊은 페이지가 점진적으로 느려진다. 유니크 키로 정렬하면 매 페이지가 인덱스 seek (WHERE project=X AND id < cursor ORDER BY id DESC LIMIT N) 로 일정 비용을 유지한다. backend 에 (project, id) 복합 인덱스가 동반된다(synapse-backend).

  • Task / Assignment: sort='-id' 적용.
  • GroundTruth: ground_truth_events 가 이미 -pgh_id (event PK) keyset 이라 변경 불요.

3. presigned bulk — payload 를 API 가 아닌 object storage 에서

bulk 호출을 response_mode='presigned' 로 보내면 외재화된 data 가 presigned URL wrapper 로 돌아온다. 핸들러는 _resolve_data_payload() 로 이를 감지해 synapse_sdk.utils.file.download.download_json(presigned_url)S3 에서 직접 실제 payload 를 받는다.

  • 대용량 data 가 API 티어를 통과하지 않고, inline 한도(*_INLINE_MAX_BYTES) 로 잘리지 않는다.
  • 외재화 안 된 row 는 inline raw dict 로 폴백 — 판별은 data.get('mode') in {'presigned','inline'} 보수 가드. forward-compatible.

4. bounded retry — 일시적 실패에 무중단

Export list paginator pass-through 의 retry_on_status502/503/504 에 더해 500 을 포함하고 page_retries=3 를 보장한다(EXPORT_LIST_RETRY_ON_STATUS). 느린 쿼리 중 DB 연결 끊김성 500 / 게이트웨이 일시 오류에 export 가 죽지 않고 페이지 단위로 재시도한다. bulk-fetch 사이에는 throttle_seconds 로 back-pressure 를 유지한다(Task 0.2s 등). 전역 DEFAULT_RETRY_ON_STATUS 는 불변 — export 한정 정책이다.

적용 범위

page_retries / retry_on_status 는 export list paginator pass-through 로 전달된다(v2 transport 기본값에 더해 export 한정 정책). 다른 v2 consumer 동작은 영향받지 않는다.

한계와 향후 작업 (TODO)

위 기법들은 REST 를 유지한 점진적 개선으로 헤드룸을 확보하지만, 행 단위로 REST 를 통해 끌어오는 방식 자체는 요청 분량에 선형 비례하는 구조적 한계가 남는다. 근본 확장 해법은 별도 작업(익스포트 안정성/속도 개선)으로 추적한다:

  • (A안) 서버사이드 비동기 export-to-storage — backend AsyncJob 이 서버 keyset 스캔 한 번으로 전체를 raw bundle(JSONL + files + manifest)로 storage 에 기록 → SDK 는 enqueue + 진행률 스트리밍 후 bundle 을 읽어 공유 transform tail(ConvertData → SaveFiles → Finalize) 로 플러그인 산출물을 만든다. N 회 왕복 / 클라 타임아웃 / 연결 끊김 취약성 제거. 임계 초과 시 투명 위임으로 SYN-7104 P2 에서 1차 도입됨 — 아래 대량 export 의 서버사이드 위임 참고. (확장성 최종 해법)
  • (B안) 스트리밍 응답(NDJSON) — backend StreamingHttpResponse + 서버 keyset 단일 커서로 한 connection 에 전량 스트리밍 (중간 옵션).
  • (C안) GroundTruthDatasetVersion 스냅샷 사전 생성 — 불변 GTDV 의 export 아티팩트를 version release 시점에 미리 S3 에 생성 → train/export 는 다운로드만.
  • presigned 다운로드 병렬화 — 현재 _resolve_data_payload 는 per-item 직렬. 외재화 본격 도입 시 chunk 병렬 다운로드로 전환.
  • export read timeout / page_retries / throttle_secondsplugin config 노출 — 현재 핸들러 상수(EXPORT_*).

대량 export 의 서버사이드 위임 (Backend async-job)

SYN-7182 — 전 수량 delegate · in-process 경로 deprecated

[SYN-7182] 이제 모든 export 가 수량과 무관하게 서버사이드로 위임된다. 아래 이 절의 "count > threshold 초과 시에만 위임" 서술은 과거(임계 기반) 동작이며, 임계(export_async_job_size_threshold / SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD / ExportParams.async_job_size_threshold)는 deprecated — no-op 다. 위임 게이트는 이제 ① 위임 enabled, ② v2_client 배선, ③ 위임 scope 해소 세 가지만 본다(건수 무관). count 는 분할 fan-out·progress 용으로 여전히 프로브되나 결정에 영향을 주지 않는다.

SYN-7214 FR-6 — ground_truth 는 dataset 기준으로 위임

위 ③ "위임 scope" 는 target 방식별로 다르다. task/assignmentproject_id(context → filter['project'])로 해소하고, ground_truthproject 가 아니라 ground_truth_dataset_version(s) 로 해소한다(GT 는 project 와 decouple — GroundTruth.project 는 nullable·derived). 즉 GT export 는 filter 에 ground_truth_dataset_version(GroundTruthDatasetVersion id 목록 — version 문자열 아님)이 있으면 project_id 없이도 위임된다. backend 는 GT 를 tenant + dataset-membership + 요청 version 으로 격리한다. (이전에는 GT 가 project_id 미해소로 항상 deprecated in-process 폴백했으나 그건 의도가 아닌 fragile 한 부작용이었다.)

비-delegate(in-process) 경로는 deprecated 다(향후 릴리즈에서 제거 예정). v2 list 표면이 선택 필터 ids/ids_ex/search 를 떨궈(전역 PrimaryKeyFilterBackend/SearchFilter 누락) 선택/검색 export 시 프로젝트 전량을 silent over-export 하는 정확성 결함이 있다. 따라서 in-process 는 위임 불가 상황(v2_client 미배선 / kill-switch export_disable_async_delegate / 위임 scope 미해소 — task/assignmentproject_id 부재 또는 ground_truth 의 dataset version scope 부재)에서만 폴백으로만 동작하며, 진입 시 DeprecationWarning + 워커 로그 경고를 사유와 함께 발생시킨다. 권장: 위임을 활성 상태로 유지하라. :::

위의 v2 2-phase 경로(list → bulk-fetch)는 SDK(ray job)가 v2 REST 로 데이터를 끌어와 in-process 로 변환한다. 10만 건 이상 + 원본 파일이 큰 export 에서는 이 끌어오기 자체가 비용·취약성의 핵심이다. SYN-7104 P2 는 이를 한 단계 더 끌어올려, 대상 건수가 임계를 넘으면 데이터 취득(acquisition)을 backend async-job 에 위임(서버사이드 keyset 스캔 → raw bundle 기록)한다. 단, 위임은 데이터를 어떻게 취득하는가의 차이일 뿐이며, 위임/비위임 양쪽 모두 동일한 공유 transform tail 을 거쳐 동일한 플러그인 산출물을 만든다(투명 위임 · FR-8 parity).

이 위임은 기존 v2 2-phase 경로 위에 얹히는 추가 레이어다 — 임계 미만 export 는 종전과 동일하게 in-process head(list → bulk-fetch)로 데이터를 취득하고, 임계 초과 export 는 delegated head 로 취득한다. 어느 경로든 취득 후에는 같은 공유 tail(ConvertData → SaveFiles → Finalize) 이 실행되므로 플러그인 커스텀 로직이 양쪽에서 동일하게 적용된다.

기본 ON · 투명 위임 (staging 검증 권장)

위임 게이트는 **기본 활성(ON)**이다. 위임은 데이터 취득 경로만 서버사이드로 바꿀 뿐, 산출물은 비위임 경로와 동일한 플러그인 포맷이다 — 위임 경로도 비위임과 같은 공유 transform tail(ConvertData → SaveFiles) 을 거치기 때문이다(아래 투명 acquisition 과 공유 tail 참고). backend 가 쓰는 raw bundle 은 위임 head 의 중간 산출물이며, SDK 가 이를 읽어(BundleRead) tail 로 전달한다. 즉시 in-process 취득으로 되돌리려면 아래 kill-switch 를 사용한다. 신규 위임 경로는 프로덕션 반영 전 staging 검증을 권장한다.

위임 게이트와 임계

setup_steps() 는 위임 여부로 분기하지 않는다 — 항상 단일 고정 9-step registry 를 등록한다(아래 단일 고정 step registry 참고). 위임 결정(_resolve_delegation_decision())은 run() 시점에 한 번 평가되어 context.delegation_decision 에 seed 되고, 각 head step 의 can_skip 이 이를 참조해 비활성 head 를 완료-pass 한다. 위임은 아래 조건을 모두 만족할 때만 발동한다.

조건설명
위임 enabledkill-switch 가 꺼져 있어야 한다(기본 ON).
v2_client 존재ctx.v2_client 가 wiring 되어 있어야 한다(Runtime v2 Client 참고).
count > threshold[SYN-7182 deprecated] 더 이상 게이트가 아니다 — 수량 무관하게 위임한다.
위임 scope 해소위임 enqueue 에 필요한 scope 가 있어야 한다 — task/assignmentproject_id, ground_truthground_truth_dataset_version(s)(GroundTruthDatasetVersion id). 둘 다 없으면 deprecated in-process 폴백.

건수 산정은 풀스캔 없이 v1 단일페이지 count 프로브로 한다 — 첫 페이지의 count 만 읽고, count 를 못 구하면(프로브 실패) 비위임으로 폴백한다. 임계 기본값은 1,000이며 다음으로 오버라이드한다.

항목ExportParams 필드extra_params / config 키env
임계 overrideasync_job_size_thresholdexport_async_job_size_thresholdSYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD
kill-switch (즉시 in-process 복귀)disable_async_delegateexport_disable_async_delegateSYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1

우선순위(높음 → 낮음): ExportParams 명시 필드 > params.extra_params 의 동일 키 > 액션 config > env > 모듈 기본(1,000). ExportParams.async_job_size_threshold / disable_async_delegate 는 모두 optional 이며 기본 None — 미지정 시 종전대로 env/기본값으로 폴백한다(BC). 즉 export config 만으로 env 변경 없이 위임 임계를 조정하거나 위임을 끌 수 있다.

Kill-switch — 즉시 종전 경로 복귀

SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1(env) · config export_disable_async_delegate · ExportParams.disable_async_delegate=True어느 하나라도 켜면 게이트가 즉시 닫히고, 모든 export 가 임계와 무관하게 기존 in-process 경로로 처리된다. 인시던트 시 PR revert 없이 롤백할 수 있다(v2 export 의 SYNAPSE_FORCE_V1_EXPORT kill-switch 와 동일한 운영 패턴). 운영자 env kill-switch 가 켜져 있으면 export config 로는 재활성할 수 없다(env 우선) — config/필드는 위임을 끌 수만 있고 다시 켜지는 못한다.

단일 고정 step registry (9-step)

setup_steps() 는 위임 여부로 분기하지 않고 항상 동일한 9개 step 을 등록한다. 종전의 "분기형 registry(위임 3-step OR 비위임 6-step)" 모델은 모드에 따라 job 의 step 목록 자체가 달라져, 긴 acquisition 이 전체 진행률을 정체시켰다. 단일 고정 registry 는 이를 제거한다 — registry 는 항상 9-step 이고, 위임 결정에 따라 비활성 head 의 각 step 만 can_skip 으로 완료-pass(skip → 100%, orchestrator 가 보고) 된다.

9-step 은 두 acquisition head + 공유 tail 로 구성된다:

  • In-process head: Initialize → FetchResults → PrepareExport — v2 list → bulk-fetch 로 데이터 취득.
  • Delegated head: DelegateEnqueue → DelegateStreamProgress → BundleRead — backend async-job enqueue → SSE 진행률 → raw bundle 읽기.
  • Shared tail (plugin transform): ConvertData → SaveFiles → Finalize두 head 가 공유하는 동일 step 클래스. 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행된다(FR-8 투명성).

고정 9-step — 비활성 head 의 step 은 can_skip(=is_delegated(context) 또는 그 역)으로 완료-pass:

영역Step역할위임 시비위임 시
in-process headInitializestorage/path 초기화.skip(pass)실행
in-process headFetchResultsv2 list → bulk-fetch 로 결과 회수.skip(pass)실행
in-process headPrepareExportexport params build · project config.skip(pass)실행
delegated headDelegateEnqueueplugin_exports.create 로 export job enqueue.실행skip(pass)
delegated headDelegateStreamProgressasync_jobs.stream_progress (SSE) 진행률 수신.실행skip(pass)
delegated headBundleReadraw bundle(JSONL+files+manifest) 읽기 + url→bundle_path rewrite.실행skip(pass)
shared tailConvertDataplugin 커스텀 transform.실행실행
shared tailSaveFiles로컬 파일 저장.실행실행
shared tailFinalize마무리 · cleanup.실행실행

투명 acquisition 과 공유 tail

위임/비위임은 "데이터를 어떻게 취득하는가"의 차이일 뿐이며 산출물은 동일하다(FR-8 parity). 핵심은 공유 transform tail 이다 — 플러그인이 template 기반으로 override 하는 convert_data 등 커스텀 처리는 ConvertData → SaveFiles → Finalize 에서 실행되며, 이 tail 은 위임/비위임 양쪽 모두 거친다. 따라서 위임 export 도 플러그인 커스텀 로직이 그대로 적용된다(이것이 분기형(Option A)이 아니라 통합 파이프라인(Option B)을 채택한 이유다).

비위임 (in-process head)위임 (delegated head)
데이터 취득 위치SDK(ray job) — v2 list → bulk-fetchbackend async-job — 서버 keyset 스캔 → raw bundle
취득 중간 산출물in-process iteratorraw bundle(JSONL + files + manifest), SDK 가 BundleRead 로 읽음
ConvertData (plugin transform)적용됨적용됨 (공유 tail)
SaveFiles (로컬 파일 저장)적용됨적용됨 (공유 tail)
최종 산출물plugin 포맷동일한 plugin 포맷

bundle-read 계약

위임 시 backend 는 {prefix}/{tasks|assignments|ground_truths}.jsonl 를 쓰고, 각 라인은 {id, data, files_manifest: {spec: {file_name_original, url, bundle_path}}} 형태다. SDK 의 BundleRead step 은 map_files_to_bundle_local 로 각 항목의 url{prefix}/{bundle_path} 로 rewrite(traversal-guard 적용)해 로컬 bundle 을 가리키게 하고, 출력 파일명은 file_name_original 을 유지한다. rewrite 된 항목은 공유 tail 의 ConvertData/SaveFiles 로 전달되어 비위임과 동일하게 처리된다.

관측성

delegated head step 도 in-process head 와 동등한 관측성을 남긴다 — 즉 위임이라고 해서 진행률·메트릭·이벤트 로그가 비는 일은 없다. (취득 이후의 공유 tail 관측성은 위임/비위임 공통이다.)

신호API비고
진행률set_progress(..., step='server_export')step 이름은 server_export.
메트릭set_metrics(...)success / failed.
job-log 이벤트export_info / export_completed / export_failed
user-facing 메시지log_message(...)EXPORT_DELEGATED_TO_SERVER / EXPORT_DELEGATED_COMPLETED.

역할 분담: backend 는 progress(processed/total)만 SSE 로 흘리고, event / metric / user-facing 메시지는 SDK 가 전담한다. 따라서 DelegateStreamProgress 는 SSE 진행률을 set_progress 로 중계하고, enqueue / bundle-read step 이 이벤트·메트릭·메시지를 마무리한다.

v2 client resource

delegated head 는 BackendV2Client 의 다음 resource 를 사용한다(상세 wiring 은 Runtime v2 Client 참고).

Resource용도
v2_client.plugin_exports.create(...)export job enqueue (DelegateEnqueue).
v2_client.async_jobs.stream_progress(job_id)SSE 진행률 스트림 (DelegateStreamProgress).
v2_client.async_jobs.retrieve(job_id)manifest → bundle prefix 확정, BundleRead 가 raw bundle 을 읽어 tail 로 전달.

Examples

Example: COCO Format Exporter

A complete example exporting annotations to COCO format.

from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction
from typing import Any

class CocoExportParams(BaseModel):
project_id: int = Field(..., description="Project ID to export")
include_images: bool = Field(True, description="Include image metadata")

class CocoExportAction(BaseExportAction[CocoExportParams]):
action_name = "coco_export"
params_model = CocoExportParams

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)

def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results({
"project": self.params.project_id
})

self.set_progress(0, total, self.progress.DATASET_CONVERSION)

coco_data = {
"info": {"description": "Exported from Synapse"},
"images": [],
"annotations": [],
"categories": []
}

annotation_id = 0
category_map = {}

for i, assignment in enumerate(results, 1):
# Add image
data_unit = assignment.get("data_unit", {})
image_id = data_unit.get("id")

if self.params.include_images:
coco_data["images"].append({
"id": image_id,
"file_name": data_unit.get("name", ""),
"width": data_unit.get("width", 0),
"height": data_unit.get("height", 0)
})

# Add annotations
for ann in assignment.get("data", {}).get("annotations", []):
category = ann.get("category", "default")
if category not in category_map:
cat_id = len(category_map)
category_map[category] = cat_id
coco_data["categories"].append({
"id": cat_id,
"name": category
})

coco_data["annotations"].append({
"id": annotation_id,
"image_id": image_id,
"category_id": category_map[category],
"bbox": ann.get("bbox", []),
"area": ann.get("area", 0),
"iscrowd": 0
})
annotation_id += 1

self.set_progress(i, total, self.progress.DATASET_CONVERSION)

return {
"format": "coco",
"exported_count": len(coco_data["images"]),
"annotation_count": len(coco_data["annotations"]),
"data": coco_data
}

Example: Multi-Format Exporter with Steps

A flexible exporter supporting multiple output formats.

from enum import Enum
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry

class OutputFormat(str, Enum):
COCO = "coco"
YOLO = "yolo"
CSV = "csv"

class MultiFormatParams(BaseModel):
project_id: int
output_format: OutputFormat = OutputFormat.COCO
output_dir: str = Field("/tmp/export", description="Output directory")

class FetchStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch"

@property
def progress_weight(self) -> float:
return 0.3

def execute(self, context: ExportContext) -> StepResult:
project_id = context.params["project_id"]
context.results, context.total_count = context.client.get_assignments({
"project": project_id
})
return StepResult(success=True)

class ConvertStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert"

@property
def progress_weight(self) -> float:
return 0.5

def execute(self, context: ExportContext) -> StepResult:
output_format = context.params["output_format"]

if output_format == OutputFormat.COCO:
converted = self._to_coco(context.results)
elif output_format == OutputFormat.YOLO:
converted = self._to_yolo(context.results)
else:
converted = self._to_csv(context.results)

context.exported_count = context.total_count
return StepResult(success=True, data={"output": converted})

def _to_coco(self, results) -> dict:
# COCO conversion logic
return {"format": "coco", "images": [], "annotations": []}

def _to_yolo(self, results) -> list:
# YOLO conversion logic
return []

def _to_csv(self, results) -> str:
# CSV conversion logic
return "id,label,x,y,width,height\n"

class SaveStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save"

@property
def progress_weight(self) -> float:
return 0.2

def execute(self, context: ExportContext) -> StepResult:
from pathlib import Path
import json

output_dir = Path(context.params["output_dir"])
output_dir.mkdir(parents=True, exist_ok=True)

output_format = context.params["output_format"]
output_data = context.step_results[-1].data["output"]

if output_format in (OutputFormat.COCO,):
output_path = output_dir / "annotations.json"
output_path.write_text(json.dumps(output_data, indent=2))
else:
output_path = output_dir / f"output.{output_format}"
output_path.write_text(str(output_data))

context.output_path = str(output_path)
return StepResult(success=True)

class MultiFormatExportAction(BaseExportAction[MultiFormatParams]):
action_name = "multi_format_export"
params_model = MultiFormatParams

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchStep())
registry.register(ConvertStep())
registry.register(SaveStep())

Best Practices

Memory-Efficient Processing

Use generators for large datasets to avoid loading everything into memory.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
# Return a generator, not a list
results = self.client.get_assignments_stream(filters)
count = self.client.count_assignments(filters)
return results, count

def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(self.params.filters)

for i, item in enumerate(results, 1): # Iterate without loading all
self._process_item(item)
self.set_progress(i, total, self.progress.DATASET_CONVERSION)

Progress Reporting

Report progress at meaningful intervals, not on every item.

def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(filters)
report_interval = max(1, total // 100) # Report every 1%

for i, item in enumerate(results, 1):
self._process(item)
if i % report_interval == 0 or i == total:
self.set_progress(i, total, self.progress.DATASET_CONVERSION)

Error Recovery

Handle partial failures gracefully in step-based workflows.

class ConvertStep(BaseStep[ExportContext]):
def execute(self, context: ExportContext) -> StepResult:
converted = []
errors = []

for item in context.results:
try:
converted.append(self._convert(item))
except ValueError as e:
errors.append(f"Item {item['id']}: {e}")

context.exported_count = len(converted)

if errors:
context.errors.extend(errors)
# Continue with partial results
return StepResult(
success=True,
data={"converted": converted, "errors": errors}
)

return StepResult(success=True, data={"converted": converted})

Warning: Always validate output format compliance before returning results. Invalid exports can cause downstream failures in ML pipelines.

Template-Based Export with BaseExporter

For plugin developers who want a simpler, template-based approach to building exporters, BaseExporter provides a familiar interface with pre-built file handling utilities.

Overview

BaseExporter is designed for export plugins that need:

  • Original file downloading and saving
  • JSON data export with error tracking
  • Progress and metrics reporting
  • Customizable data conversion pipeline

BaseExporter Class

from synapse_sdk.plugins.actions.export import BaseExporter

class BaseExporter:
def __init__(
self,
ctx: RuntimeContext,
export_items: Generator,
path_root: Path,
**params
):
self.ctx = ctx
self.export_items = export_items
self.path_root = Path(path_root)
self.params = params
self.run = ExporterRunAdapter(ctx.logger) # Legacy compatibility

Template Methods

Override these methods to customize export behavior:

MethodDescription
convert_data(data)Transform data during export
before_convert(data)Pre-process data before conversion
after_convert(data)Post-process data after conversion
save_original_file(result, base_path, error_list)Save original files
save_as_json(result, base_path, error_list)Save data as JSON
setup_output_directories(path, save_original)Customize directory structure
process_file_saving(...)Custom file saving logic
additional_file_saving(path)Post-export file operations

ExporterRunAdapter

The run attribute provides logging and progress tracking methods:

# Log messages
self.run.log_message("Processing item...")

# Track progress
self.run.set_progress(current, total, step="dataset_conversion")

# Log metrics
record = self.run.MetricsRecord(stand_by=100, success=0, failed=0)
self.run.log_metrics(record, category="original_file")

# Log file export status
self.run.export_log_original_file(item_id, file_info, ExportStatus.SUCCESS, "")
self.run.export_log_data_file(item_id, file_info, ExportStatus.SUCCESS, "")

MetricsRecord

Track export progress with MetricsRecord:

from synapse_sdk.plugins.actions.export import MetricsRecord

record = MetricsRecord(stand_by=100, success=0, failed=0)

# Update as items are processed
record.stand_by -= 1
record.success += 1

# Log metrics
self.run.log_metrics(record, category="data_file")

# Convert to dict
print(record.to_dict()) # {'stand_by': 99, 'success': 1, 'failed': 0}

ExportStatus Enum

Track file export status:

from synapse_sdk.plugins.actions.export import ExportStatus

ExportStatus.SUCCESS # 'success'
ExportStatus.FAILED # 'failed'
ExportStatus.STAND_BY # 'stand_by'

Example: Custom Exporter Plugin

A complete example using BaseExporter with ExportAction:

from pathlib import Path
from typing import Any, Generator

from synapse_sdk.plugins.actions.export import BaseExporter, ExportAction

class MyExporter(BaseExporter):
"""Custom exporter with data transformation."""

def convert_data(self, data: dict[str, Any]) -> dict[str, Any]:
"""Transform annotation data to custom format."""
return {
"id": data.get("id"),
"filename": data.get("files", {}).get("file_name_original"),
"annotations": self._transform_annotations(data.get("data", {}))
}

def _transform_annotations(self, data: dict) -> list:
# Custom transformation logic
return data.get("annotations", [])

def additional_file_saving(self, unique_export_path: Path) -> None:
"""Save metadata file after export."""
import json
metadata = {
"export_name": self.params.get("name"),
"total_items": self.params.get("count"),
"format_version": "1.0"
}
with (unique_export_path / "metadata.json").open("w") as f:
json.dump(metadata, f, indent=2)


class MyExportAction(ExportAction):
"""Export action using custom exporter."""

action_name = "my_export"

@property
def entrypoint(self):
return MyExporter

Plugin Configuration

Configure the action in config.yaml:

name: my_export_plugin
code: my_export_plugin
version: 1.0.0
category: export

actions:
export:
entrypoint: plugin.export.MyExportAction
annotation_types:
- image
- video

Running Locally

Test your export plugin:

synapse plugin run --mode local export --params '{
"storage": 1,
"name": "My Export",
"save_original_file": true,
"path": "exports/my-export",
"target": "task",
"filter": {"project": 123},
"extra_params": {}
}'

Target Handlers

ExportAction uses target handlers to fetch data from different sources.

TargetHandlerFactory

from synapse_sdk.plugins.actions.export import TargetHandlerFactory

# Get handler for target type
handler = TargetHandlerFactory.get_handler("assignment")
handler = TargetHandlerFactory.get_handler("ground_truth")
handler = TargetHandlerFactory.get_handler("task")

Available Handlers

HandlerTargetDescription
AssignmentExportTargetHandlerassignmentExport annotation assignments
GroundTruthExportTargetHandlerground_truthExport ground truth datasets
TaskExportTargetHandlertaskExport task data

Custom Target Handler

Implement ExportTargetHandler for custom data sources:

from synapse_sdk.plugins.actions.export import ExportTargetHandler

class CustomTargetHandler(ExportTargetHandler):
def get_results(self, client, filters: dict) -> tuple[Any, int]:
# Fetch data from custom source
results = client.custom_api_call(filters)
return iter(results), len(results)

def validate_filter(self, filters: dict, client) -> dict:
# Validate filter parameters
if "required_field" not in filters:
raise ValueError("required_field is required")
return filters

def get_export_item(self, results) -> Generator:
# Transform results for export
for item in results:
yield self._transform(item)