Export Actions
Export actions transform annotation data into various formats for external use. Use BaseExportAction to build custom exporters that convert assignments, ground truth datasets, or tasks into formats like COCO, YOLO, or custom schemas.
Overview
Export actions provide a structured way to:
- Query filtered annotation results from the backend
- Transform data into target formats (COCO, YOLO, Pascal VOC, CSV, etc.)
- Track export progress with built-in progress reporting
- Handle large datasets efficiently using generators
At a Glance
| Property | Value |
|---|---|
| Base Class | BaseExportAction[P] |
| Category | PluginCategory.EXPORT |
| Progress Step | DATASET_CONVERSION |
| Context | ExportContext |
| Execution Modes | Simple / Step-Based |
| Large-volume path | Single fixed 9-step registry — in-process or server-side delegated acquisition head + shared transform tail (auto, threshold-gated — see 서버사이드 위임) |
BaseExportAction
BaseExportAction extends BaseAction with export-specific functionality. It provides backend client access and progress tracking out of the box.
from synapse_sdk.plugins.action import BaseAction
from synapse_sdk.plugins.enums import PluginCategory
class BaseExportAction(BaseAction[P]):
category = PluginCategory.EXPORT
The generic type P represents your params model, which must inherit from BaseModel.
Progress Categories
Export actions support standardized progress tracking:
| Category | Constant | Description |
|---|---|---|
| Dataset Conversion | DATASET_CONVERSION | Data transformation and file generation phase |
# Track conversion progress
self.set_progress(current, total, self.progress.DATASET_CONVERSION)
Key Methods
client Property
Access the backend client for API calls. Raises RuntimeError if no client exists in the context.
# Access backend client
assignments = self.client.get_assignments(filters)
Good to know: If you don't need the backend client, override
get_filtered_results()to fetch data from alternative sources.
get_filtered_results()
Override this abstract method to fetch data for export. Returns a tuple of (results_iterator, total_count).
def get_filtered_results(self, filters: dict[str, Any]) -> tuple[Any, int]:
"""Fetch filtered results for export.
Args:
filters: Filter criteria dict.
Returns:
Tuple of (results_iterator, total_count).
"""
return self.client.get_assignments(filters)
setup_steps()
Register workflow steps for step-based execution. If steps are registered, step-based execution takes precedence over execute().
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())
create_context()
Create the export context for step-based workflows. Override to customize context initialization.
def create_context(self) -> ExportContext:
params_dict = self.params.model_dump()
return ExportContext(
runtime_ctx=self.ctx,
params=params_dict,
)
ExportContext
ExportContext carries shared state between workflow steps. It extends BaseStepContext with export-specific fields.
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from synapse_sdk.plugins.steps import BaseStepContext
if TYPE_CHECKING:
from synapse_sdk.clients.backend import BackendClient
@dataclass
class ExportContext(BaseStepContext):
# Input parameters
params: dict[str, Any] = field(default_factory=dict)
# Processing state (populated by steps)
results: Any | None = None
total_count: int = 0
exported_count: int = 0
output_path: str | None = None
@property
def client(self) -> BackendClient:
"""Backend client from runtime context."""
if self.runtime_ctx.client is None:
raise RuntimeError('No client in runtime context')
return self.runtime_ctx.client
Fields
| Field | Type | Description |
|---|---|---|
params | dict[str, Any] | Export parameters from action |
results | Any | None | Fetched data (populated by steps) |
total_count | int | Total items to export |
exported_count | int | Successfully exported items |
output_path | str | None | Output file/directory path |
Properties
| Property | Type | Description |
|---|---|---|
client | BackendClient | Backend client from runtime context. Raises RuntimeError if no client. |
Inherited Fields
From BaseStepContext:
| Field | Type | Description |
|---|---|---|
runtime_ctx | RuntimeContext | Runtime context with logger, client, env |
step_results | list[StepResult] | Results from completed steps |
errors | list[str] | Accumulated error messages |
current_step | str | None | Currently executing step name |
StepResult
StepResult is a dataclass representing the result of a workflow step execution.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class StepResult:
success: bool = True
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
rollback_data: dict[str, Any] = field(default_factory=dict)
skipped: bool = False
timestamp: datetime = field(default_factory=datetime.now)
| Field | Type | Description |
|---|---|---|
success | bool | Whether the step completed successfully (default: True) |
data | dict[str, Any] | Output data from the step |
error | str | None | Error message if step failed |
rollback_data | dict[str, Any] | Data needed for rollback on failure |
skipped | bool | Whether the step was skipped (default: False) |
timestamp | datetime | When the step completed |
Execution Modes
Choose between two execution modes based on complexity.
Simple Mode
Override execute() directly for straightforward export logic.
from typing import Any
from pydantic import BaseModel
from synapse_sdk.plugins.actions.export import BaseExportAction
class ExportParams(BaseModel):
project_id: int
output_format: str = "coco"
class SimpleExportAction(BaseExportAction[ExportParams]):
action_name = "simple_export"
params_model = ExportParams
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)
def execute(self) -> dict[str, Any]:
# Fetch data
results, total = self.get_filtered_results({
"project": self.params.project_id
})
# Initialize progress
self.set_progress(0, total, self.progress.DATASET_CONVERSION)
# Process items
exported = []
for i, item in enumerate(results, 1):
exported.append(self._convert_item(item))
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
return {
"format": self.params.output_format,
"exported_count": len(exported),
"data": exported
}
def _convert_item(self, item: dict) -> dict:
# Conversion logic here
return item
Tip: Use Simple Mode when your export logic fits in a single method without complex error recovery needs.
Step-Based Mode
Use setup_steps() for complex workflows with multiple phases, progress tracking, and automatic rollback on failure.
from pathlib import Path
from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry
# Define workflow steps
class FetchDataStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch_data"
@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress
def execute(self, context: ExportContext) -> StepResult:
filters = context.params.get("filters", {})
context.results, context.total_count = context.client.get_assignments(filters)
return StepResult(success=True)
class ConvertFormatStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert_format"
@property
def progress_weight(self) -> float:
return 0.6 # 60% of total progress
def execute(self, context: ExportContext) -> StepResult:
converted = []
for item in context.results:
converted.append(self._convert(item))
context.exported_count = len(converted)
return StepResult(success=True, data={"converted": converted})
def _convert(self, item: dict) -> dict:
return item
class SaveOutputStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save_output"
@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress
def execute(self, context: ExportContext) -> StepResult:
context.output_path = "/tmp/export_output.json"
return StepResult(success=True)
def rollback(self, context: ExportContext, result: StepResult) -> None:
# Clean up output file on failure
if context.output_path:
Path(context.output_path).unlink(missing_ok=True)
# Register steps in action
class StepBasedExportAction(BaseExportAction[ExportParams]):
action_name = "step_export"
params_model = ExportParams
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())
Step-Based Mode Benefits:
- Automatic progress: Progress calculated from step weights
- Automatic rollback: On failure,
rollback()called in reverse order - Reusable steps: Share steps across multiple export actions
- Testable units: Test each step independently
Export Targets
Export actions support three data targets.
Assignment
Export annotation work (labeling results, reviews, etc.).
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)
Ground Truth
Export curated ground truth datasets for ML training.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
events = self.client.list_ground_truth_events(
params=filters,
list_all=True
)
return events, len(events)
Task
Export task metadata and configurations.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_tasks(filters)
대량 데이터 무중단 export (Large-volume resilient export)
Export 는 한 프로젝트/데이터셋의 전체 task / assignment / ground-truth 를 빠짐없이 한 번 회수해야 한다. 데이터가 커질수록 단순 REST 순회는 (1) HTTP 왕복 수 증가, (2) cursor 열화, (3) payload 의 API 티어 통과, (4) 일시적 네트워크/DB 장애 취약 — 한 번의 페이지 실패로 export 전체가 중단된다. v2 export 는 이를 4가지 기법으로 완화한다.
1. 2-phase: list(slim) → bulk-fetch
slim list 로 id 만 모은 뒤(_collect_then_bulk 헬퍼) chunk(≤200) 단위 bulk-fetch 로 heavy payload 를 회수한다. list 응답에 무거운 data 를 싣지 않아 page 비용이 작다.
2. 유니크 keyset cursor — offset 열화 제거
list 정렬을 비유니크 -created 가 아니라 유니크 PK keyset (sort='-id') 으로 한다. DRF CursorPagination 은 cursor position 이 단일 필드라, 비유니크 정렬에서 동일 값 군집을 offset 으로 처리 → 깊은 페이지가 점진적으로 느려진다. 유니크 키로 정렬하면 매 페이지가 인덱스 seek (WHERE project=X AND id < cursor ORDER BY id DESC LIMIT N) 로 일정 비용을 유지한다. backend 에 (project, id) 복합 인덱스가 동반된다(synapse-backend).
- Task / Assignment:
sort='-id'적용. - GroundTruth:
ground_truth_events가 이미-pgh_id(event PK) keyset 이라 변경 불요.
3. presigned bulk — payload 를 API 가 아닌 object storage 에서
bulk 호출을 response_mode='presigned' 로 보내면 외재화된 data 가 presigned URL wrapper 로 돌아온다. 핸들러는 _resolve_data_payload() 로 이를 감지해 synapse_sdk.utils.file.download.download_json(presigned_url) 로 S3 에서 직접 실제 payload 를 받는다.
- 대용량
data가 API 티어를 통과하지 않고, inline 한도(*_INLINE_MAX_BYTES) 로 잘리지 않는다. - 외재화 안 된 row 는 inline raw dict 로 폴백 — 판별은
data.get('mode') in {'presigned','inline'}보수 가드. forward-compatible.
4. bounded retry — 일시적 실패에 무중단
Export list paginator pass-through 의 retry_on_status 에 502/503/504 에 더해 500 을 포함하고 page_retries=3 를 보장한다(EXPORT_LIST_RETRY_ON_STATUS). 느린 쿼리 중 DB 연결 끊김성 500 / 게이트웨이 일시 오류에 export 가 죽지 않고 페이지 단위로 재시도한다. bulk-fetch 사이에는 throttle_seconds 로 back-pressure 를 유지한다(Task 0.2s 등). 전역 DEFAULT_RETRY_ON_STATUS 는 불변 — export 한정 정책이다.
page_retries / retry_on_status 는 export list paginator pass-through 로 전달된다(v2 transport 기본값에 더해 export 한정 정책). 다른 v2 consumer 동작은 영향받지 않는다.
한계와 향후 작업 (TODO)
위 기법들은 REST 를 유지한 점진적 개선으로 헤드룸을 확보하지만, 행 단위로 REST 를 통해 끌어오는 방식 자체는 요청 분량에 선형 비례하는 구조적 한계가 남는다. 근본 확장 해법은 별도 작업(익스포트 안정성/속도 개선)으로 추적한다:
- (A안) 서버사이드 비동기 export-to-storage — backend AsyncJob 이 서버 keyset 스캔 한 번으로 전체를 raw bundle(JSONL + files + manifest)로 storage 에 기록 → SDK 는 enqueue + 진행률 스트리밍 후 bundle 을 읽어 공유 transform tail(ConvertData → SaveFiles → Finalize) 로 플러그인 산출물을 만든다. N 회 왕복 / 클라 타임아웃 / 연결 끊김 취약성 제거. 임계 초과 시 투명 위임으로 SYN-7104 P2 에서 1차 도입됨 — 아래 대량 export 의 서버사이드 위임 참고. (확장성 최종 해법)
- (B안) 스트리밍 응답(NDJSON) — backend
StreamingHttpResponse+ 서버 keyset 단일 커서로 한 connection 에 전량 스트리밍 (중간 옵션). - (C안) GroundTruthDatasetVersion 스냅샷 사전 생성 — 불변 GTDV 의 export 아티팩트를 version release 시점에 미리 S3 에 생성 → train/export 는 다운로드만.
- presigned 다운로드 병렬화 — 현재
_resolve_data_payload는 per-item 직렬. 외재화 본격 도입 시 chunk 병렬 다운로드로 전환. - export read timeout /
page_retries/throttle_seconds를 plugin config 노출 — 현재 핸들러 상수(EXPORT_*).
대량 export 의 서버사이드 위임 (Backend async-job)
[SYN-7182] 이제 모든 export 가 수량과 무관하게 서버사이드로 위임된다. 아래 이 절의 "count > threshold 초과 시에만 위임" 서술은 과거(임계 기반) 동작이며, 임계(export_async_job_size_threshold / SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD / ExportParams.async_job_size_threshold)는 deprecated — no-op 다. 위임 게이트는 이제 ① 위임 enabled, ② v2_client 배선, ③ 위임 scope 해소 세 가지만 본다(건수 무관). count 는 분할 fan-out·progress 용으로 여전히 프로브되나 결정에 영향을 주지 않는다.
ground_truth 는 dataset 기준으로 위임위 ③ "위임 scope" 는 target 방식별로 다르다. task/assignment 는 project_id(context → filter['project'])로 해소하고, ground_truth 는 project 가 아니라 ground_truth_dataset_version(s) 로 해소한다(GT 는 project 와 decouple — GroundTruth.project 는 nullable·derived). 즉 GT export 는 filter 에 ground_truth_dataset_version(GroundTruthDatasetVersion id 목록 — version 문자열 아님)이 있으면 project_id 없이도 위임된다. backend 는 GT 를 tenant + dataset-membership + 요청 version 으로 격리한다. (이전에는 GT 가 project_id 미해소로 항상 deprecated in-process 폴백했으나 그건 의도가 아닌 fragile 한 부작용이었다.)
비-delegate(in-process) 경로는 deprecated 다(향후 릴리즈에서 제거 예정). v2 list 표면이 선택 필터 ids/ids_ex/search 를 떨궈(전역 PrimaryKeyFilterBackend/SearchFilter 누락) 선택/검색 export 시 프로젝트 전량을 silent over-export 하는 정확성 결함이 있다. 따라서 in-process 는 위임 불가 상황(v2_client 미배선 / kill-switch export_disable_async_delegate / 위임 scope 미해소 — task/assignment 의 project_id 부재 또는 ground_truth 의 dataset version scope 부재)에서만 폴백으로만 동작하며, 진입 시 DeprecationWarning + 워커 로그 경고를 사유와 함께 발생시킨다. 권장: 위임을 활성 상태로 유지하라.
:::
위의 v2 2-phase 경로(list → bulk-fetch)는 SDK(ray job)가 v2 REST 로 데이터를 끌어와 in-process 로 변환한다. 10만 건 이상 + 원본 파일이 큰 export 에서는 이 끌어오기 자체가 비용·취약성의 핵심이다. SYN-7104 P2 는 이를 한 단계 더 끌어올려, 대상 건수가 임계를 넘으면 데이터 취득(acquisition)을 backend async-job 에 위임(서버사이드 keyset 스캔 → raw bundle 기록)한다. 단, 위임은 데이터를 어떻게 취득하는가의 차이일 뿐이며, 위임/비위임 양쪽 모두 동일한 공유 transform tail 을 거쳐 동일한 플러그인 산출물을 만든다(투명 위임 · FR-8 parity).
이 위임은 기존 v2 2-phase 경로 위에 얹히는 추가 레이어다 — 임계 미만 export 는 종전과 동일하게 in-process head(list → bulk-fetch)로 데이터를 취득하고, 임계 초과 export 는 delegated head 로 취득한다. 어느 경로든 취득 후에는 같은 공유 tail(ConvertData → SaveFiles → Finalize) 이 실행되므로 플러그인 커스텀 로직이 양쪽에서 동일하게 적용된다.
위임 게이트는 **기본 활성(ON)**이다. 위임은 데이터 취득 경로만 서버사이드로 바꿀 뿐, 산출물은 비위임 경로와 동일한 플러그인 포맷이다 — 위임 경로도 비위임과 같은 공유 transform tail(ConvertData → SaveFiles) 을 거치기 때문이다(아래 투명 acquisition 과 공유 tail 참고). backend 가 쓰는 raw bundle 은 위임 head 의 중간 산출물이며, SDK 가 이를 읽어(BundleRead) tail 로 전달한다. 즉시 in-process 취득으로 되돌리려면 아래 kill-switch 를 사용한다. 신규 위임 경로는 프로덕션 반영 전 staging 검증을 권장한다.
위임 게이트와 임계
setup_steps() 는 위임 여부로 분기하지 않는다 — 항상 단일 고정 9-step registry 를 등록한다(아래 단일 고정 step registry 참고). 위임 결정(_resolve_delegation_decision())은 run() 시점에 한 번 평가되어 context.delegation_decision 에 seed 되고, 각 head step 의 can_skip 이 이를 참조해 비활성 head 를 완료-pass 한다. 위임은 아래 조건을 모두 만족할 때만 발동한다.
| 조건 | 설명 |
|---|---|
| 위임 enabled | kill-switch 가 꺼져 있어야 한다(기본 ON). |
v2_client 존재 | ctx.v2_client 가 wiring 되어 있어야 한다(Runtime v2 Client 참고). |
count > threshold | [SYN-7182 deprecated] 더 이상 게이트가 아니다 — 수량 무관하게 위임한다. |
| 위임 scope 해소 | 위임 enqueue 에 필요한 scope 가 있어야 한다 — task/assignment 는 project_id, ground_truth 는 ground_truth_dataset_version(s)(GroundTruthDatasetVersion id). 둘 다 없으면 deprecated in-process 폴백. |
건수 산정은 풀스캔 없이 v1 단일페이지 count 프로브로 한다 — 첫 페이지의 count 만 읽고, count 를 못 구하면(프로브 실패) 비위임으로 폴백한다. 임계 기본값은 1,000이며 다음으로 오버라이드한다.
| 항목 | ExportParams 필드 | extra_params / config 키 | env |
|---|---|---|---|
| 임계 override | async_job_size_threshold | export_async_job_size_threshold | SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD |
| kill-switch (즉시 in-process 복귀) | disable_async_delegate | export_disable_async_delegate | SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1 |
우선순위(높음 → 낮음): ExportParams 명시 필드 > params.extra_params 의 동일 키 > 액션 config > env > 모듈 기본(1,000). ExportParams.async_job_size_threshold / disable_async_delegate 는 모두 optional 이며 기본 None — 미지정 시 종전대로 env/기본값으로 폴백한다(BC). 즉 export config 만으로 env 변경 없이 위임 임계를 조정하거나 위임을 끌 수 있다.
SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1(env) · config export_disable_async_delegate · ExportParams.disable_async_delegate=True 중 어느 하나라도 켜면 게이트가 즉시 닫히고, 모든 export 가 임계와 무관하게 기존 in-process 경로로 처리된다. 인시던트 시 PR revert 없이 롤백할 수 있다(v2 export 의 SYNAPSE_FORCE_V1_EXPORT kill-switch 와 동일한 운영 패턴). 운영자 env kill-switch 가 켜져 있으면 export config 로는 재활성할 수 없다(env 우선) — config/필드는 위임을 끌 수만 있고 다시 켜지는 못한다.
단일 고정 step registry (9-step)
setup_steps() 는 위임 여부로 분기하지 않고 항상 동일한 9개 step 을 등록한다. 종전의 "분기형 registry(위임 3-step OR 비위임 6-step)" 모델은 모드에 따라 job 의 step 목록 자체가 달라져, 긴 acquisition 이 전체 진행률을 정체시켰다. 단일 고정 registry 는 이를 제거한다 — registry 는 항상 9-step 이고, 위임 결정에 따라 비활성 head 의 각 step 만 can_skip 으로 완료-pass(skip → 100%, orchestrator 가 보고) 된다.
9-step 은 두 acquisition head + 공유 tail 로 구성된다:
- In-process head:
Initialize → FetchResults → PrepareExport— v2 list → bulk-fetch 로 데이터 취득. - Delegated head:
DelegateEnqueue → DelegateStreamProgress → BundleRead— backend async-job enqueue → SSE 진행률 → raw bundle 읽기. - Shared tail (plugin transform):
ConvertData → SaveFiles → Finalize— 두 head 가 공유하는 동일 step 클래스. 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행된다(FR-8 투명성).
고정 9-step — 비활성 head 의 step 은 can_skip(=is_delegated(context) 또는 그 역)으로 완료-pass:
| 영역 | Step | 역할 | 위임 시 | 비위임 시 |
|---|---|---|---|---|
| in-process head | Initialize | storage/path 초기화. | skip(pass) | 실행 |
| in-process head | FetchResults | v2 list → bulk-fetch 로 결과 회수. | skip(pass) | 실행 |
| in-process head | PrepareExport | export params build · project config. | skip(pass) | 실행 |
| delegated head | DelegateEnqueue | plugin_exports.create 로 export job enqueue. | 실행 | skip(pass) |
| delegated head | DelegateStreamProgress | async_jobs.stream_progress (SSE) 진행률 수신. | 실행 | skip(pass) |
| delegated head | BundleRead | raw bundle(JSONL+files+manifest) 읽기 + url→bundle_path rewrite. | 실행 | skip(pass) |
| shared tail | ConvertData | plugin 커스텀 transform. | 실행 | 실행 |
| shared tail | SaveFiles | 로컬 파일 저장. | 실행 | 실행 |
| shared tail | Finalize | 마무리 · cleanup. | 실행 | 실행 |
투명 acquisition 과 공유 tail
위임/비위임은 "데이터를 어떻게 취득하는가"의 차이일 뿐이며 산출물은 동일하다(FR-8 parity). 핵심은 공유 transform tail 이다 — 플러그인이 template 기반으로 override 하는 convert_data 등 커스텀 처리는 ConvertData → SaveFiles → Finalize 에서 실행되며, 이 tail 은 위임/비위임 양쪽 모두 거친다. 따라서 위임 export 도 플러그인 커스텀 로직이 그대로 적용된다(이것이 분기형(Option A)이 아니라 통합 파이프라인(Option B)을 채택한 이유다).
| 비위임 (in-process head) | 위임 (delegated head) | |
|---|---|---|
| 데이터 취득 위치 | SDK(ray job) — v2 list → bulk-fetch | backend async-job — 서버 keyset 스캔 → raw bundle |
| 취득 중간 산출물 | in-process iterator | raw bundle(JSONL + files + manifest), SDK 가 BundleRead 로 읽음 |
ConvertData (plugin transform) | 적용됨 | 적용됨 (공유 tail) |
SaveFiles (로컬 파일 저장) | 적용됨 | 적용됨 (공유 tail) |
| 최종 산출물 | plugin 포맷 | 동일한 plugin 포맷 |
bundle-read 계약
위임 시 backend 는 {prefix}/{tasks|assignments|ground_truths}.jsonl 를 쓰고, 각 라인은 {id, data, files_manifest: {spec: {file_name_original, url, bundle_path}}} 형태다. SDK 의 BundleRead step 은 map_files_to_bundle_local 로 각 항목의 url 을 {prefix}/{bundle_path} 로 rewrite(traversal-guard 적용)해 로컬 bundle 을 가리키게 하고, 출력 파일명은 file_name_original 을 유지한다. rewrite 된 항목은 공유 tail 의 ConvertData/SaveFiles 로 전달되어 비위임과 동일하게 처리된다.
관측성
delegated head step 도 in-process head 와 동등한 관측성을 남긴다 — 즉 위임이라고 해서 진행률·메트릭·이벤트 로그가 비는 일은 없다. (취득 이후의 공유 tail 관측성은 위임/비위임 공통이다.)
| 신호 | API | 비고 |
|---|---|---|
| 진행률 | set_progress(..., step='server_export') | step 이름은 server_export. |
| 메트릭 | set_metrics(...) | success / failed. |
| job-log 이벤트 | export_info / export_completed / export_failed | |
| user-facing 메시지 | log_message(...) | EXPORT_DELEGATED_TO_SERVER / EXPORT_DELEGATED_COMPLETED. |
역할 분담: backend 는 progress(processed/total)만 SSE 로 흘리고, event / metric / user-facing 메시지는 SDK 가 전담한다. 따라서 DelegateStreamProgress 는 SSE 진행률을 set_progress 로 중계하고, enqueue / bundle-read step 이 이벤트·메트릭·메시지를 마무리한다.
v2 client resource
delegated head 는 BackendV2Client 의 다음 resource 를 사용한다(상세 wiring 은 Runtime v2 Client 참고).
| Resource | 용도 |
|---|---|
v2_client.plugin_exports.create(...) | export job enqueue (DelegateEnqueue). |
v2_client.async_jobs.stream_progress(job_id) | SSE 진행률 스트림 (DelegateStreamProgress). |
v2_client.async_jobs.retrieve(job_id) | manifest → bundle prefix 확정, BundleRead 가 raw bundle 을 읽어 tail 로 전달. |
Examples
Example: COCO Format Exporter
A complete example exporting annotations to COCO format.
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction
from typing import Any
class CocoExportParams(BaseModel):
project_id: int = Field(..., description="Project ID to export")
include_images: bool = Field(True, description="Include image metadata")
class CocoExportAction(BaseExportAction[CocoExportParams]):
action_name = "coco_export"
params_model = CocoExportParams
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)
def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results({
"project": self.params.project_id
})
self.set_progress(0, total, self.progress.DATASET_CONVERSION)
coco_data = {
"info": {"description": "Exported from Synapse"},
"images": [],
"annotations": [],
"categories": []
}
annotation_id = 0
category_map = {}
for i, assignment in enumerate(results, 1):
# Add image
data_unit = assignment.get("data_unit", {})
image_id = data_unit.get("id")
if self.params.include_images:
coco_data["images"].append({
"id": image_id,
"file_name": data_unit.get("name", ""),
"width": data_unit.get("width", 0),
"height": data_unit.get("height", 0)
})
# Add annotations
for ann in assignment.get("data", {}).get("annotations", []):
category = ann.get("category", "default")
if category not in category_map:
cat_id = len(category_map)
category_map[category] = cat_id
coco_data["categories"].append({
"id": cat_id,
"name": category
})
coco_data["annotations"].append({
"id": annotation_id,
"image_id": image_id,
"category_id": category_map[category],
"bbox": ann.get("bbox", []),
"area": ann.get("area", 0),
"iscrowd": 0
})
annotation_id += 1
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
return {
"format": "coco",
"exported_count": len(coco_data["images"]),
"annotation_count": len(coco_data["annotations"]),
"data": coco_data
}
Example: Multi-Format Exporter with Steps
A flexible exporter supporting multiple output formats.
from enum import Enum
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry
class OutputFormat(str, Enum):
COCO = "coco"
YOLO = "yolo"
CSV = "csv"
class MultiFormatParams(BaseModel):
project_id: int
output_format: OutputFormat = OutputFormat.COCO
output_dir: str = Field("/tmp/export", description="Output directory")
class FetchStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch"
@property
def progress_weight(self) -> float:
return 0.3
def execute(self, context: ExportContext) -> StepResult:
project_id = context.params["project_id"]
context.results, context.total_count = context.client.get_assignments({
"project": project_id
})
return StepResult(success=True)
class ConvertStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert"
@property
def progress_weight(self) -> float:
return 0.5
def execute(self, context: ExportContext) -> StepResult:
output_format = context.params["output_format"]
if output_format == OutputFormat.COCO:
converted = self._to_coco(context.results)
elif output_format == OutputFormat.YOLO:
converted = self._to_yolo(context.results)
else:
converted = self._to_csv(context.results)
context.exported_count = context.total_count
return StepResult(success=True, data={"output": converted})
def _to_coco(self, results) -> dict:
# COCO conversion logic
return {"format": "coco", "images": [], "annotations": []}
def _to_yolo(self, results) -> list:
# YOLO conversion logic
return []
def _to_csv(self, results) -> str:
# CSV conversion logic
return "id,label,x,y,width,height\n"
class SaveStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save"
@property
def progress_weight(self) -> float:
return 0.2
def execute(self, context: ExportContext) -> StepResult:
from pathlib import Path
import json
output_dir = Path(context.params["output_dir"])
output_dir.mkdir(parents=True, exist_ok=True)
output_format = context.params["output_format"]
output_data = context.step_results[-1].data["output"]
if output_format in (OutputFormat.COCO,):
output_path = output_dir / "annotations.json"
output_path.write_text(json.dumps(output_data, indent=2))
else:
output_path = output_dir / f"output.{output_format}"
output_path.write_text(str(output_data))
context.output_path = str(output_path)
return StepResult(success=True)
class MultiFormatExportAction(BaseExportAction[MultiFormatParams]):
action_name = "multi_format_export"
params_model = MultiFormatParams
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchStep())
registry.register(ConvertStep())
registry.register(SaveStep())
Best Practices
Memory-Efficient Processing
Use generators for large datasets to avoid loading everything into memory.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
# Return a generator, not a list
results = self.client.get_assignments_stream(filters)
count = self.client.count_assignments(filters)
return results, count
def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(self.params.filters)
for i, item in enumerate(results, 1): # Iterate without loading all
self._process_item(item)
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
Progress Reporting
Report progress at meaningful intervals, not on every item.
def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(filters)
report_interval = max(1, total // 100) # Report every 1%
for i, item in enumerate(results, 1):
self._process(item)
if i % report_interval == 0 or i == total:
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
Error Recovery
Handle partial failures gracefully in step-based workflows.
class ConvertStep(BaseStep[ExportContext]):
def execute(self, context: ExportContext) -> StepResult:
converted = []
errors = []
for item in context.results:
try:
converted.append(self._convert(item))
except ValueError as e:
errors.append(f"Item {item['id']}: {e}")
context.exported_count = len(converted)
if errors:
context.errors.extend(errors)
# Continue with partial results
return StepResult(
success=True,
data={"converted": converted, "errors": errors}
)
return StepResult(success=True, data={"converted": converted})
Warning: Always validate output format compliance before returning results. Invalid exports can cause downstream failures in ML pipelines.
Template-Based Export with BaseExporter
For plugin developers who want a simpler, template-based approach to building exporters, BaseExporter provides a familiar interface with pre-built file handling utilities.
Overview
BaseExporter is designed for export plugins that need:
- Original file downloading and saving
- JSON data export with error tracking
- Progress and metrics reporting
- Customizable data conversion pipeline
BaseExporter Class
from synapse_sdk.plugins.actions.export import BaseExporter
class BaseExporter:
def __init__(
self,
ctx: RuntimeContext,
export_items: Generator,
path_root: Path,
**params
):
self.ctx = ctx
self.export_items = export_items
self.path_root = Path(path_root)
self.params = params
self.run = ExporterRunAdapter(ctx.logger) # Legacy compatibility
Template Methods
Override these methods to customize export behavior:
| Method | Description |
|---|---|
convert_data(data) | Transform data during export |
before_convert(data) | Pre-process data before conversion |
after_convert(data) | Post-process data after conversion |
save_original_file(result, base_path, error_list) | Save original files |
save_as_json(result, base_path, error_list) | Save data as JSON |
setup_output_directories(path, save_original) | Customize directory structure |
process_file_saving(...) | Custom file saving logic |
additional_file_saving(path) | Post-export file operations |
ExporterRunAdapter
The run attribute provides logging and progress tracking methods:
# Log messages
self.run.log_message("Processing item...")
# Track progress
self.run.set_progress(current, total, step="dataset_conversion")
# Log metrics
record = self.run.MetricsRecord(stand_by=100, success=0, failed=0)
self.run.log_metrics(record, category="original_file")
# Log file export status
self.run.export_log_original_file(item_id, file_info, ExportStatus.SUCCESS, "")
self.run.export_log_data_file(item_id, file_info, ExportStatus.SUCCESS, "")
MetricsRecord
Track export progress with MetricsRecord:
from synapse_sdk.plugins.actions.export import MetricsRecord
record = MetricsRecord(stand_by=100, success=0, failed=0)
# Update as items are processed
record.stand_by -= 1
record.success += 1
# Log metrics
self.run.log_metrics(record, category="data_file")
# Convert to dict
print(record.to_dict()) # {'stand_by': 99, 'success': 1, 'failed': 0}
ExportStatus Enum
Track file export status:
from synapse_sdk.plugins.actions.export import ExportStatus
ExportStatus.SUCCESS # 'success'
ExportStatus.FAILED # 'failed'
ExportStatus.STAND_BY # 'stand_by'
Example: Custom Exporter Plugin
A complete example using BaseExporter with ExportAction:
from pathlib import Path
from typing import Any, Generator
from synapse_sdk.plugins.actions.export import BaseExporter, ExportAction
class MyExporter(BaseExporter):
"""Custom exporter with data transformation."""
def convert_data(self, data: dict[str, Any]) -> dict[str, Any]:
"""Transform annotation data to custom format."""
return {
"id": data.get("id"),
"filename": data.get("files", {}).get("file_name_original"),
"annotations": self._transform_annotations(data.get("data", {}))
}
def _transform_annotations(self, data: dict) -> list:
# Custom transformation logic
return data.get("annotations", [])
def additional_file_saving(self, unique_export_path: Path) -> None:
"""Save metadata file after export."""
import json
metadata = {
"export_name": self.params.get("name"),
"total_items": self.params.get("count"),
"format_version": "1.0"
}
with (unique_export_path / "metadata.json").open("w") as f:
json.dump(metadata, f, indent=2)
class MyExportAction(ExportAction):
"""Export action using custom exporter."""
action_name = "my_export"
@property
def entrypoint(self):
return MyExporter
Plugin Configuration
Configure the action in config.yaml:
name: my_export_plugin
code: my_export_plugin
version: 1.0.0
category: export
actions:
export:
entrypoint: plugin.export.MyExportAction
annotation_types:
- image
- video
Running Locally
Test your export plugin:
synapse plugin run --mode local export --params '{
"storage": 1,
"name": "My Export",
"save_original_file": true,
"path": "exports/my-export",
"target": "task",
"filter": {"project": 123},
"extra_params": {}
}'
Target Handlers
ExportAction uses target handlers to fetch data from different sources.
TargetHandlerFactory
from synapse_sdk.plugins.actions.export import TargetHandlerFactory
# Get handler for target type
handler = TargetHandlerFactory.get_handler("assignment")
handler = TargetHandlerFactory.get_handler("ground_truth")
handler = TargetHandlerFactory.get_handler("task")
Available Handlers
| Handler | Target | Description |
|---|---|---|
AssignmentExportTargetHandler | assignment | Export annotation assignments |
GroundTruthExportTargetHandler | ground_truth | Export ground truth datasets |
TaskExportTargetHandler | task | Export task data |
Custom Target Handler
Implement ExportTargetHandler for custom data sources:
from synapse_sdk.plugins.actions.export import ExportTargetHandler
class CustomTargetHandler(ExportTargetHandler):
def get_results(self, client, filters: dict) -> tuple[Any, int]:
# Fetch data from custom source
results = client.custom_api_call(filters)
return iter(results), len(results)
def validate_filter(self, filters: dict, client) -> dict:
# Validate filter parameters
if "required_field" not in filters:
raise ValueError("required_field is required")
return filters
def get_export_item(self, results) -> Generator:
# Transform results for export
for item in results:
yield self._transform(item)
Related
- Defining Actions - Action definition basics
- RuntimeContext - Context API reference
- Steps Workflow - Step-based workflow guide (fixed step registry with head/tail for export delegation)
- Runtime v2 Client Wiring -
plugin_exports/async_jobsresources used by the delegated path - Dataset Conversion - Format conversion utilities
- Export Error Handling & Failure Classification -
ExportLogMessageCodereference + v2error.codecatalog (SYN-6919) - v2 Export Migration - v2 2-phase export contract (list → bulk-fetch) + keyset/presigned/retry resilience