본문으로 건너뛰기

v2 Export 마이그레이션

본 문서는 SDK 의 export 파이프라인을 v2 백엔드의 bulk-fetch endpoint 로 전환한 작업을 설명합니다. Phase 1 (RuntimeContext.v2_client wiring + failure_classifier v2 error code 지원) 이 main 에 머지된 상태에서, 단일 통합 PR (feat/SYN-6919-sdk-v2-export-migration) 로 cut-over 했습니다.

본 변경은 plugin 작성자 입장에서 additive 입니다 — 기존 caller 는 SYNAPSE_FORCE_V1_EXPORT kill-switch 또는 runtime context 에 v2_client 를 주입하지 않는 것만으로 그대로 동작합니다.

대체됨 — category 별 v2 opt-in

SYN-6919 는 원래 v2_client 가 wired 되어 있으면 마이그된 3개 caller 모두 v2 로 라우팅했습니다. SYN-7082 가 이를 좁혔습니다: 이제 v2 선택은 plugin action category 단위로 게이팅됩니다. 기본값은 v1 이며, EXPORT category 만 v2 로 opt-in 합니다. 결과적으로:

  • export/handlers.py → 여전히 v2 (불변).
  • to_task/steps/fetch_tasks.pyv1 회귀 (SYN-7082 행동 변경).
  • dataset/action.py _download_splitv1 회귀 (SYN-7082 행동 변경).

권위 있는 정책은 아래 category 별 v2 opt-in 을 참고하세요. SSOT 는 synapse_sdk/plugins/actions/_v2_switch.py 입니다.

요약

  • SDK 클라이언트에 4개의 v2 backend endpoint 추가:
    • client.v2_client.tasks.bulk_fetch(ids)POST /v2/tasks/bulk-fetch/
    • client.v2_client.assignments.bulk_fetch(ids)POST /v2/assignments/bulk-fetch/
    • client.v2_client.ground_truths.bulk_data(ids)POST /v2/ground-truths/bulk-data/
    • client.v2_client.ground_truth_events.list(...) + bulk_data(pgh_ids)
  • use_v2 스위치 하에 3개 caller 마이그레이션 (SYN-7082 이후 라우팅):
    • export/handlers.py (Task / Assignment / GroundTruth handler) → v2 (use_v2(ctx, category=PluginCategory.EXPORT))
    • to_task/steps/fetch_tasks.pyv1 (use_v2(ctx), category 미전달 — SYN-7082 회귀)
    • dataset/action.py (_download_split) → v1 (use_v2(ctx), category 미전달 — SYN-7082 회귀)
  • nested accessor 1개:
    • client.v2_client.ground_truth_datasets.versions(ds).events(ver).list(...)
  • runtime 롤백 kill-switch 1개:
    • SYNAPSE_FORCE_V1_EXPORT=1EXPORT 포함 모든 caller 를 v1 로 회귀.

v1 → v2 caller 패턴

v2 endpoint 는 기존 단일 호출 fetch 를 2-phase 패턴으로 분리합니다:

Phasev1 (단일 호출)v2 (2-phase)
Listclient.list_tasks(params, list_all=True) — heavy row 반환client.v2_client.tasks.list(..., list_all=True) — slim row 반환
Hydrate(위 호출에 포함)client.v2_client.tasks.bulk_fetch(ids) chunk 별 호출

공용 helper _collect_then_bulk 가 이 패턴을 codify 합니다:

from synapse_sdk.plugins.actions._v2_switch import _collect_then_bulk

rows = _collect_then_bulk(
list_method=lambda: client.v2_client.tasks.list(project=42, list_all=True),
bulk_method=lambda ids: client.v2_client.tasks.bulk_fetch(ids),
ids_per_batch=200,
)

기본 extractor 는 dict row 와 v2 Pydantic list model 양쪽을 모두 처리합니다. 즉 task / assignment list row 가 {"id": ...} 형태의 dict 로 오거나 TaskV2List / AssignmentV2List 처럼 .id attribute 를 가진 object 로 오더라도, helper 는 동일하게 id 를 수집한 뒤 bulk_fetch 를 호출합니다.

Backend 는 bulk_fetch / bulk_data 1회 호출 당 200 ids 까지 처리합니다 (V2BulkFetchMixin.BULK_FETCH_MAX_IDSapps/shared/api/viewsets/bulk_fetch.py). Helper 기본값은 이 cap 과 동일한 200 입니다. 더 큰 chunk 를 보내면 backend 가 400 ids_limit_exceeded 로 응답합니다. per-call 압박을 줄여야 하는 경우 더 작은 값을 전달할 수 있지만, 200 을 초과할 수는 없습니다.

ground_truth_events 의 primary key 는 id 가 아닌 pgh_id 입니다. extract_id callable 을 전달하세요. custom extractor 역시 dict/model 양쪽과 호환되게 작성해야 합니다:

def item_field(item, field):
if isinstance(item, dict):
return item[field]
return getattr(item, field)


rows = _collect_then_bulk(
list_method=lambda: client.v2_client.ground_truth_events.list(
ground_truth_dataset_version=99,
list_all=True,
),
bulk_method=lambda pgh_ids: client.v2_client.ground_truth_events.bulk_data(pgh_ids),
extract_id=lambda item: item_field(item, 'pgh_id'),
)

v2 path 의 throttle / retry 계약

v1 export handler 는 _apply_export_params 를 통해 EXPORT_THROTTLE_SECONDS / EXPORT_PAGE_RETRIES / EXPORT_READ_TIMEOUT 를 cursor paginator 에 전달, page fetch 페이싱을 조절했습니다. v2 path 는 list endpoint cursor pagination (resource layer 가 처리) + bulk-fetch sequence 의 조합이므로 _apply_export_params 의 wholesale 적용이 불가합니다.

마이그된 handler 는 bulk-fetch sequence 의 v1 inter-call throttle 을 보존합니다 — EXPORT_THROTTLE_SECONDS_collect_then_bulk(..., throttle_seconds=...) 로 forward 함. v1 production exporter 에서 검증된 per-handler back-pressure (Task = 0.2s, GroundTruth = 0.1s, Assignment = 0.1s — 첫 호출 이전 / 마지막 호출 이후에는 sleep 없음) 가 그대로 유지됩니다.

EXPORT_PAGE_RETRIES / EXPORT_READ_TIMEOUT 은 v2 client 까지 plumb 되지 않습니다 — v2 transport (BackendV2Client._request) 가 자체 retry + timeout policy 를 적용합니다 (v2 cursor paginator + HTTPX defaults). v2 retry semantics 를 조정하려면 handler 가 아닌 BackendV2Client 생성 위치에서 설정하세요.

다음 진화 — server-side async-job 위임 (SYN-7104 P2)

v2 2-phase path 는 여전히 SDK (ray job) 가 v2 REST 로 모든 row 를 끌어와 in-process 로 변환합니다. 10만 건 이상 + 원본 파일이 큰 export 에서는 이 끌어오기 자체가 비용·취약성의 핵심입니다. SYN-7104 P2 는 v2 export path 위에 backend async-job 위임 레이어를 얹습니다: 대상 건수가 임계를 넘으면 데이터 취득(acquisition) 을 backend async-job (서버사이드 keyset 스캔 → raw bundle) 에 넘기고, SDK 는 enqueue · 진행률 스트리밍 · bundle 읽기 후 비위임 경로와 동일한 공유 transform tail 을 실행합니다. 위임은 데이터를 어떻게 취득하는가 만 바꿀 뿐 산출물은 동일합니다 (투명 위임, FR-8 parity). 이는 Export Actions 의 future-work 항목으로 추적되던 "(A안) server-side async export-to-storage" 의 첫 도입입니다.

이 위임은 2-phase v2 path 를 대체하지 않고 그 위에 얹히는 추가 레이어입니다. 임계 미만 export 는 in-process head (v2 list → bulk-fetch) 로, 임계 초과 export 는 delegated head 로 데이터를 취득합니다. 어느 경로든 취득 후에는 같은 공유 tail (ConvertData → SaveFiles → Finalize) 이 실행되므로 플러그인 커스텀 로직이 양쪽에 동일하게 적용됩니다.

기본 ON · 투명 위임 (staging 검증 권장)

위임 게이트는 기본 활성(ON) 입니다. 위임은 데이터 취득 경로만 서버사이드로 옮길 뿐, 산출물은 비위임 경로와 동일한 plugin 포맷 입니다 — 위임 경로도 같은 공유 transform tail (ConvertDataSaveFiles) 을 거치기 때문입니다. backend 가 쓰는 raw bundle (JSONL + files + manifest) 은 delegated head 의 중간 산출물이며, SDK 가 이를 읽어 (BundleRead) tail 로 전달합니다. 즉시 in-process 취득으로 되돌리려면 아래 kill-switch 를 사용합니다. 신규 위임 경로는 프로덕션 반영 전 staging 검증을 권장합니다.

게이트, 임계, kill-switch

setup_steps() 는 위임 결정으로 분기하지 않고, 단일 고정 9-step registry 를 항상 등록합니다 (아래 고정 step registry 와 공유 tail 참고). 전체 게이트는 다음을 모두 만족할 때만 발동합니다 — 위임 enabled (kill-switch off) AND ctx.v2_client wired AND count > threshold (strict >) AND project_id 확보. 결정은 한 번 (_resolve_delegation_decision()) 평가되어 context.delegation_decision 에 seed 되고, 각 head step 의 can_skip 이 이를 참조해 비활성 head 를 완료-pass 합니다. 건수는 v1 단일페이지 count 프로브 (풀스캔 없음) 로 산정하며, 프로브가 count 를 못 구하면 export 는 비위임 (in-process head) 경로로 폴백합니다.

항목envconfig기본값
Size thresholdSYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLDexport_async_job_size_threshold1000
Kill-switch (in-process 강제)SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1export_disable_async_delegateoff (위임 on)

이 kill-switch 는 SYNAPSE_FORCE_V1_EXPORT독립적 입니다. 둘은 조합됩니다:

SYNAPSE_FORCE_V1_EXPORTSYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATEExport path
offoffv2 2-phase, count > threshold 시 위임
offonv2 2-phase in-process (위임 비활성)
on(둘 중 무엇이든)v1 export path (v2 레이어 전체 롤백)
# 위임 레이어만 끄기 (v2 2-phase in-process 유지):
export SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1

# 또는 export 전체를 v1 으로 롤백 (위임도 함께 우회):
export SYNAPSE_FORCE_V1_EXPORT=1

고정 step registry 와 공유 tail

setup_steps()분기하지 않고 항상 같은 고정 9-step registry (두 acquisition head + 하나의 공유 transform tail) 를 등록합니다. (한 번 프로브된) 위임 결정은 각 head step 의 can_skip 만 뒤집어, 비활성 head 는 완료-pass (skip → 100%) 하고 공유 tail 은 양쪽 경로에서 실행 되어 동일한 plugin 포맷 산출물 을 만듭니다 (FR-8 parity):

영역Steps위임 시비위임 시
in-process headInitialize → FetchResults → PrepareExport (v2 list → bulk-fetch)skip (pass)실행
delegated headDelegateEnqueue (plugin_exports.create) → DelegateStreamProgress (async_jobs.stream_progress SSE) → BundleRead (raw bundle 읽기 + urlbundle_path rewrite)실행skip (pass)
공유 tailConvertData (plugin transform) → SaveFiles → Finalize실행실행

backend 가 쓰는 raw bundle (JSONL + files + manifest) 은 delegated head 의 중간 산출물입니다: BundleRead{prefix}/{tasks|assignments|ground_truths}.jsonl (각 라인 {id, data, files_manifest:{spec:{file_name_original, url, bundle_path}}}) 을 읽어, map_files_to_bundle_localurl{prefix}/{bundle_path} rewrite (traversal-guard, 출력 파일명은 file_name_original 유지) 한 뒤 공유 tail 로 전달합니다. 따라서 위임 export 도 raw bundle 이 아니라 plugin 포맷을 산출합니다.

delegated head 는 세 개의 BackendV2Client resource 를 사용합니다 — v2_client.plugin_exports.create(...), v2_client.async_jobs.stream_progress(job_id) (SSE), v2_client.async_jobs.retrieve(job_id). backend 는 progress (processed/total) 만 흘리고, event/metric/user-facing 로그는 SDK 가 전담 합니다 (set_progress step server_export, set_metrics, export_info/export_completed/export_failed, EXPORT_DELEGATED_TO_SERVER/EXPORT_DELEGATED_COMPLETED). 전체 계약은 Export Actions — 대량 export 의 서버사이드 위임Steps Workflow — fixed step registry 를 참고하세요.

Nested accessor

Events endpoint 는 flat resource 와 nested accessor 둘 다 제공합니다. 둘 모두 동일한 backend filter contract 를 사용합니다:

# Flat
client.v2_client.ground_truth_events.list(ground_truth_dataset_version=99)

# Nested (RPC-faithful URL)
client.v2_client.ground_truth_datasets.versions(7).events(99).list()

호출 context 가 이미 owning dataset / version 을 보유한 경우 nested 형식을, cross-dataset analytics 인 경우 flat 형식을 사용하세요.

category 별 v2 opt-in

SYN-7082 는 env 전역 스위치를 action category 별 정책으로 교체했습니다. 이 결정은 use_v2_V2_CATEGORY_POLICY 테이블 (synapse_sdk/plugins/actions/_v2_switch.py — 본 가이드의 SSOT) 에 중앙화되어 있습니다.

시그니처 변경

# 변경 전 (SYN-6919)
def use_v2(ctx) -> bool: ...

# 변경 후 (SYN-7082)
def use_v2(ctx, *, category: PluginCategory | None = None) -> bool: ...

정책 테이블

_V2_CATEGORY_POLICY: dict[PluginCategory, bool] = {
PluginCategory.EXPORT: True, # export action 만 v2 category.
}

기본값은 v1 입니다. category 가 v2 로 라우팅되려면 여기에 명시적 True 항목이 있고 동시에 그 callsite 가 일치하는 category= 를 전달해야 합니다. None / 미등록 category 는 항상 v1 로 fallback 합니다. 다른 category 를 v2 로 올리려면: 명시적 PluginCategory.X: True 항목을 추가하고 해당 callsite 에 category=PluginCategory.X 를 전달하세요.

Override 우선순위 (최상위 → 최하위)

#규칙결과
1SYNAPSE_FORCE_V1_EXPORT truthy (1 / true / yes / on, 대소문자 무관)v1 — 모든 caller runtime 롤백
2ctx.v2_client 없음 / Nonev1 — v2 client 미wiring
3category_V2_CATEGORY_POLICY lookup기본 v1; EXPORTv2

caller 별 라우팅

Caller호출라우팅
export/handlers.pyuse_v2(ctx, category=PluginCategory.EXPORT)v2 (불변)
to_task/steps/fetch_tasks.pyuse_v2(ctx) (category 미전달)v1 (SYN-6919 대비 회귀)
dataset/action.py _download_splituse_v2(ctx) (category 미전달)v1 (SYN-6919 대비 회귀)

to_task / dataset 의 v2 분기 코드는 삭제되지 않고 보존 (dead) 됩니다 — 재활성화하려면 해당 category 를 _V2_CATEGORY_POLICY 에 추가하고 callsite 에 category= 를 연결하면 됩니다. 두 callsite 모두 그 취지의 SYN-7082 소스 주석을 가지고 있습니다.

Kill-switch 운영

SYNAPSE_FORCE_V1_EXPORT 는 최상위 우선순위 runtime 롤백 path 로, category 정책을 완전히 덮어씁니다:

SYNAPSE_FORCE_V1_EXPORT효과
미설정 / "" / "0" / "false"category 정책 적용 — EXPORT → v2 (v2_client wiring 시), 그 외 caller → v1
"1" / "true" / "yes" / "on" (대소문자 무관)EXPORT 포함 모든 caller v1 fallback

이 var 설정은 plugin code 에 zero impact 입니다 — SDK 가 각 use_v2(ctx, ...) 체크 (우선순위 1, category lookup 이전) 에서 매번 읽습니다. Ray executor 가 spawn 하는 worker 는 driver 의 env 를 상속하므로 per-task 배선이 불필요합니다.

운영 중 사고 시 runtime rollback:

export SYNAPSE_FORCE_V1_EXPORT=1   # 마이그된 모든 caller 가 v1 로 회귀

이후 worker / driver 를 재기동하면 적용됩니다. PR revert 가 필요 없습니다.

Error envelope

v2 backend 는 모든 실패를 {"error": {"code": "...", "detail": "..."}} envelope 으로 감쌉니다. SDK 의 classify_export_error 는 v1 regex fallthrough 보다 먼저 이 envelope 을 조회하므로, 알려진 v2 code 는 전용 ExportLogMessageCode (및 retry policy) 로 매핑됩니다:

error.codeExportLogMessageCode
invalid_idsEXPORT_FAILED_BAD_REQUEST
partial_failureEXPORT_FAILED_BATCH_PARTIAL
rate_limitedEXPORT_FAILED_RATE_LIMITED

미지의 v2 code 는 v1 regex 로 fallthrough — v1 회귀 동작이 보존됩니다.

Partial-failure semantics

Backend V2BulkFetchMixin 은 resolve 불가능한 id 를 silently drop 하고 200 OK 응답에 resolve 된 subset 만 반환합니다 (specs Q2 결정). SDK 는 backend 가 반환한 그대로 반환합니다 — client-side error 도, exception 도 없습니다. 엄격한 resolution 이 필요한 caller 는 len(returned)len(requested) 를 비교하세요:

ids = [1, 2, 999_999]   # 999_999 는 존재하지 않음
rows = client.v2_client.tasks.bulk_fetch(ids)
# len(rows) == 2 — backend 가 missing id 를 silently drop
missing = set(ids) - {row['id'] for row in rows}

files_manifest nullability

v2 task / assignment payload 의 files_manifest 는 backend PR-A 에 따라 nullable 입니다. File 열거를 기대하는 plugin code 는 None 가드가 필요합니다:

def export_files(row):
manifest = row.get('files_manifest') or {}
for key, info in manifest.items():
...

GroundTruth payload 는 backend PR-C contract 에 따라 항상 non-null dict — ground_truths.bulk_data path 에서는 가드가 불필요합니다.

Plugin 작성자 마이그레이션 cheat sheet

기존 (v1):

def get_results(self, client, filters):
return client.list_tasks(params=filters, list_all=True)

v2 대응 (v1 fallback 포함):

from synapse_sdk.plugins.actions._v2_switch import _collect_then_bulk, use_v2
from synapse_sdk.plugins.enums import PluginCategory

def get_results(self, client, filters, *, ctx=None):
# action category 를 전달해 per-category 정책이 이 caller 를 v2 로
# opt-in 할 수 있게 합니다. 현재 v2 category 는 EXPORT 뿐입니다
# (SYN-7082); ``category`` 를 생략하거나 EXPORT 가 아닌 값을 넘기면
# v1 로 라우팅됩니다.
if use_v2(ctx, category=PluginCategory.EXPORT):
v2 = ctx.v2_client
rows = _collect_then_bulk(
list_method=lambda: v2.tasks.list(list_all=True, **filters),
bulk_method=lambda ids: v2.tasks.bulk_fetch(ids),
)
return rows, len(rows)
return client.list_tasks(params=filters, list_all=True)

Observability

모든 v2 request 의 meta.request_idclient.v2_client.last_request_id 로 노출됩니다. capture_request_ids() 로 block scope:

with client.v2_client.capture_request_ids():
rows = client.v2_client.tasks.bulk_fetch([1, 2, 3])
print(client.v2_client.last_request_id)

SDK 의 Sentry integration 이 켜진 경우 request_id 가 Sentry breadcrumb 에 자동 첨부됩니다.

BC policy

  • SYN-7082 narrowing: to_task/steps/fetch_tasks.pydataset/action.py _download_split 는 v1 path 로 좁혀졌습니다 (더 이상 use_v2category 를 전달하지 않음). EXPORT category 만 v2 에 남습니다. 이는 SYN-6919 대비 의도된 행동 변경이며 regression 버그가 아닙니다 — v2 분기는 명시적 _V2_CATEGORY_POLICY opt-in 이 있을 때까지 dead code 로 소스에 남아 있습니다.
  • kill-switch (SYNAPSE_FORCE_V1_EXPORT=1) 가 EXPORT 포함 모든 caller 를 v1 path 에 무기한 유지합니다.
  • handler get_results(client, filters, *, ctx=None) 시그니처는 kw-only ctx param 을 None default 로 추가 — ctx 없는 legacy 호출은 v1 동작을 그대로 유지합니다.
  • _collect_then_bulk 는 기본 id extraction path 에서 dict 형태의 slim row 와 Pydantic list model 을 모두 처리합니다.
  • 새 v2 resource method 는 모두 additive — 어떤 v1 method 도 제거 / 재용도되지 않았습니다.

기존 surface 에 대해 작성된 plugin 은 수정 없이 계속 동작합니다. v2 path 를 opt-in 하려는 작성자는 위 cheat sheet 를 참고하세요.