본문으로 건너뛰기

Export Actions

Export action 은 어노테이션 데이터를 외부에서 사용할 수 있는 다양한 포맷으로 변환합니다. BaseExportAction 을 사용해 assignment, ground truth 데이터셋, task 를 COCO, YOLO 또는 커스텀 스키마 같은 포맷으로 변환하는 커스텀 exporter 를 구축하세요.

개요

Export action 은 다음을 구조화된 방식으로 제공합니다:

  • backend 에서 필터링된 어노테이션 결과 조회
  • 데이터를 타깃 포맷 (COCO, YOLO, Pascal VOC, CSV 등) 으로 변환
  • 내장 progress 리포팅으로 export 진행률 추적
  • generator 를 사용한 대용량 데이터셋의 효율적 처리

한눈에 보기

속성
Base ClassBaseExportAction[P]
CategoryPluginCategory.EXPORT
Progress StepDATASET_CONVERSION
ContextExportContext
실행 모드Simple / Step-Based
대용량 경로단일 고정 9-step registry — in-process 또는 server-side 위임 acquisition head + 공유 transform tail (자동, 임계 게이팅 — 서버사이드 위임 참고)

BaseExportAction

BaseExportActionBaseAction 을 확장해 export 전용 기능을 제공합니다. backend client 접근과 progress 추적을 기본 제공합니다.

from synapse_sdk.plugins.action import BaseAction
from synapse_sdk.plugins.enums import PluginCategory

class BaseExportAction(BaseAction[P]):
category = PluginCategory.EXPORT

제네릭 타입 P 는 params 모델을 의미하며, BaseModel 을 상속해야 합니다.

Progress 카테고리

Export action 은 표준화된 progress 추적을 지원합니다:

카테고리상수설명
Dataset ConversionDATASET_CONVERSION데이터 변환 및 파일 생성 단계
# 변환 진행률 추적
self.set_progress(current, total, self.progress.DATASET_CONVERSION)

주요 메서드

client 프로퍼티

API 호출을 위한 backend client 에 접근합니다. context 에 client 가 없으면 RuntimeError 를 발생시킵니다.

# backend client 접근
assignments = self.client.get_assignments(filters)

알아두기: backend client 가 필요 없다면 get_filtered_results() 를 오버라이드해 대체 소스에서 데이터를 가져오세요.

get_filtered_results()

이 추상 메서드를 오버라이드해 export 할 데이터를 가져옵니다. (results_iterator, total_count) 튜플을 반환합니다.

def get_filtered_results(self, filters: dict[str, Any]) -> tuple[Any, int]:
"""Fetch filtered results for export.

Args:
filters: Filter criteria dict.

Returns:
Tuple of (results_iterator, total_count).
"""
return self.client.get_assignments(filters)

setup_steps()

step 기반 실행을 위한 워크플로우 step 을 등록합니다. step 이 등록되면 execute() 보다 step 기반 실행이 우선합니다.

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())

create_context()

step 기반 워크플로우를 위한 export context 를 생성합니다. context 초기화를 커스터마이즈하려면 오버라이드하세요.

def create_context(self) -> ExportContext:
params_dict = self.params.model_dump()
return ExportContext(
runtime_ctx=self.ctx,
params=params_dict,
)

ExportContext

ExportContext 는 워크플로우 step 사이의 공유 상태를 운반합니다. BaseStepContext 를 확장해 export 전용 필드를 추가합니다.

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from synapse_sdk.plugins.steps import BaseStepContext

if TYPE_CHECKING:
from synapse_sdk.clients.backend import BackendClient

@dataclass
class ExportContext(BaseStepContext):
# Input parameters
params: dict[str, Any] = field(default_factory=dict)

# Processing state (populated by steps)
results: Any | None = None
total_count: int = 0
exported_count: int = 0
output_path: str | None = None

@property
def client(self) -> BackendClient:
"""Backend client from runtime context."""
if self.runtime_ctx.client is None:
raise RuntimeError('No client in runtime context')
return self.runtime_ctx.client

필드

필드타입설명
paramsdict[str, Any]action 에서 전달된 export 파라미터
resultsAny | None가져온 데이터 (step 이 채움)
total_countintexport 대상 전체 항목 수
exported_countint성공적으로 export 된 항목 수
output_pathstr | None출력 파일/디렉토리 경로

프로퍼티

프로퍼티타입설명
clientBackendClientruntime context 의 backend client. client 가 없으면 RuntimeError 발생.

상속 필드

BaseStepContext 로부터:

필드타입설명
runtime_ctxRuntimeContextlogger, client, env 를 갖는 runtime context
step_resultslist[StepResult]완료된 step 의 결과
errorslist[str]누적된 에러 메시지
current_stepstr | None현재 실행 중인 step 이름

StepResult

StepResult 는 워크플로우 step 실행 결과를 나타내는 dataclass 입니다.

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class StepResult:
success: bool = True
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
rollback_data: dict[str, Any] = field(default_factory=dict)
skipped: bool = False
timestamp: datetime = field(default_factory=datetime.now)
필드타입설명
successboolstep 성공 여부 (기본값: True)
datadict[str, Any]step 의 출력 데이터
errorstr | Nonestep 실패 시 에러 메시지
rollback_datadict[str, Any]실패 시 rollback 에 필요한 데이터
skippedboolstep skip 여부 (기본값: False)
timestampdatetimestep 완료 시각

실행 모드

복잡도에 따라 두 가지 실행 모드 중 선택하세요.

Simple 모드

간단한 export 로직은 execute() 를 직접 오버라이드합니다.

from typing import Any

from pydantic import BaseModel
from synapse_sdk.plugins.actions.export import BaseExportAction

class ExportParams(BaseModel):
project_id: int
output_format: str = "coco"

class SimpleExportAction(BaseExportAction[ExportParams]):
action_name = "simple_export"
params_model = ExportParams

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)

def execute(self) -> dict[str, Any]:
# Fetch data
results, total = self.get_filtered_results({
"project": self.params.project_id
})

# Initialize progress
self.set_progress(0, total, self.progress.DATASET_CONVERSION)

# Process items
exported = []
for i, item in enumerate(results, 1):
exported.append(self._convert_item(item))
self.set_progress(i, total, self.progress.DATASET_CONVERSION)

return {
"format": self.params.output_format,
"exported_count": len(exported),
"data": exported
}

def _convert_item(self, item: dict) -> dict:
# Conversion logic here
return item

: 복잡한 에러 복구가 필요 없고 export 로직이 단일 메서드에 들어맞을 때 Simple 모드를 사용하세요.

Step 기반 모드

여러 단계, progress 추적, 실패 시 자동 rollback 이 필요한 복잡한 워크플로우에는 setup_steps() 를 사용합니다.

from pathlib import Path

from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry

# Define workflow steps
class FetchDataStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch_data"

@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress

def execute(self, context: ExportContext) -> StepResult:
filters = context.params.get("filters", {})
context.results, context.total_count = context.client.get_assignments(filters)
return StepResult(success=True)

class ConvertFormatStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert_format"

@property
def progress_weight(self) -> float:
return 0.6 # 60% of total progress

def execute(self, context: ExportContext) -> StepResult:
converted = []
for item in context.results:
converted.append(self._convert(item))
context.exported_count = len(converted)
return StepResult(success=True, data={"converted": converted})

def _convert(self, item: dict) -> dict:
return item

class SaveOutputStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save_output"

@property
def progress_weight(self) -> float:
return 0.2 # 20% of total progress

def execute(self, context: ExportContext) -> StepResult:
context.output_path = "/tmp/export_output.json"
return StepResult(success=True)

def rollback(self, context: ExportContext, result: StepResult) -> None:
# Clean up output file on failure
if context.output_path:
Path(context.output_path).unlink(missing_ok=True)

# Register steps in action
class StepBasedExportAction(BaseExportAction[ExportParams]):
action_name = "step_export"
params_model = ExportParams

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchDataStep())
registry.register(ConvertFormatStep())
registry.register(SaveOutputStep())

Step 기반 모드의 이점:

  • 자동 progress: step weight 로부터 진행률이 계산됨
  • 자동 rollback: 실패 시 rollback() 이 역순으로 호출됨
  • 재사용 가능한 step: 여러 export action 에서 step 공유
  • 테스트 가능한 단위: 각 step 을 독립적으로 테스트

Export 타깃

Export action 은 세 가지 데이터 타깃을 지원합니다.

Assignment

어노테이션 작업 (labeling 결과, review 등) 을 export 합니다.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)

Ground Truth

ML 학습을 위해 큐레이션된 ground truth 데이터셋을 export 합니다.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
events = self.client.list_ground_truth_events(
params=filters,
list_all=True
)
return events, len(events)

Task

task 메타데이터와 설정을 export 합니다.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_tasks(filters)

대량 데이터 무중단 export (Large-volume resilient export)

Export 는 한 프로젝트/데이터셋의 전체 task / assignment / ground-truth 를 빠짐없이 한 번 회수해야 한다. 데이터가 커질수록 단순 REST 순회는 (1) HTTP 왕복 수 증가, (2) cursor 열화, (3) payload 의 API 티어 통과, (4) 일시적 네트워크/DB 장애 취약 — 한 번의 페이지 실패로 export 전체가 중단된다. v2 export 는 이를 4가지 기법으로 완화한다.

1. 2-phase: list(slim) → bulk-fetch

slim list 로 id 만 모은 뒤(_collect_then_bulk 헬퍼) chunk(≤200) 단위 bulk-fetch 로 heavy payload 를 회수한다. list 응답에 무거운 data 를 싣지 않아 page 비용이 작다.

2. 유니크 keyset cursor — offset 열화 제거

list 정렬을 비유니크 -created 가 아니라 유니크 PK keyset (sort='-id') 으로 한다. DRF CursorPagination 은 cursor position 이 단일 필드라, 비유니크 정렬에서 동일 값 군집을 offset 으로 처리 → 깊은 페이지가 점진적으로 느려진다. 유니크 키로 정렬하면 매 페이지가 인덱스 seek (WHERE project=X AND id < cursor ORDER BY id DESC LIMIT N) 로 일정 비용을 유지한다. backend 에 (project, id) 복합 인덱스가 동반된다(synapse-backend).

  • Task / Assignment: sort='-id' 적용.
  • GroundTruth: ground_truth_events 가 이미 -pgh_id (event PK) keyset 이라 변경 불요.

3. presigned bulk — payload 를 API 가 아닌 object storage 에서

bulk 호출을 response_mode='presigned' 로 보내면 외재화된 data 가 presigned URL wrapper 로 돌아온다. 핸들러는 _resolve_data_payload() 로 이를 감지해 synapse_sdk.utils.file.download.download_json(presigned_url)S3 에서 직접 실제 payload 를 받는다.

  • 대용량 data 가 API 티어를 통과하지 않고, inline 한도(*_INLINE_MAX_BYTES) 로 잘리지 않는다.
  • 외재화 안 된 row 는 inline raw dict 로 폴백 — 판별은 data.get('mode') in {'presigned','inline'} 보수 가드. forward-compatible.

4. bounded retry — 일시적 실패에 무중단

Export list paginator pass-through 의 retry_on_status502/503/504 에 더해 500 을 포함하고 page_retries=3 를 보장한다(EXPORT_LIST_RETRY_ON_STATUS). 느린 쿼리 중 DB 연결 끊김성 500 / 게이트웨이 일시 오류에 export 가 죽지 않고 페이지 단위로 재시도한다. bulk-fetch 사이에는 throttle_seconds 로 back-pressure 를 유지한다(Task 0.2s 등). 전역 DEFAULT_RETRY_ON_STATUS 는 불변 — export 한정 정책이다.

적용 범위

page_retries / retry_on_status 는 export list paginator pass-through 로 전달된다(v2 transport 기본값에 더해 export 한정 정책). 다른 v2 consumer 동작은 영향받지 않는다.

한계와 향후 작업 (TODO)

위 기법들은 REST 를 유지한 점진적 개선으로 헤드룸을 확보하지만, 행 단위로 REST 를 통해 끌어오는 방식 자체는 요청 분량에 선형 비례하는 구조적 한계가 남는다. 근본 확장 해법은 별도 작업(익스포트 안정성/속도 개선)으로 추적한다:

  • (A안) 서버사이드 비동기 export-to-storage — backend AsyncJob 이 서버 keyset 스캔 한 번으로 전체를 raw bundle(JSONL + files + manifest)로 storage 에 기록 → SDK 는 enqueue + 진행률 스트리밍 후 bundle 을 읽어 공유 transform tail(ConvertData → SaveFiles → Finalize) 로 플러그인 산출물을 만든다. N 회 왕복 / 클라 타임아웃 / 연결 끊김 취약성 제거. 임계 초과 시 투명 위임으로 SYN-7104 P2 에서 1차 도입됨 — 아래 대량 export 의 서버사이드 위임 참고. (확장성 최종 해법)
  • (B안) 스트리밍 응답(NDJSON) — backend StreamingHttpResponse + 서버 keyset 단일 커서로 한 connection 에 전량 스트리밍 (중간 옵션).
  • (C안) GroundTruthDatasetVersion 스냅샷 사전 생성 — 불변 GTDV 의 export 아티팩트를 version release 시점에 미리 S3 에 생성 → train/export 는 다운로드만.
  • presigned 다운로드 병렬화 — 현재 _resolve_data_payload 는 per-item 직렬. 외재화 본격 도입 시 chunk 병렬 다운로드로 전환.
  • export read timeout / page_retries / throttle_secondsplugin config 노출 — 현재 핸들러 상수(EXPORT_*).

대량 export 의 서버사이드 위임 (Backend async-job)

SYN-7182 — 전 수량 delegate · in-process 경로 deprecated

[SYN-7182] 이제 모든 export 가 수량과 무관하게 서버사이드로 위임된다. 아래 이 절의 "count > threshold 초과 시에만 위임" 서술은 과거(임계 기반) 동작이며, 임계(export_async_job_size_threshold / SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD / ExportParams.async_job_size_threshold)는 deprecated — no-op 다. 위임 게이트는 이제 ① 위임 enabled, ② v2_client 배선, ③ project_id 해소 세 가지만 본다(건수 무관). count 는 분할 fan-out·progress 용으로 여전히 프로브되나 결정에 영향을 주지 않는다.

비-delegate(in-process) 경로는 deprecated 다(향후 릴리즈에서 제거 예정). v2 list 표면이 선택 필터 ids/ids_ex/search 를 떨궈(전역 PrimaryKeyFilterBackend/SearchFilter 누락) 선택/검색 export 시 프로젝트 전량을 silent over-export 하는 정확성 결함이 있다. 따라서 in-process 는 위임 불가 상황(v2_client 미배선 / kill-switch export_disable_async_delegate / project_id 미해소 — 예: ground_truth target)에서만 폴백으로만 동작하며, 진입 시 DeprecationWarning + 워커 로그 경고를 사유와 함께 발생시킨다. 권장: 위임을 활성 상태로 유지하라.

위의 v2 2-phase 경로(list → bulk-fetch)는 SDK(ray job)가 v2 REST 로 데이터를 끌어와 in-process 로 변환한다. 10만 건 이상 + 원본 파일이 큰 export 에서는 이 끌어오기 자체가 비용·취약성의 핵심이다. SYN-7104 P2 는 이를 한 단계 더 끌어올려, 대상 건수가 임계를 넘으면 데이터 취득(acquisition)을 backend async-job 에 위임(서버사이드 keyset 스캔 → raw bundle 기록)한다. 단, 위임은 데이터를 어떻게 취득하는가의 차이일 뿐이며, 위임/비위임 양쪽 모두 동일한 공유 transform tail 을 거쳐 동일한 플러그인 산출물을 만든다(투명 위임 · FR-8 parity).

이 위임은 기존 v2 2-phase 경로 위에 얹히는 추가 레이어다 — 임계 미만 export 는 종전과 동일하게 in-process head(list → bulk-fetch)로 데이터를 취득하고, 임계 초과 export 는 delegated head 로 취득한다. 어느 경로든 취득 후에는 같은 공유 tail(ConvertData → SaveFiles → Finalize) 이 실행되므로 플러그인 커스텀 로직이 양쪽에서 동일하게 적용된다.

기본 ON · 투명 위임 (staging 검증 권장)

위임 게이트는 **기본 활성(ON)**이다. 위임은 데이터 취득 경로만 서버사이드로 바꿀 뿐, 산출물은 비위임 경로와 동일한 플러그인 포맷이다 — 위임 경로도 비위임과 같은 공유 transform tail(ConvertData → SaveFiles) 을 거치기 때문이다(아래 투명 acquisition 과 공유 tail 참고). backend 가 쓰는 raw bundle 은 위임 head 의 중간 산출물이며, SDK 가 이를 읽어(BundleRead) tail 로 전달한다. 즉시 in-process 취득으로 되돌리려면 아래 kill-switch 를 사용한다. 신규 위임 경로는 프로덕션 반영 전 staging 검증을 권장한다.

위임 게이트와 임계

setup_steps() 는 위임 여부로 분기하지 않는다 — 항상 단일 고정 9-step registry 를 등록한다(아래 단일 고정 step registry 참고). 위임 결정(_resolve_delegation_decision())은 run() 시점에 한 번 평가되어 context.delegation_decision 에 seed 되고, 각 head step 의 can_skip 이 이를 참조해 비활성 head 를 완료-pass 한다. 위임은 아래 조건을 모두 만족할 때만 발동한다.

조건설명
위임 enabledkill-switch 가 꺼져 있어야 한다(기본 ON).
v2_client 존재ctx.v2_client 가 wiring 되어 있어야 한다(Runtime v2 Client 참고).
count > threshold[SYN-7182 deprecated] 더 이상 게이트가 아니다 — 수량 무관하게 위임한다.
project_id 확보위임 enqueue 에 필요한 프로젝트 식별자가 있어야 한다(없으면 deprecated in-process 폴백 — 예: ground_truth).

건수 산정은 풀스캔 없이 v1 단일페이지 count 프로브로 한다 — 첫 페이지의 count 만 읽고, count 를 못 구하면(프로브 실패) 비위임으로 폴백한다. 임계 기본값은 1,000이며 다음으로 오버라이드한다.

항목ExportParams 필드extra_params / config 키env
임계 overrideasync_job_size_thresholdexport_async_job_size_thresholdSYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD
kill-switch (즉시 in-process 복귀)disable_async_delegateexport_disable_async_delegateSYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1

우선순위(높음 → 낮음): ExportParams 명시 필드 > params.extra_params 의 동일 키 > 액션 config > env > 모듈 기본(1,000). ExportParams.async_job_size_threshold / disable_async_delegate 는 모두 optional 이며 기본 None — 미지정 시 종전대로 env/기본값으로 폴백한다(BC). 즉 export config 만으로 env 변경 없이 위임 임계를 조정하거나 위임을 끌 수 있다.

Kill-switch — 즉시 종전 경로 복귀

SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1(env) · config export_disable_async_delegate · ExportParams.disable_async_delegate=True어느 하나라도 켜면 게이트가 즉시 닫히고, 모든 export 가 임계와 무관하게 기존 in-process 경로로 처리된다. 인시던트 시 PR revert 없이 롤백할 수 있다(v2 export 의 SYNAPSE_FORCE_V1_EXPORT kill-switch 와 동일한 운영 패턴). 운영자 env kill-switch 가 켜져 있으면 export config 로는 재활성할 수 없다(env 우선) — config/필드는 위임을 끌 수만 있고 다시 켜지는 못한다.

단일 고정 step registry (9-step)

setup_steps() 는 위임 여부로 분기하지 않고 항상 동일한 9개 step 을 등록한다. 종전의 "분기형 registry(위임 3-step OR 비위임 6-step)" 모델은 모드에 따라 job 의 step 목록 자체가 달라져, 긴 acquisition 이 전체 진행률을 정체시켰다. 단일 고정 registry 는 이를 제거한다 — registry 는 항상 9-step 이고, 위임 결정에 따라 비활성 head 의 각 step 만 can_skip 으로 완료-pass(skip → 100%, orchestrator 가 보고) 된다.

9-step 은 두 acquisition head + 공유 tail 로 구성된다:

  • In-process head: Initialize → FetchResults → PrepareExport — v2 list → bulk-fetch 로 데이터 취득.
  • Delegated head: DelegateEnqueue → DelegateStreamProgress → BundleRead — backend async-job enqueue → SSE 진행률 → raw bundle 읽기.
  • Shared tail (plugin transform): ConvertData → SaveFiles → Finalize두 head 가 공유하는 동일 step 클래스. 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행된다(FR-8 투명성).

고정 9-step — 비활성 head 의 step 은 can_skip(=is_delegated(context) 또는 그 역)으로 완료-pass:

영역Step역할위임 시비위임 시
in-process headInitializestorage/path 초기화.skip(pass)실행
in-process headFetchResultsv2 list → bulk-fetch 로 결과 회수.skip(pass)실행
in-process headPrepareExportexport params build · project config.skip(pass)실행
delegated headDelegateEnqueueplugin_exports.create 로 export job enqueue.실행skip(pass)
delegated headDelegateStreamProgressasync_jobs.stream_progress (SSE) 진행률 수신.실행skip(pass)
delegated headBundleReadraw bundle(JSONL+files+manifest) 읽기 + url→bundle_path rewrite.실행skip(pass)
shared tailConvertDataplugin 커스텀 transform.실행실행
shared tailSaveFiles로컬 파일 저장.실행실행
shared tailFinalize마무리 · cleanup.실행실행

투명 acquisition 과 공유 tail

위임/비위임은 "데이터를 어떻게 취득하는가"의 차이일 뿐이며 산출물은 동일하다(FR-8 parity). 핵심은 공유 transform tail 이다 — 플러그인이 template 기반으로 override 하는 convert_data 등 커스텀 처리는 ConvertData → SaveFiles → Finalize 에서 실행되며, 이 tail 은 위임/비위임 양쪽 모두 거친다. 따라서 위임 export 도 플러그인 커스텀 로직이 그대로 적용된다(이것이 분기형(Option A)이 아니라 통합 파이프라인(Option B)을 채택한 이유다).

비위임 (in-process head)위임 (delegated head)
데이터 취득 위치SDK(ray job) — v2 list → bulk-fetchbackend async-job — 서버 keyset 스캔 → raw bundle
취득 중간 산출물in-process iteratorraw bundle(JSONL + files + manifest), SDK 가 BundleRead 로 읽음
ConvertData (plugin transform)적용됨적용됨 (공유 tail)
SaveFiles (로컬 파일 저장)적용됨적용됨 (공유 tail)
최종 산출물plugin 포맷동일한 plugin 포맷

bundle-read 계약

위임 시 backend 는 {prefix}/{tasks|assignments|ground_truths}.jsonl 를 쓰고, 각 라인은 {id, data, files_manifest: {spec: {file_name_original, url, bundle_path}}} 형태다. SDK 의 BundleRead step 은 map_files_to_bundle_local 로 각 항목의 url{prefix}/{bundle_path} 로 rewrite(traversal-guard 적용)해 로컬 bundle 을 가리키게 하고, 출력 파일명은 file_name_original 을 유지한다. rewrite 된 항목은 공유 tail 의 ConvertData/SaveFiles 로 전달되어 비위임과 동일하게 처리된다.

관측성

delegated head step 도 in-process head 와 동등한 관측성을 남긴다 — 즉 위임이라고 해서 진행률·메트릭·이벤트 로그가 비는 일은 없다. (취득 이후의 공유 tail 관측성은 위임/비위임 공통이다.)

신호API비고
진행률set_progress(..., step='server_export')step 이름은 server_export.
메트릭set_metrics(...)success / failed.
job-log 이벤트export_info / export_completed / export_failed
user-facing 메시지log_message(...)EXPORT_DELEGATED_TO_SERVER / EXPORT_DELEGATED_COMPLETED.

역할 분담: backend 는 progress(processed/total)만 SSE 로 흘리고, event / metric / user-facing 메시지는 SDK 가 전담한다. 따라서 DelegateStreamProgress 는 SSE 진행률을 set_progress 로 중계하고, enqueue / bundle-read step 이 이벤트·메트릭·메시지를 마무리한다.

v2 client resource

delegated head 는 BackendV2Client 의 다음 resource 를 사용한다(상세 wiring 은 Runtime v2 Client 참고).

Resource용도
v2_client.plugin_exports.create(...)export job enqueue (DelegateEnqueue).
v2_client.async_jobs.stream_progress(job_id)SSE 진행률 스트림 (DelegateStreamProgress).
v2_client.async_jobs.retrieve(job_id)manifest → bundle prefix 확정, BundleRead 가 raw bundle 을 읽어 tail 로 전달.

예제

예제: COCO 포맷 Exporter

어노테이션을 COCO 포맷으로 export 하는 완전한 예제입니다.

from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction
from typing import Any

class CocoExportParams(BaseModel):
project_id: int = Field(..., description="Project ID to export")
include_images: bool = Field(True, description="Include image metadata")

class CocoExportAction(BaseExportAction[CocoExportParams]):
action_name = "coco_export"
params_model = CocoExportParams

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
return self.client.get_assignments(filters)

def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results({
"project": self.params.project_id
})

self.set_progress(0, total, self.progress.DATASET_CONVERSION)

coco_data = {
"info": {"description": "Exported from Synapse"},
"images": [],
"annotations": [],
"categories": []
}

annotation_id = 0
category_map = {}

for i, assignment in enumerate(results, 1):
# Add image
data_unit = assignment.get("data_unit", {})
image_id = data_unit.get("id")

if self.params.include_images:
coco_data["images"].append({
"id": image_id,
"file_name": data_unit.get("name", ""),
"width": data_unit.get("width", 0),
"height": data_unit.get("height", 0)
})

# Add annotations
for ann in assignment.get("data", {}).get("annotations", []):
category = ann.get("category", "default")
if category not in category_map:
cat_id = len(category_map)
category_map[category] = cat_id
coco_data["categories"].append({
"id": cat_id,
"name": category
})

coco_data["annotations"].append({
"id": annotation_id,
"image_id": image_id,
"category_id": category_map[category],
"bbox": ann.get("bbox", []),
"area": ann.get("area", 0),
"iscrowd": 0
})
annotation_id += 1

self.set_progress(i, total, self.progress.DATASET_CONVERSION)

return {
"format": "coco",
"exported_count": len(coco_data["images"]),
"annotation_count": len(coco_data["annotations"]),
"data": coco_data
}

예제: Step 을 사용한 다중 포맷 Exporter

여러 출력 포맷을 지원하는 유연한 exporter 입니다.

from enum import Enum
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.export import BaseExportAction, ExportContext
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry

class OutputFormat(str, Enum):
COCO = "coco"
YOLO = "yolo"
CSV = "csv"

class MultiFormatParams(BaseModel):
project_id: int
output_format: OutputFormat = OutputFormat.COCO
output_dir: str = Field("/tmp/export", description="Output directory")

class FetchStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "fetch"

@property
def progress_weight(self) -> float:
return 0.3

def execute(self, context: ExportContext) -> StepResult:
project_id = context.params["project_id"]
context.results, context.total_count = context.client.get_assignments({
"project": project_id
})
return StepResult(success=True)

class ConvertStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "convert"

@property
def progress_weight(self) -> float:
return 0.5

def execute(self, context: ExportContext) -> StepResult:
output_format = context.params["output_format"]

if output_format == OutputFormat.COCO:
converted = self._to_coco(context.results)
elif output_format == OutputFormat.YOLO:
converted = self._to_yolo(context.results)
else:
converted = self._to_csv(context.results)

context.exported_count = context.total_count
return StepResult(success=True, data={"output": converted})

def _to_coco(self, results) -> dict:
# COCO conversion logic
return {"format": "coco", "images": [], "annotations": []}

def _to_yolo(self, results) -> list:
# YOLO conversion logic
return []

def _to_csv(self, results) -> str:
# CSV conversion logic
return "id,label,x,y,width,height\n"

class SaveStep(BaseStep[ExportContext]):
@property
def name(self) -> str:
return "save"

@property
def progress_weight(self) -> float:
return 0.2

def execute(self, context: ExportContext) -> StepResult:
from pathlib import Path
import json

output_dir = Path(context.params["output_dir"])
output_dir.mkdir(parents=True, exist_ok=True)

output_format = context.params["output_format"]
output_data = context.step_results[-1].data["output"]

if output_format in (OutputFormat.COCO,):
output_path = output_dir / "annotations.json"
output_path.write_text(json.dumps(output_data, indent=2))
else:
output_path = output_dir / f"output.{output_format}"
output_path.write_text(str(output_data))

context.output_path = str(output_path)
return StepResult(success=True)

class MultiFormatExportAction(BaseExportAction[MultiFormatParams]):
action_name = "multi_format_export"
params_model = MultiFormatParams

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
registry.register(FetchStep())
registry.register(ConvertStep())
registry.register(SaveStep())

모범 사례

메모리 효율적 처리

대용량 데이터셋은 모든 것을 메모리에 올리지 않도록 generator 를 사용하세요.

def get_filtered_results(self, filters: dict) -> tuple[Any, int]:
# Return a generator, not a list
results = self.client.get_assignments_stream(filters)
count = self.client.count_assignments(filters)
return results, count

def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(self.params.filters)

for i, item in enumerate(results, 1): # Iterate without loading all
self._process_item(item)
self.set_progress(i, total, self.progress.DATASET_CONVERSION)

Progress 리포팅

매 항목마다가 아니라 의미 있는 간격으로 progress 를 리포팅하세요.

def execute(self) -> dict[str, Any]:
results, total = self.get_filtered_results(filters)
report_interval = max(1, total // 100) # Report every 1%

for i, item in enumerate(results, 1):
self._process(item)
if i % report_interval == 0 or i == total:
self.set_progress(i, total, self.progress.DATASET_CONVERSION)

에러 복구

step 기반 워크플로우에서 부분 실패를 우아하게 처리하세요.

class ConvertStep(BaseStep[ExportContext]):
def execute(self, context: ExportContext) -> StepResult:
converted = []
errors = []

for item in context.results:
try:
converted.append(self._convert(item))
except ValueError as e:
errors.append(f"Item {item['id']}: {e}")

context.exported_count = len(converted)

if errors:
context.errors.extend(errors)
# Continue with partial results
return StepResult(
success=True,
data={"converted": converted, "errors": errors}
)

return StepResult(success=True, data={"converted": converted})

경고: 결과를 반환하기 전에 항상 출력 포맷의 규격 준수를 검증하세요. 잘못된 export 는 ML 파이프라인의 downstream 실패를 유발할 수 있습니다.

BaseExporter 를 사용한 템플릿 기반 Export

exporter 구축에 더 단순한 템플릿 기반 접근을 원하는 plugin 개발자를 위해, BaseExporter 는 사전 구축된 파일 처리 유틸리티와 함께 익숙한 인터페이스를 제공합니다.

개요

BaseExporter 는 다음이 필요한 export plugin 을 위해 설계되었습니다:

  • 원본 파일 다운로드 및 저장
  • 에러 추적이 포함된 JSON 데이터 export
  • progress 및 metric 리포팅
  • 커스터마이즈 가능한 데이터 변환 파이프라인

BaseExporter 클래스

from synapse_sdk.plugins.actions.export import BaseExporter

class BaseExporter:
def __init__(
self,
ctx: RuntimeContext,
export_items: Generator,
path_root: Path,
**params
):
self.ctx = ctx
self.export_items = export_items
self.path_root = Path(path_root)
self.params = params
self.run = ExporterRunAdapter(ctx.logger) # Legacy compatibility

템플릿 메서드

Export 동작을 커스터마이즈하려면 다음 메서드를 오버라이드하세요:

메서드설명
convert_data(data)export 중 데이터 변환
before_convert(data)변환 전 데이터 전처리
after_convert(data)변환 후 데이터 후처리
save_original_file(result, base_path, error_list)원본 파일 저장
save_as_json(result, base_path, error_list)데이터를 JSON 으로 저장
setup_output_directories(path, save_original)디렉토리 구조 커스터마이즈
process_file_saving(...)커스텀 파일 저장 로직
additional_file_saving(path)export 후 파일 작업

ExporterRunAdapter

run 속성은 로깅 및 progress 추적 메서드를 제공합니다:

# Log messages
self.run.log_message("Processing item...")

# Track progress
self.run.set_progress(current, total, step="dataset_conversion")

# Log metrics
record = self.run.MetricsRecord(stand_by=100, success=0, failed=0)
self.run.log_metrics(record, category="original_file")

# Log file export status
self.run.export_log_original_file(item_id, file_info, ExportStatus.SUCCESS, "")
self.run.export_log_data_file(item_id, file_info, ExportStatus.SUCCESS, "")

MetricsRecord

MetricsRecord 로 export 진행 상황을 추적하세요:

from synapse_sdk.plugins.actions.export import MetricsRecord

record = MetricsRecord(stand_by=100, success=0, failed=0)

# Update as items are processed
record.stand_by -= 1
record.success += 1

# Log metrics
self.run.log_metrics(record, category="data_file")

# Convert to dict
print(record.to_dict()) # {'stand_by': 99, 'success': 1, 'failed': 0}

ExportStatus Enum

파일 export 상태를 추적합니다:

from synapse_sdk.plugins.actions.export import ExportStatus

ExportStatus.SUCCESS # 'success'
ExportStatus.FAILED # 'failed'
ExportStatus.STAND_BY # 'stand_by'

예제: 커스텀 Exporter Plugin

BaseExporterExportAction 을 사용하는 완전한 예제:

from pathlib import Path
from typing import Any, Generator

from synapse_sdk.plugins.actions.export import BaseExporter, ExportAction

class MyExporter(BaseExporter):
"""Custom exporter with data transformation."""

def convert_data(self, data: dict[str, Any]) -> dict[str, Any]:
"""Transform annotation data to custom format."""
return {
"id": data.get("id"),
"filename": data.get("files", {}).get("file_name_original"),
"annotations": self._transform_annotations(data.get("data", {}))
}

def _transform_annotations(self, data: dict) -> list:
# Custom transformation logic
return data.get("annotations", [])

def additional_file_saving(self, unique_export_path: Path) -> None:
"""Save metadata file after export."""
import json
metadata = {
"export_name": self.params.get("name"),
"total_items": self.params.get("count"),
"format_version": "1.0"
}
with (unique_export_path / "metadata.json").open("w") as f:
json.dump(metadata, f, indent=2)


class MyExportAction(ExportAction):
"""Export action using custom exporter."""

action_name = "my_export"

@property
def entrypoint(self):
return MyExporter

Plugin 설정

config.yaml 에서 action 을 설정합니다:

name: my_export_plugin
code: my_export_plugin
version: 1.0.0
category: export

actions:
export:
entrypoint: plugin.export.MyExportAction
annotation_types:
- image
- video

로컬에서 실행

Export plugin 을 테스트합니다:

synapse plugin run --mode local export --params '{
"storage": 1,
"name": "My Export",
"save_original_file": true,
"path": "exports/my-export",
"target": "task",
"filter": {"project": 123},
"extra_params": {}
}'

Target Handler

ExportAction 은 target handler 를 사용해 서로 다른 소스에서 데이터를 가져옵니다.

TargetHandlerFactory

from synapse_sdk.plugins.actions.export import TargetHandlerFactory

# Get handler for target type
handler = TargetHandlerFactory.get_handler("assignment")
handler = TargetHandlerFactory.get_handler("ground_truth")
handler = TargetHandlerFactory.get_handler("task")

사용 가능한 Handler

HandlerTarget설명
AssignmentExportTargetHandlerassignment어노테이션 assignment export
GroundTruthExportTargetHandlerground_truthground truth 데이터셋 export
TaskExportTargetHandlertasktask 데이터 export

커스텀 Target Handler

커스텀 데이터 소스를 위해 ExportTargetHandler 를 구현하세요:

from synapse_sdk.plugins.actions.export import ExportTargetHandler

class CustomTargetHandler(ExportTargetHandler):
def get_results(self, client, filters: dict) -> tuple[Any, int]:
# Fetch data from custom source
results = client.custom_api_call(filters)
return iter(results), len(results)

def validate_filter(self, filters: dict, client) -> dict:
# Validate filter parameters
if "required_field" not in filters:
raise ValueError("required_field is required")
return filters

def get_export_item(self, results) -> Generator:
# Transform results for export
for item in results:
yield self._transform(item)

관련 문서