Export Actions
Export action 은 어노테이션 데이터를 외부에서 사용할 수 있는 다양한 포맷으로 변환합니다. BaseExportAction 을 사용해 assignment, ground truth 데이터셋, task 를 COCO, YOLO 또는 커스텀 스키마 같은 포맷으로 변환하는 커스텀 exporter 를 구축하세요.
개요
Export action 은 다음을 구조화된 방식으로 제공합니다:
- backend 에서 필터링된 어노테이션 결과 조회
- 데이터를 타깃 포맷 (COCO, YOLO, Pascal VOC, CSV 등) 으로 변환
- 내장 progress 리포팅으로 export 진행률 추적
- generator 를 사용한 대용량 데이터셋의 효율적 처리
한눈에 보기
| 속성 | 값 |
|---|---|
| Base Class | BaseExportAction[P] |
| Category | PluginCategory.EXPORT |
| Progress Step | DATASET_CONVERSION |
| Context | ExportContext |
| 실행 모드 | Simple / Step-Based |
| 대용량 경로 | 단일 고정 9-step registry — in-process 또는 server-side 위임 acquisition head + 공유 transform tail (자동, 임계 게이팅 — 서버사이드 위임 참고) |
BaseExportAction
BaseExportAction 은 BaseAction 을 확장해 export 전용 기능을 제공합니다. backend client 접근과 progress 추적을 기본 제공합니다.
from synapse_sdk.plugins.action import BaseAction
from synapse_sdk.plugins.enums import PluginCategory
class BaseExportAction(BaseAction[P]):
category = PluginCategory.EXPORT
제네릭 타입 P 는 params 모델을 의미하며, BaseModel 을 상속해야 합니다.
Progress 카테고리
Export action 은 표준화된 progress 추적을 지원합니다:
| 카테고리 | 상수 | 설명 |
|---|---|---|
| Dataset Conversion | DATASET_CONVERSION | 데이터 변환 및 파일 생성 단계 |
# 변환 진행률 추적
self.set_progress(current, total, self.progress.DATASET_CONVERSION)
주요 메서드
client 프로퍼티
API 호출을 위한 backend client 에 접근합니다. context 에 client 가 없으면 RuntimeError 를 발생시킵니다.
# backend client 접근
assignments = self.client.get_assignments(filters)
알아두기: backend client 가 필요 없다면
get_filtered_results()를 오버라이드해 대체 소스에서 데이터를 가져오세요.
get_filtered_results()
이 추상 메서드를 오버라이드해 export 할 데이터를 가져옵니다. (results_iterator, total_count) 튜플을 반환합니다.
def get_filtered_results(self, filters: dict[str, Any]) -> tuple[Any, int]:
"""Fetch filtered results for export.
Args:
filters: Filter criteria dict.
Returns:
Tuple of (results_iterator, total_count).
"""
return self.client.get_assignments(filters)
setup_steps()
step 기반 실행을 위한 워크플로우 step 을 등록합니다. step 이 등록되면 execute() 보다 step 기반 실행이 우선합니다.
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())
create_context()
step 기반 워크플로우를 위한 export context 를 생성합니다. context 초기화를 커스터마이즈하려면 오버라이드하세요.
def create_context(self) -> ExportContext:
params_dict = self.params.model_dump()
return ExportContext(
runtime_ctx=self.ctx,
params=params_dict,
)
ExportContext
ExportContext 는 워크플로우 step 사이의 공유 상태를 운반합니다. BaseStepContext 를 확장해 export 전용 필드를 추가합니다.
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from synapse_sdk.plugins.steps import BaseStepContext
if TYPE_CHECKING:
from synapse_sdk.clients.backend import BackendClient
@dataclass
class ExportContext(BaseStepContext):
# Input parameters
params: dict[str, Any] = field(default_factory=dict)
# Processing state (populated by steps)
results: Any | None = None
total_count: int = 0
exported_count: int = 0
output_path: str | None = None
@property
def client(self) -> BackendClient:
"""Backend client from runtime context."""
if self.runtime_ctx.client is None:
raise RuntimeError('No client in runtime context')
return self.runtime_ctx.client
필드
| 필드 | 타입 | 설명 |
|---|---|---|
params | dict[str, Any] | action 에서 전달된 export 파라미터 |
results | Any | None | 가져온 데이터 (step 이 채움) |
total_count | int | export 대상 전체 항목 수 |
exported_count | int | 성공적으로 export 된 항목 수 |
output_path | str | None | 출력 파일/디렉토리 경로 |
프로퍼티
| 프로퍼티 | 타입 | 설명 |
|---|---|---|
client | BackendClient | runtime context 의 backend client. client 가 없으면 RuntimeError 발생. |
상속 필드
BaseStepContext 로부터:
| 필드 | 타입 | 설명 |
|---|---|---|
runtime_ctx | RuntimeContext | logger, client, env 를 갖는 runtime context |
step_results | list[StepResult] | 완료된 step 의 결과 |
errors | list[str] | 누적된 에러 메시지 |
current_step | str | None | 현재 실행 중인 step 이름 |
StepResult
StepResult 는 워크플로우 step 실행 결과를 나타내는 dataclass 입니다.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class StepResult:
success: bool = True
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
rollback_data: dict[str, Any] = field(default_factory=dict)
skipped: bool = False
timestamp: datetime = field(default_factory=datetime.now)
| 필드 | 타입 | 설명 |
|---|---|---|
success | bool | step 성공 여부 (기본값: True) |
data | dict[str, Any] | step 의 출력 데이터 |
error | str | None | step 실패 시 에러 메시지 |
rollback_data | dict[str, Any] | 실패 시 rollback 에 필요한 데이터 |
skipped | bool | step skip 여부 (기본값: False) |
timestamp | datetime | step 완료 시각 |
실행 모드
복잡도에 따라 두 가지 실행 모드 중 선택하세요.
Simple 모드
간단한 export 로직은 execute() 를 직접 오버라이드합니다.
from typing import Any
from pydantic import BaseModel
from synapse_sdk.plugins.actions.export import BaseExportAction
class ExportParams(BaseModel):
project_id: int
output_format: str = "coco"
class SimpleExportAction(BaseExportAction[ExportParams]):
action_name = "simple_export"
params_model = ExportParams
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)
def execute(self) -> dict[str, Any]:
# Fetch data
results, total = self.get_filtered_results({
"project": self.params.project_id
})
# Initialize progress
self.set_progress(0, total, self.progress.DATASET_CONVERSION)
# Process items
exported = []
for i, item in enumerate(results, 1):
exported.append(self._convert_item(item))
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
return {
"format": self.params.output_format,
"exported_count": len(exported),
"data": exported
}
def _convert_item(self, item: dict) -> dict:
# Conversion logic here
return item
팁: 복잡한 에러 복구가 필요 없고 export 로직이 단일 메서드에 들어맞을 때 Simple 모드를 사용하세요.
Step 기반 모드
여러 단계, progress 추적, 실패 시 자동 rollback 이 필요한 복잡한 워크플로우에는 setup_steps() 를 사용합니다.
from pathlib import Path
from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry
# Define workflow steps
class FetchDataStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch_data"
@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress
def execute(self, context: ExportContext) -> StepResult:
filters = context.params.get("filters", {})
context.results, context.total_count = context.client.get_assignments(filters)
return StepResult(success=True)
class ConvertFormatStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert_format"
@property
def progress_weight(self) -> float:
return 0.6 # 60% of total progress
def execute(self, context: ExportContext) -> StepResult:
converted = []
for item in context.results:
converted.append(self._convert(item))
context.exported_count = len(converted)
return StepResult(success=True, data={"converted": converted})
def _convert(self, item: dict) -> dict:
return item
class SaveOutputStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save_output"
@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress
def execute(self, context: ExportContext) -> StepResult:
context.output_path = "/tmp/export_output.json"
return StepResult(success=True)
def rollback(self, context: ExportContext, result: StepResult) -> None:
# Clean up output file on failure
if context.output_path:
Path(context.output_path).unlink(missing_ok=True)
# Register steps in action
class StepBasedExportAction(BaseExportAction[ExportParams]):
action_name = "step_export"
params_model = ExportParams
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())
Step 기반 모드의 이점:
- 자동 progress: step weight 로부터 진행률이 계산됨
- 자동 rollback: 실패 시
rollback()이 역순으로 호출됨 - 재사용 가능한 step: 여러 export action 에서 step 공유
- 테스트 가능한 단위: 각 step 을 독립적으로 테스트
Export 타깃
Export action 은 세 가지 데이터 타깃을 지원합니다.
Assignment
어노테이션 작업 (labeling 결과, review 등) 을 export 합니다.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)
Ground Truth
ML 학습을 위해 큐레이션된 ground truth 데이터셋을 export 합니다.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
events = self.client.list_ground_truth_events(
params=filters,
list_all=True
)
return events, len(events)
Task
task 메타데이터와 설정을 export 합니다.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_tasks(filters)
대량 데이터 무중단 export (Large-volume resilient export)
Export 는 한 프로젝트/데이터셋의 전체 task / assignment / ground-truth 를 빠짐없이 한 번 회수해야 한다. 데이터가 커질수록 단순 REST 순회는 (1) HTTP 왕복 수 증가, (2) cursor 열화, (3) payload 의 API 티어 통과, (4) 일시적 네트워크/DB 장애 취약 — 한 번의 페이지 실패로 export 전체가 중단된다. v2 export 는 이를 4가지 기법으로 완화한다.
1. 2-phase: list(slim) → bulk-fetch
slim list 로 id 만 모은 뒤(_collect_then_bulk 헬퍼) chunk(≤200) 단위 bulk-fetch 로 heavy payload 를 회수한다. list 응답에 무거운 data 를 싣지 않아 page 비용이 작다.
2. 유니크 keyset cursor — offset 열화 제거
list 정렬을 비유니크 -created 가 아니라 유니크 PK keyset (sort='-id') 으로 한다. DRF CursorPagination 은 cursor position 이 단일 필드라, 비유니크 정렬에서 동일 값 군집을 offset 으로 처리 → 깊은 페이지가 점진적으로 느려진다. 유니크 키로 정렬하면 매 페이지가 인덱스 seek (WHERE project=X AND id < cursor ORDER BY id DESC LIMIT N) 로 일정 비용을 유지한다. backend 에 (project, id) 복합 인덱스가 동반된다(synapse-backend).
- Task / Assignment:
sort='-id'적용. - GroundTruth:
ground_truth_events가 이미-pgh_id(event PK) keyset 이라 변경 불요.
3. presigned bulk — payload 를 API 가 아닌 object storage 에서
bulk 호출을 response_mode='presigned' 로 보내면 외재화된 data 가 presigned URL wrapper 로 돌아온다. 핸들러는 _resolve_data_payload() 로 이를 감지해 synapse_sdk.utils.file.download.download_json(presigned_url) 로 S3 에서 직접 실제 payload 를 받는다.
- 대용량
data가 API 티어를 통과하지 않고, inline 한도(*_INLINE_MAX_BYTES) 로 잘리지 않는다. - 외재화 안 된 row 는 inline raw dict 로 폴백 — 판별은
data.get('mode') in {'presigned','inline'}보수 가드. forward-compatible.
4. bounded retry — 일시적 실패에 무중단
Export list paginator pass-through 의 retry_on_status 에 502/503/504 에 더해 500 을 포함하고 page_retries=3 를 보장한다(EXPORT_LIST_RETRY_ON_STATUS). 느린 쿼리 중 DB 연결 끊김성 500 / 게이트웨이 일시 오류에 export 가 죽지 않고 페이지 단위로 재시도한다. bulk-fetch 사이에는 throttle_seconds 로 back-pressure 를 유지한다(Task 0.2s 등). 전역 DEFAULT_RETRY_ON_STATUS 는 불변 — export 한정 정책이다.
page_retries / retry_on_status 는 export list paginator pass-through 로 전달된다(v2 transport 기본값에 더해 export 한정 정책). 다른 v2 consumer 동작은 영향받지 않는다.
한계와 향후 작업 (TODO)
위 기법들은 REST 를 유지한 점진적 개선으로 헤드룸을 확보하지만, 행 단위로 REST 를 통해 끌어오는 방식 자체는 요청 분량에 선형 비례하는 구조적 한계가 남는다. 근본 확장 해법은 별도 작업(익스포트 안정성/속도 개선)으로 추적한다:
- (A안) 서버사이드 비동기 export-to-storage — backend AsyncJob 이 서버 keyset 스캔 한 번으로 전체를 raw bundle(JSONL + files + manifest)로 storage 에 기록 → SDK 는 enqueue + 진행률 스트리밍 후 bundle 을 읽어 공유 transform tail(ConvertData → SaveFiles → Finalize) 로 플러그인 산출물을 만든다. N 회 왕복 / 클라 타임아웃 / 연결 끊김 취약성 제거. 임계 초과 시 투명 위임으로 SYN-7104 P2 에서 1차 도입됨 — 아래 대량 export 의 서버사이드 위임 참고. (확장성 최종 해법)
- (B안) 스트리밍 응답(NDJSON) — backend
StreamingHttpResponse+ 서버 keyset 단일 커서로 한 connection 에 전량 스트리밍 (중간 옵션). - (C안) GroundTruthDatasetVersion 스냅샷 사전 생성 — 불변 GTDV 의 export 아티팩트를 version release 시점에 미리 S3 에 생성 → train/export 는 다운로드만.
- presigned 다운로드 병렬화 — 현재
_resolve_data_payload는 per-item 직렬. 외재화 본격 도입 시 chunk 병렬 다운로드로 전환. - export read timeout /
page_retries/throttle_seconds를 plugin config 노출 — 현재 핸들러 상수(EXPORT_*).
대량 export 의 서버사이드 위임 (Backend async-job)
[SYN-7182] 이제 모든 export 가 수량과 무관하게 서버사이드로 위임된다. 아래 이 절의 "count > threshold 초과 시에만 위임" 서술은 과거(임계 기반) 동작이며, 임계(export_async_job_size_threshold / SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD / ExportParams.async_job_size_threshold)는 deprecated — no-op 다. 위임 게이트는 이제 ① 위임 enabled, ② v2_client 배선, ③ project_id 해소 세 가지만 본다(건수 무관). count 는 분할 fan-out·progress 용으로 여전히 프로브되나 결정에 영향을 주지 않는다.
비-delegate(in-process) 경로는 deprecated 다(향후 릴리즈에서 제거 예정). v2 list 표면이 선택 필터 ids/ids_ex/search 를 떨궈(전역 PrimaryKeyFilterBackend/SearchFilter 누락) 선택/검색 export 시 프로젝트 전량을 silent over-export 하는 정확성 결함이 있다. 따라서 in-process 는 위임 불가 상황(v2_client 미배선 / kill-switch export_disable_async_delegate / project_id 미해소 — 예: ground_truth target)에서만 폴백으로만 동작하며, 진입 시 DeprecationWarning + 워커 로그 경고를 사유와 함께 발생시킨다. 권장: 위임을 활성 상태로 유지하라.
위의 v2 2-phase 경로(list → bulk-fetch)는 SDK(ray job)가 v2 REST 로 데이터를 끌어와 in-process 로 변환한다. 10만 건 이상 + 원본 파일이 큰 export 에서는 이 끌어오기 자체가 비용·취약성의 핵심이다. SYN-7104 P2 는 이를 한 단계 더 끌어올려, 대상 건수가 임계를 넘으면 데이터 취득(acquisition)을 backend async-job 에 위임(서버사이드 keyset 스캔 → raw bundle 기록)한다. 단, 위임은 데이터를 어떻게 취득하는가의 차이일 뿐이며, 위임/비위임 양쪽 모두 동일한 공유 transform tail 을 거쳐 동일한 플러그인 산출물을 만든다(투명 위임 · FR-8 parity).
이 위임은 기존 v2 2-phase 경로 위에 얹히는 추가 레이어다 — 임계 미만 export 는 종전과 동일하게 in-process head(list → bulk-fetch)로 데이터를 취득하고, 임계 초과 export 는 delegated head 로 취득한다. 어느 경로든 취득 후에는 같은 공유 tail(ConvertData → SaveFiles → Finalize) 이 실행되므로 플러그인 커스텀 로직이 양쪽에서 동일하게 적용된다.
위임 게이트는 **기본 활성(ON)**이다. 위임은 데이터 취득 경로만 서버사이드로 바꿀 뿐, 산출물은 비위임 경로와 동일한 플러그인 포맷이다 — 위임 경로도 비위임과 같은 공유 transform tail(ConvertData → SaveFiles) 을 거치기 때문이다(아래 투명 acquisition 과 공유 tail 참고). backend 가 쓰는 raw bundle 은 위임 head 의 중간 산출물이며, SDK 가 이를 읽어(BundleRead) tail 로 전달한다. 즉시 in-process 취득으로 되돌리려면 아래 kill-switch 를 사용한다. 신규 위임 경로는 프로덕션 반영 전 staging 검증을 권장한다.
위임 게이트와 임계
setup_steps() 는 위임 여부로 분기하지 않는다 — 항상 단일 고정 9-step registry 를 등록한다(아래 단일 고정 step registry 참고). 위임 결정(_resolve_delegation_decision())은 run() 시점에 한 번 평가되어 context.delegation_decision 에 seed 되고, 각 head step 의 can_skip 이 이를 참조해 비활성 head 를 완료-pass 한다. 위임은 아래 조건을 모두 만족할 때만 발동한다.
| 조건 | 설명 |
|---|---|
| 위임 enabled | kill-switch 가 꺼져 있어야 한다(기본 ON). |
v2_client 존재 | ctx.v2_client 가 wiring 되어 있어야 한다(Runtime v2 Client 참고). |
count > threshold | [SYN-7182 deprecated] 더 이상 게이트가 아니다 — 수량 무관하게 위임한다. |
project_id 확보 | 위임 enqueue 에 필요한 프로젝트 식별자가 있어야 한다(없으면 deprecated in-process 폴백 — 예: ground_truth). |
건수 산정은 풀스캔 없이 v1 단일페이지 count 프로브로 한다 — 첫 페이지의 count 만 읽고, count 를 못 구하면(프로브 실패) 비위임으로 폴백한다. 임계 기본값은 1,000이며 다음으로 오버라이드한다.
| 항목 | ExportParams 필드 | extra_params / config 키 | env |
|---|---|---|---|
| 임계 override | async_job_size_threshold | export_async_job_size_threshold | SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD |
| kill-switch (즉시 in-process 복귀) | disable_async_delegate | export_disable_async_delegate | SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1 |
우선순위(높음 → 낮음): ExportParams 명시 필드 > params.extra_params 의 동일 키 > 액션 config > env > 모듈 기본(1,000). ExportParams.async_job_size_threshold / disable_async_delegate 는 모두 optional 이며 기본 None — 미지정 시 종전대로 env/기본값으로 폴백한다(BC). 즉 export config 만으로 env 변경 없이 위임 임계를 조정하거나 위임을 끌 수 있다.
SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1(env) · config export_disable_async_delegate · ExportParams.disable_async_delegate=True 중 어느 하나라도 켜면 게이트가 즉시 닫히고, 모든 export 가 임계와 무관하게 기존 in-process 경로로 처리된다. 인시던트 시 PR revert 없이 롤백할 수 있다(v2 export 의 SYNAPSE_FORCE_V1_EXPORT kill-switch 와 동일한 운영 패턴). 운영자 env kill-switch 가 켜져 있으면 export config 로는 재활성할 수 없다(env 우선) — config/필드는 위임을 끌 수만 있고 다시 켜지는 못한다.
단일 고정 step registry (9-step)
setup_steps() 는 위임 여부로 분기하지 않고 항상 동일한 9개 step 을 등록한다. 종전의 "분기형 registry(위임 3-step OR 비위임 6-step)" 모델은 모드에 따라 job 의 step 목록 자체가 달라져, 긴 acquisition 이 전체 진행률을 정체시켰다. 단일 고정 registry 는 이를 제거한다 — registry 는 항상 9-step 이고, 위임 결정에 따라 비활성 head 의 각 step 만 can_skip 으로 완료-pass(skip → 100%, orchestrator 가 보고) 된다.
9-step 은 두 acquisition head + 공유 tail 로 구성된다:
- In-process head:
Initialize → FetchResults → PrepareExport— v2 list → bulk-fetch 로 데이터 취득. - Delegated head:
DelegateEnqueue → DelegateStreamProgress → BundleRead— backend async-job enqueue → SSE 진행률 → raw bundle 읽기. - Shared tail (plugin transform):
ConvertData → SaveFiles → Finalize— 두 head 가 공유하는 동일 step 클래스. 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행된다(FR-8 투명성).
고정 9-step — 비활성 head 의 step 은 can_skip(=is_delegated(context) 또는 그 역)으로 완료-pass:
| 영역 | Step | 역할 | 위임 시 | 비위임 시 |
|---|---|---|---|---|
| in-process head | Initialize | storage/path 초기화. | skip(pass) | 실행 |
| in-process head | FetchResults | v2 list → bulk-fetch 로 결과 회수. | skip(pass) | 실행 |
| in-process head | PrepareExport | export params build · project config. | skip(pass) | 실행 |
| delegated head | DelegateEnqueue | plugin_exports.create 로 export job enqueue. | 실행 | skip(pass) |
| delegated head | DelegateStreamProgress | async_jobs.stream_progress (SSE) 진행률 수신. | 실행 | skip(pass) |
| delegated head | BundleRead | raw bundle(JSONL+files+manifest) 읽기 + url→bundle_path rewrite. | 실행 | skip(pass) |
| shared tail | ConvertData | plugin 커스텀 transform. | 실행 | 실행 |
| shared tail | SaveFiles | 로컬 파일 저장. | 실행 | 실행 |
| shared tail | Finalize | 마무리 · cleanup. | 실행 | 실행 |
투명 acquisition 과 공유 tail
위임/비위임은 "데이터를 어떻게 취득하는가"의 차이일 뿐이며 산출물은 동일하다(FR-8 parity). 핵심은 공유 transform tail 이다 — 플러그인이 template 기반으로 override 하는 convert_data 등 커스텀 처리는 ConvertData → SaveFiles → Finalize 에서 실행되며, 이 tail 은 위임/비위임 양쪽 모두 거친다. 따라서 위임 export 도 플러그인 커스텀 로직이 그대로 적용된다(이것이 분기형(Option A)이 아니라 통합 파이프라인(Option B)을 채택한 이유다).
| 비위임 (in-process head) | 위임 (delegated head) | |
|---|---|---|
| 데이터 취득 위치 | SDK(ray job) — v2 list → bulk-fetch | backend async-job — 서버 keyset 스캔 → raw bundle |
| 취득 중간 산출물 | in-process iterator | raw bundle(JSONL + files + manifest), SDK 가 BundleRead 로 읽음 |
ConvertData (plugin transform) | 적용됨 | 적용됨 (공유 tail) |
SaveFiles (로컬 파일 저장) | 적용됨 | 적용됨 (공유 tail) |
| 최종 산출물 | plugin 포맷 | 동일한 plugin 포맷 |
bundle-read 계약
위임 시 backend 는 {prefix}/{tasks|assignments|ground_truths}.jsonl 를 쓰고, 각 라인은 {id, data, files_manifest: {spec: {file_name_original, url, bundle_path}}} 형태다. SDK 의 BundleRead step 은 map_files_to_bundle_local 로 각 항목의 url 을 {prefix}/{bundle_path} 로 rewrite(traversal-guard 적용)해 로컬 bundle 을 가리키게 하고, 출력 파일명은 file_name_original 을 유지한다. rewrite 된 항목은 공유 tail 의 ConvertData/SaveFiles 로 전달되어 비위임과 동일하게 처리된다.
관측성
delegated head step 도 in-process head 와 동등한 관측성을 남긴다 — 즉 위임이라고 해서 진행률·메트릭·이벤트 로그가 비는 일은 없다. (취득 이후의 공유 tail 관측성은 위임/비위임 공통이다.)
| 신호 | API | 비고 |
|---|---|---|
| 진행률 | set_progress(..., step='server_export') | step 이름은 server_export. |
| 메트릭 | set_metrics(...) | success / failed. |
| job-log 이벤트 | export_info / export_completed / export_failed | |
| user-facing 메시지 | log_message(...) | EXPORT_DELEGATED_TO_SERVER / EXPORT_DELEGATED_COMPLETED. |
역할 분담: backend 는 progress(processed/total)만 SSE 로 흘리고, event / metric / user-facing 메시지는 SDK 가 전담한다. 따라서 DelegateStreamProgress 는 SSE 진행률을 set_progress 로 중계하고, enqueue / bundle-read step 이 이벤트·메트릭·메시지를 마무리한다.
v2 client resource
delegated head 는 BackendV2Client 의 다음 resource 를 사용한다(상세 wiring 은 Runtime v2 Client 참고).
| Resource | 용도 |
|---|---|
v2_client.plugin_exports.create(...) | export job enqueue (DelegateEnqueue). |
v2_client.async_jobs.stream_progress(job_id) | SSE 진행률 스트림 (DelegateStreamProgress). |
v2_client.async_jobs.retrieve(job_id) | manifest → bundle prefix 확정, BundleRead 가 raw bundle 을 읽어 tail 로 전달. |
예제
예제: COCO 포맷 Exporter
어노테이션을 COCO 포맷으로 export 하는 완전한 예제입니다.
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction
from typing import Any
class CocoExportParams(BaseModel):
project_id: int = Field(..., description="Project ID to export")
include_images: bool = Field(True, description="Include image metadata")
class CocoExportAction(BaseExportAction[CocoExportParams]):
action_name = "coco_export"
params_model = CocoExportParams
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)
def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results({
"project": self.params.project_id
})
self.set_progress(0, total, self.progress.DATASET_CONVERSION)
coco_data = {
"info": {"description": "Exported from Synapse"},
"images": [],
"annotations": [],
"categories": []
}
annotation_id = 0
category_map = {}
for i, assignment in enumerate(results, 1):
# Add image
data_unit = assignment.get("data_unit", {})
image_id = data_unit.get("id")
if self.params.include_images:
coco_data["images"].append({
"id": image_id,
"file_name": data_unit.get("name", ""),
"width": data_unit.get("width", 0),
"height": data_unit.get("height", 0)
})
# Add annotations
for ann in assignment.get("data", {}).get("annotations", []):
category = ann.get("category", "default")
if category not in category_map:
cat_id = len(category_map)
category_map[category] = cat_id
coco_data["categories"].append({
"id": cat_id,
"name": category
})
coco_data["annotations"].append({
"id": annotation_id,
"image_id": image_id,
"category_id": category_map[category],
"bbox": ann.get("bbox", []),
"area": ann.get("area", 0),
"iscrowd": 0
})
annotation_id += 1
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
return {
"format": "coco",
"exported_count": len(coco_data["images"]),
"annotation_count": len(coco_data["annotations"]),
"data": coco_data
}
예제: Step 을 사용한 다중 포맷 Exporter
여러 출력 포맷을 지원하는 유연한 exporter 입니다.
from enum import Enum
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry
class OutputFormat(str, Enum):
COCO = "coco"
YOLO = "yolo"
CSV = "csv"
class MultiFormatParams(BaseModel):
project_id: int
output_format: OutputFormat = OutputFormat.COCO
output_dir: str = Field("/tmp/export", description="Output directory")
class FetchStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch"
@property
def progress_weight(self) -> float:
return 0.3
def execute(self, context: ExportContext) -> StepResult:
project_id = context.params["project_id"]
context.results, context.total_count = context.client.get_assignments({
"project": project_id
})
return StepResult(success=True)
class ConvertStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert"
@property
def progress_weight(self) -> float:
return 0.5
def execute(self, context: ExportContext) -> StepResult:
output_format = context.params["output_format"]
if output_format == OutputFormat.COCO:
converted = self._to_coco(context.results)
elif output_format == OutputFormat.YOLO:
converted = self._to_yolo(context.results)
else:
converted = self._to_csv(context.results)
context.exported_count = context.total_count
return StepResult(success=True, data={"output": converted})
def _to_coco(self, results) -> dict:
# COCO conversion logic
return {"format": "coco", "images": [], "annotations": []}
def _to_yolo(self, results) -> list:
# YOLO conversion logic
return []
def _to_csv(self, results) -> str:
# CSV conversion logic
return "id,label,x,y,width,height\n"
class SaveStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save"
@property
def progress_weight(self) -> float:
return 0.2
def execute(self, context: ExportContext) -> StepResult:
from pathlib import Path
import json
output_dir = Path(context.params["output_dir"])
output_dir.mkdir(parents=True, exist_ok=True)
output_format = context.params["output_format"]
output_data = context.step_results[-1].data["output"]
if output_format in (OutputFormat.COCO,):
output_path = output_dir / "annotations.json"
output_path.write_text(json.dumps(output_data, indent=2))
else:
output_path = output_dir / f"output.{output_format}"
output_path.write_text(str(output_data))
context.output_path = str(output_path)
return StepResult(success=True)
class MultiFormatExportAction(BaseExportAction[MultiFormatParams]):
action_name = "multi_format_export"
params_model = MultiFormatParams
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchStep())
registry.register(ConvertStep())
registry.register(SaveStep())
모범 사례
메모리 효율적 처리
대용량 데이터셋은 모든 것을 메모리에 올리지 않도록 generator 를 사용하세요.
def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
# Return a generator, not a list
results = self.client.get_assignments_stream(filters)
count = self.client.count_assignments(filters)
return results, count
def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(self.params.filters)
for i, item in enumerate(results, 1): # Iterate without loading all
self._process_item(item)
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
Progress 리포팅
매 항목마다가 아니라 의미 있는 간격으로 progress 를 리포팅하세요.
def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(filters)
report_interval = max(1, total // 100) # Report every 1%
for i, item in enumerate(results, 1):
self._process(item)
if i % report_interval == 0 or i == total:
self.set_progress(i, total, self.progress.DATASET_CONVERSION)
에러 복구
step 기반 워크플로우에서 부분 실패를 우아하게 처리하세요.
class ConvertStep(BaseStep[ExportContext]):
def execute(self, context: ExportContext) -> StepResult:
converted = []
errors = []
for item in context.results:
try:
converted.append(self._convert(item))
except ValueError as e:
errors.append(f"Item {item['id']}: {e}")
context.exported_count = len(converted)
if errors:
context.errors.extend(errors)
# Continue with partial results
return StepResult(
success=True,
data={"converted": converted, "errors": errors}
)
return StepResult(success=True, data={"converted": converted})
경고: 결과를 반환하기 전에 항상 출력 포맷의 규격 준수를 검증하세요. 잘못된 export 는 ML 파이프라인의 downstream 실패를 유발할 수 있습니다.
BaseExporter 를 사용한 템플릿 기반 Export
exporter 구축에 더 단순한 템플릿 기반 접근을 원하는 plugin 개발자를 위해, BaseExporter 는 사전 구축된 파일 처리 유틸리티와 함께 익숙한 인터페이스를 제공합니다.
개요
BaseExporter 는 다음이 필요한 export plugin 을 위해 설계되었습니다:
- 원본 파일 다운로드 및 저장
- 에러 추적이 포함된 JSON 데이터 export
- progress 및 metric 리포팅
- 커스터마이즈 가능한 데이터 변환 파이프라인
BaseExporter 클래스
from synapse_sdk.plugins.actions.export import BaseExporter
class BaseExporter:
def __init__(
self,
ctx: RuntimeContext,
export_items: Generator,
path_root: Path,
**params
):
self.ctx = ctx
self.export_items = export_items
self.path_root = Path(path_root)
self.params = params
self.run = ExporterRunAdapter(ctx.logger) # Legacy compatibility
템플릿 메서드
Export 동작을 커스터마이즈하려면 다음 메서드를 오버라이드하세요:
| 메서드 | 설명 |
|---|---|
convert_data(data) | export 중 데이터 변환 |
before_convert(data) | 변환 전 데이터 전처리 |
after_convert(data) | 변환 후 데이터 후처리 |
save_original_file(result, base_path, error_list) | 원본 파일 저장 |
save_as_json(result, base_path, error_list) | 데이터를 JSON 으로 저장 |
setup_output_directories(path, save_original) | 디렉토리 구조 커스터마이즈 |
process_file_saving(...) | 커스텀 파일 저장 로직 |
additional_file_saving(path) | export 후 파일 작업 |
ExporterRunAdapter
run 속성은 로깅 및 progress 추적 메서드를 제공합니다:
# Log messages
self.run.log_message("Processing item...")
# Track progress
self.run.set_progress(current, total, step="dataset_conversion")
# Log metrics
record = self.run.MetricsRecord(stand_by=100, success=0, failed=0)
self.run.log_metrics(record, category="original_file")
# Log file export status
self.run.export_log_original_file(item_id, file_info, ExportStatus.SUCCESS, "")
self.run.export_log_data_file(item_id, file_info, ExportStatus.SUCCESS, "")
MetricsRecord
MetricsRecord 로 export 진행 상황을 추적하세요:
from synapse_sdk.plugins.actions.export import MetricsRecord
record = MetricsRecord(stand_by=100, success=0, failed=0)
# Update as items are processed
record.stand_by -= 1
record.success += 1
# Log metrics
self.run.log_metrics(record, category="data_file")
# Convert to dict
print(record.to_dict()) # {'stand_by': 99, 'success': 1, 'failed': 0}
ExportStatus Enum
파일 export 상태를 추적합니다:
from synapse_sdk.plugins.actions.export import ExportStatus
ExportStatus.SUCCESS # 'success'
ExportStatus.FAILED # 'failed'
ExportStatus.STAND_BY # 'stand_by'
예제: 커스텀 Exporter Plugin
BaseExporter 와 ExportAction 을 사용하는 완전한 예제:
from pathlib import Path
from typing import Any, Generator
from synapse_sdk.plugins.actions.export import BaseExporter, ExportAction
class MyExporter(BaseExporter):
"""Custom exporter with data transformation."""
def convert_data(self, data: dict[str, Any]) -> dict[str, Any]:
"""Transform annotation data to custom format."""
return {
"id": data.get("id"),
"filename": data.get("files", {}).get("file_name_original"),
"annotations": self._transform_annotations(data.get("data", {}))
}
def _transform_annotations(self, data: dict) -> list:
# Custom transformation logic
return data.get("annotations", [])
def additional_file_saving(self, unique_export_path: Path) -> None:
"""Save metadata file after export."""
import json
metadata = {
"export_name": self.params.get("name"),
"total_items": self.params.get("count"),
"format_version": "1.0"
}
with (unique_export_path / "metadata.json").open("w") as f:
json.dump(metadata, f, indent=2)
class MyExportAction(ExportAction):
"""Export action using custom exporter."""
action_name = "my_export"
@property
def entrypoint(self):
return MyExporter
Plugin 설정
config.yaml 에서 action 을 설정합니다:
name: my_export_plugin
code: my_export_plugin
version: 1.0.0
category: export
actions:
export:
entrypoint: plugin.export.MyExportAction
annotation_types:
- image
- video
로컬에서 실행
Export plugin 을 테스트합니다:
synapse plugin run --mode local export --params '{
"storage": 1,
"name": "My Export",
"save_original_file": true,
"path": "exports/my-export",
"target": "task",
"filter": {"project": 123},
"extra_params": {}
}'
Target Handler
ExportAction 은 target handler 를 사용해 서로 다른 소스에서 데이터를 가져옵니다.
TargetHandlerFactory
from synapse_sdk.plugins.actions.export import TargetHandlerFactory
# Get handler for target type
handler = TargetHandlerFactory.get_handler("assignment")
handler = TargetHandlerFactory.get_handler("ground_truth")
handler = TargetHandlerFactory.get_handler("task")
사용 가능한 Handler
| Handler | Target | 설명 |
|---|---|---|
AssignmentExportTargetHandler | assignment | 어노테이션 assignment export |
GroundTruthExportTargetHandler | ground_truth | ground truth 데이터셋 export |
TaskExportTargetHandler | task | task 데이터 export |
커스텀 Target Handler
커스텀 데이터 소스를 위해 ExportTargetHandler 를 구현하세요:
from synapse_sdk.plugins.actions.export import ExportTargetHandler
class CustomTargetHandler(ExportTargetHandler):
def get_results(self, client, filters: dict) -> tuple[Any, int]:
# Fetch data from custom source
results = client.custom_api_call(filters)
return iter(results), len(results)
def validate_filter(self, filters: dict, client) -> dict:
# Validate filter parameters
if "required_field" not in filters:
raise ValueError("required_field is required")
return filters
def get_export_item(self, results) -> Generator:
# Transform results for export
for item in results:
yield self._transform(item)
관련 문서
- Defining Actions - Action 정의 기초
- RuntimeContext - Context API 레퍼런스
- Steps Workflow - Step 기반 워크플로우 가이드 (export 위임에 대한 고정 step registry + head/tail)
- Runtime v2 Client Wiring - 위임 경로가 사용하는
plugin_exports/async_jobsresource - Dataset Conversion - 포맷 변환 유틸리티
- Export Error Handling & Failure Classification -
ExportLogMessageCode레퍼런스 + v2error.code카탈로그 (SYN-6919) - v2 Export Migration - v2 2-phase export 계약 (list → bulk-fetch) + keyset/presigned/retry 복원력