v2 Export 마이그레이션
본 문서는 SDK 의 export 파이프라인을 v2 백엔드의 bulk-fetch endpoint
로 전환한 작업을 설명합니다. Phase 1 (RuntimeContext.v2_client
wiring + failure_classifier v2 error code 지원) 이 main 에
머지된 상태에서, 단일 통합 PR (feat/SYN-6919-sdk-v2-export-migration)
로 cut-over 했습니다.
본 변경은 plugin 작성자 입장에서 additive 입니다 — 기존 caller
는 SYNAPSE_FORCE_V1_EXPORT kill-switch 또는 runtime context 에
v2_client 를 주입하지 않는 것만으로 그대로 동작합니다.
SYN-6919 는 원래 v2_client 가 wired 되어 있으면 마이그된 3개 caller
모두 v2 로 라우팅했습니다. SYN-7082 가 이를 좁혔습니다: 이제 v2 선택은
plugin action category 단위로 게이팅됩니다. 기본값은 v1 이며, EXPORT
category 만 v2 로 opt-in 합니다. 결과적으로:
export/handlers.py→ 여전히 v2 (불변).to_task/steps/fetch_tasks.py→ v1 회귀 (SYN-7082 행동 변경).dataset/action.py_download_split→ v1 회귀 (SYN-7082 행동 변경).
권위 있는 정책은 아래
category 별 v2 opt-in
을 참고하세요. SSOT 는 synapse_sdk/plugins/actions/_v2_switch.py 입니다.
요약
- SDK 클라이언트에 4개의 v2 backend endpoint 추가:
client.v2_client.tasks.bulk_fetch(ids)→POST /v2/tasks/bulk-fetch/client.v2_client.assignments.bulk_fetch(ids)→POST /v2/assignments/bulk-fetch/client.v2_client.ground_truths.bulk_data(ids)→POST /v2/ground-truths/bulk-data/client.v2_client.ground_truth_events.list(...)+bulk_data(pgh_ids)
use_v2스위치 하에 3개 caller 마이그레이션 (SYN-7082 이후 라우팅):export/handlers.py(Task / Assignment / GroundTruth handler) → v2 (use_v2(ctx, category=PluginCategory.EXPORT))to_task/steps/fetch_tasks.py→ v1 (use_v2(ctx), category 미전달 — SYN-7082 회귀)dataset/action.py(_download_split) → v1 (use_v2(ctx), category 미전달 — SYN-7082 회귀)
- nested accessor 1개:
client.v2_client.ground_truth_datasets.versions(ds).events(ver).list(...)
- runtime 롤백 kill-switch 1개:
SYNAPSE_FORCE_V1_EXPORT=1→EXPORT포함 모든 caller 를 v1 로 회귀.
v1 → v2 caller 패턴
v2 endpoint 는 기존 단일 호출 fetch 를 2-phase 패턴으로 분리합니다:
| Phase | v1 (단일 호출) | v2 (2-phase) |
|---|---|---|
| List | client.list_tasks(params, list_all=True) — heavy row 반환 | client.v2_client.tasks.list(..., list_all=True) — slim row 반환 |
| Hydrate | (위 호출에 포함) | client.v2_client.tasks.bulk_fetch(ids) chunk 별 호출 |
공용 helper _collect_then_bulk 가 이 패턴을 codify 합니다:
from synapse_sdk.plugins.actions._v2_switch import _collect_then_bulk
rows = _collect_then_bulk(
list_method=lambda: client.v2_client.tasks.list(project=42, list_all=True),
bulk_method=lambda ids: client.v2_client.tasks.bulk_fetch(ids),
ids_per_batch=200,
)
기본 extractor 는 dict row 와 v2 Pydantic list model 양쪽을 모두
처리합니다. 즉 task / assignment list row 가 {"id": ...} 형태의
dict 로 오거나 TaskV2List / AssignmentV2List 처럼 .id
attribute 를 가진 object 로 오더라도, helper 는 동일하게 id 를
수집한 뒤 bulk_fetch 를 호출합니다.
Backend 는 bulk_fetch / bulk_data 1회 호출 당 200 ids 까지
처리합니다 (V2BulkFetchMixin.BULK_FETCH_MAX_IDS —
apps/shared/api/viewsets/bulk_fetch.py). Helper 기본값은 이 cap 과
동일한 200 입니다. 더 큰 chunk 를 보내면 backend 가 400
ids_limit_exceeded 로 응답합니다. per-call 압박을 줄여야 하는
경우 더 작은 값을 전달할 수 있지만, 200 을 초과할 수는 없습니다.
ground_truth_events 의 primary key 는 id 가 아닌 pgh_id
입니다. extract_id callable 을 전달하세요. custom extractor 역시
dict/model 양쪽과 호환되게 작성해야 합니다:
def item_field(item, field):
if isinstance(item, dict):
return item[field]
return getattr(item, field)
rows = _collect_then_bulk(
list_method=lambda: client.v2_client.ground_truth_events.list(
ground_truth_dataset_version=99,
list_all=True,
),
bulk_method=lambda pgh_ids: client.v2_client.ground_truth_events.bulk_data(pgh_ids),
extract_id=lambda item: item_field(item, 'pgh_id'),
)
v2 path 의 throttle / retry 계약
v1 export handler 는 _apply_export_params 를 통해
EXPORT_THROTTLE_SECONDS / EXPORT_PAGE_RETRIES /
EXPORT_READ_TIMEOUT 를 cursor paginator 에 전달, page fetch 페이싱을
조절했습니다. v2 path 는 list endpoint cursor pagination (resource layer
가 처리) + bulk-fetch sequence 의 조합이므로 _apply_export_params 의
wholesale 적용이 불가합니다.
마이그된 handler 는 bulk-fetch sequence 의 v1 inter-call throttle 을
보존합니다 — EXPORT_THROTTLE_SECONDS 를
_collect_then_bulk(..., throttle_seconds=...) 로 forward 함. v1
production exporter 에서 검증된 per-handler back-pressure (Task = 0.2s,
GroundTruth = 0.1s, Assignment = 0.1s — 첫 호출 이전 / 마지막 호출
이후에는 sleep 없음) 가 그대로 유지됩니다.
EXPORT_PAGE_RETRIES / EXPORT_READ_TIMEOUT 은 v2 client 까지
plumb 되지 않습니다 — v2 transport (BackendV2Client._request)
가 자체 retry + timeout policy 를 적용합니다 (v2 cursor paginator +
HTTPX defaults). v2 retry semantics 를 조정하려면 handler 가 아닌
BackendV2Client 생성 위치에서 설정하세요.
다음 진화 — server-side async-job 위임 (SYN-7104 P2)
v2 2-phase path 는 여전히 SDK (ray job) 가 v2 REST 로 모든 row 를 끌어와 in-process 로 변환합니다. 10만 건 이상 + 원본 파일이 큰 export 에서는 이 끌어오기 자체가 비용·취약성의 핵심입니다. SYN-7104 P2 는 v2 export path 위에 backend async-job 위임 레이어를 얹습니다: 대상 건수가 임계를 넘으면 데이터 취득(acquisition) 을 backend async-job (서버사이드 keyset 스캔 → raw bundle) 에 넘기고, SDK 는 enqueue · 진행률 스트리밍 · bundle 읽기 후 비위임 경로와 동일한 공유 transform tail 을 실행합니다. 위임은 데이터를 어떻게 취득하는가 만 바꿀 뿐 산출물은 동일합니다 (투명 위임, FR-8 parity). 이는 Export Actions 의 future-work 항목으로 추적되던 "(A안) server-side async export-to-storage" 의 첫 도입입니다.
이 위임은 2-phase v2 path 를 대체하지 않고 그 위에 얹히는 추가 레이어입니다. 임계 미만 export 는 in-process head (v2 list → bulk-fetch) 로, 임계 초과 export 는 delegated head 로 데이터를 취득합니다. 어느 경로든 취득 후에는 같은 공유 tail (ConvertData → SaveFiles → Finalize) 이 실행되므로 플러그인 커스텀 로직이 양쪽에 동일하게 적용됩니다.
위임 게이트는 기본 활성(ON) 입니다. 위임은 데이터 취득 경로만
서버사이드로 옮길 뿐, 산출물은 비위임 경로와 동일한 plugin 포맷 입니다
— 위임 경로도 같은 공유 transform tail (ConvertData → SaveFiles) 을
거치기 때문입니다. backend 가 쓰는 raw bundle (JSONL + files + manifest)
은 delegated head 의 중간 산출물이며, SDK 가 이를 읽어 (BundleRead) tail
로 전달합니다. 즉시 in-process 취득으로 되돌리려면 아래 kill-switch 를
사용합니다. 신규 위임 경로는 프로덕션 반영 전 staging 검증을 권장합니다.
게이트, 임계, kill-switch
setup_steps() 는 위임 결정으로 분기하지 않고, 단일 고정 9-step registry
를 항상 등록합니다 (아래 고정 step registry 와 공유 tail 참고).
전체 게이트는 다음을 모두 만족할 때만 발동합니다 — 위임 enabled
(kill-switch off) AND ctx.v2_client wired AND count > threshold
(strict >) AND project_id 확보. 결정은 한 번
(_resolve_delegation_decision()) 평가되어 context.delegation_decision 에
seed 되고, 각 head step 의 can_skip 이 이를 참조해 비활성 head 를
완료-pass 합니다. 건수는 v1 단일페이지 count 프로브 (풀스캔 없음) 로
산정하며, 프로브가 count 를 못 구하면 export 는 비위임 (in-process head)
경로로 폴백합니다.
| 항목 | env | config | 기본값 |
|---|---|---|---|
| Size threshold | SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD | export_async_job_size_threshold | 1000 |
| Kill-switch (in-process 강제) | SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1 | export_disable_async_delegate | off (위임 on) |
이 kill-switch 는 SYNAPSE_FORCE_V1_EXPORT 와 독립적 입니다. 둘은 조합됩니다:
SYNAPSE_FORCE_V1_EXPORT | SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE | Export path |
|---|---|---|
| off | off | v2 2-phase, count > threshold 시 위임 |
| off | on | v2 2-phase in-process (위임 비활성) |
| on | (둘 중 무엇이든) | v1 export path (v2 레이어 전체 롤백) |
# 위임 레이어만 끄기 (v2 2-phase in-process 유지):
export SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1
# 또는 export 전체를 v1 으로 롤백 (위임도 함께 우회):
export SYNAPSE_FORCE_V1_EXPORT=1
고정 step registry 와 공유 tail
setup_steps() 는 분기하지 않고 항상 같은 고정 9-step registry (두
acquisition head + 하나의 공유 transform tail) 를 등록합니다. (한 번 프로브된)
위임 결정은 각 head step 의 can_skip 만 뒤집어, 비활성 head 는 완료-pass
(skip → 100%) 하고 공유 tail 은 양쪽 경로에서 실행 되어 동일한 plugin
포맷 산출물 을 만듭니다 (FR-8 parity):
| 영역 | Steps | 위임 시 | 비위임 시 |
|---|---|---|---|
| in-process head | Initialize → FetchResults → PrepareExport (v2 list → bulk-fetch) | skip (pass) | 실행 |
| delegated head | DelegateEnqueue (plugin_exports.create) → DelegateStreamProgress (async_jobs.stream_progress SSE) → BundleRead (raw bundle 읽기 + url→bundle_path rewrite) | 실행 | skip (pass) |
| 공유 tail | ConvertData (plugin transform) → SaveFiles → Finalize | 실행 | 실행 |
backend 가 쓰는 raw bundle (JSONL + files + manifest) 은 delegated head 의
중간 산출물입니다: BundleRead 가
{prefix}/{tasks|assignments|ground_truths}.jsonl (각 라인
{id, data, files_manifest:{spec:{file_name_original, url, bundle_path}}}) 을
읽어, map_files_to_bundle_local 로 url → {prefix}/{bundle_path}
rewrite (traversal-guard, 출력 파일명은 file_name_original 유지) 한 뒤
공유 tail 로 전달합니다. 따라서 위임 export 도 raw bundle 이 아니라 plugin
포맷을 산출합니다.
delegated head 는 세 개의 BackendV2Client resource 를 사용합니다 —
v2_client.plugin_exports.create(...), v2_client.async_jobs.stream_progress(job_id)
(SSE), v2_client.async_jobs.retrieve(job_id). backend 는 progress
(processed/total) 만 흘리고, event/metric/user-facing 로그는 SDK 가 전담
합니다 (set_progress step server_export, set_metrics,
export_info/export_completed/export_failed,
EXPORT_DELEGATED_TO_SERVER/EXPORT_DELEGATED_COMPLETED). 전체 계약은
Export Actions — 대량 export 의 서버사이드 위임
과 Steps Workflow — fixed step registry
를 참고하세요.
Nested accessor
Events endpoint 는 flat resource 와 nested accessor 둘 다 제공합니다. 둘 모두 동일한 backend filter contract 를 사용합니다:
# Flat
client.v2_client.ground_truth_events.list(ground_truth_dataset_version=99)
# Nested (RPC-faithful URL)
client.v2_client.ground_truth_datasets.versions(7).events(99).list()
호출 context 가 이미 owning dataset / version 을 보유한 경우 nested 형식을, cross-dataset analytics 인 경우 flat 형식을 사용하세요.
category 별 v2 opt-in
SYN-7082 는 env 전역 스위치를 action category 별 정책으로 교체했습니다.
이 결정은 use_v2 와 _V2_CATEGORY_POLICY 테이블
(synapse_sdk/plugins/actions/_v2_switch.py — 본 가이드의 SSOT) 에
중앙화되어 있습니다.
시그니처 변경
# 변경 전 (SYN-6919)
def use_v2(ctx) -> bool: ...
# 변경 후 (SYN-7082)
def use_v2(ctx, *, category: PluginCategory | None = None) -> bool: ...
정책 테이블
_V2_CATEGORY_POLICY: dict[PluginCategory, bool] = {
PluginCategory.EXPORT: True, # export action 만 v2 category.
}
기본값은 v1 입니다. category 가 v2 로 라우팅되려면 여기에 명시적
True 항목이 있고 동시에 그 callsite 가 일치하는 category= 를
전달해야 합니다. None / 미등록 category 는 항상 v1 로 fallback 합니다.
다른 category 를 v2 로 올리려면: 명시적 PluginCategory.X: True 항목을
추가하고 해당 callsite 에 category=PluginCategory.X 를 전달하세요.
Override 우선순위 (최상위 → 최하위)
| # | 규칙 | 결과 |
|---|---|---|
| 1 | SYNAPSE_FORCE_V1_EXPORT truthy (1 / true / yes / on, 대소문자 무관) | v1 — 모든 caller runtime 롤백 |
| 2 | ctx.v2_client 없음 / None | v1 — v2 client 미wiring |
| 3 | category 로 _V2_CATEGORY_POLICY lookup | 기본 v1; EXPORT → v2 |
caller 별 라우팅
| Caller | 호출 | 라우팅 |
|---|---|---|
export/handlers.py | use_v2(ctx, category=PluginCategory.EXPORT) | v2 (불변) |
to_task/steps/fetch_tasks.py | use_v2(ctx) (category 미전달) | v1 (SYN-6919 대비 회귀) |
dataset/action.py _download_split | use_v2(ctx) (category 미전달) | v1 (SYN-6919 대비 회귀) |
to_task / dataset 의 v2 분기 코드는 삭제되지 않고 보존 (dead) 됩니다
— 재활성화하려면 해당 category 를 _V2_CATEGORY_POLICY 에 추가하고 callsite
에 category= 를 연결하면 됩니다. 두 callsite 모두 그 취지의 SYN-7082 소스
주석을 가지고 있습니다.
Kill-switch 운영
SYNAPSE_FORCE_V1_EXPORT 는 최상위 우선순위 runtime 롤백 path 로,
category 정책을 완전히 덮어씁니다:
SYNAPSE_FORCE_V1_EXPORT | 효과 |
|---|---|
미설정 / "" / "0" / "false" | category 정책 적용 — EXPORT → v2 (v2_client wiring 시), 그 외 caller → v1 |
"1" / "true" / "yes" / "on" (대소문자 무관) | EXPORT 포함 모든 caller v1 fallback |
이 var 설정은 plugin code 에 zero impact 입니다 — SDK 가 각
use_v2(ctx, ...) 체크 (우선순위 1, category lookup 이전) 에서 매번
읽습니다. Ray executor 가 spawn 하는 worker 는 driver 의 env 를 상속하므로
per-task 배선이 불필요합니다.
운영 중 사고 시 runtime rollback:
export SYNAPSE_FORCE_V1_EXPORT=1 # 마이그된 모든 caller 가 v1 로 회귀
이후 worker / driver 를 재기동하면 적용됩니다. PR revert 가 필요 없습니다.
Error envelope
v2 backend 는 모든 실패를 {"error": {"code": "...", "detail": "..."}}
envelope 으로 감쌉니다. SDK 의 classify_export_error 는 v1 regex
fallthrough 보다 먼저 이 envelope 을 조회하므로, 알려진 v2 code
는 전용 ExportLogMessageCode (및 retry policy) 로 매핑됩니다:
error.code | ExportLogMessageCode |
|---|---|
invalid_ids | EXPORT_FAILED_BAD_REQUEST |
partial_failure | EXPORT_FAILED_BATCH_PARTIAL |
rate_limited | EXPORT_FAILED_RATE_LIMITED |
미지의 v2 code 는 v1 regex 로 fallthrough — v1 회귀 동작이 보존됩니다.
Partial-failure semantics
Backend V2BulkFetchMixin 은 resolve 불가능한 id 를 silently drop
하고 200 OK 응답에 resolve 된 subset 만 반환합니다 (specs Q2 결정).
SDK 는 backend 가 반환한 그대로 반환합니다 — client-side error 도,
exception 도 없습니다. 엄격한 resolution 이 필요한 caller 는
len(returned) 와 len(requested) 를 비교하세요:
ids = [1, 2, 999_999] # 999_999 는 존재하지 않음
rows = client.v2_client.tasks.bulk_fetch(ids)
# len(rows) == 2 — backend 가 missing id 를 silently drop
missing = set(ids) - {row['id'] for row in rows}
files_manifest nullability
v2 task / assignment payload 의 files_manifest 는 backend PR-A 에
따라 nullable 입니다. File 열거를 기대하는 plugin code 는 None
가드가 필요합니다:
def export_files(row):
manifest = row.get('files_manifest') or {}
for key, info in manifest.items():
...
GroundTruth payload 는 backend PR-C contract 에 따라 항상 non-null
dict — ground_truths.bulk_data path 에서는 가드가 불필요합니다.
Plugin 작성자 마이그레이션 cheat sheet
기존 (v1):
def get_results(self, client, filters):
return client.list_tasks(params=filters, list_all=True)
v2 대응 (v1 fallback 포함):
from synapse_sdk.plugins.actions._v2_switch import _collect_then_bulk, use_v2
from synapse_sdk.plugins.enums import PluginCategory
def get_results(self, client, filters, *, ctx=None):
# action category 를 전달해 per-category 정책이 이 caller 를 v2 로
# opt-in 할 수 있게 합니다. 현재 v2 category 는 EXPORT 뿐입니다
# (SYN-7082); ``category`` 를 생략하거나 EXPORT 가 아닌 값을 넘기면
# v1 로 라우팅됩니다.
if use_v2(ctx, category=PluginCategory.EXPORT):
v2 = ctx.v2_client
rows = _collect_then_bulk(
list_method=lambda: v2.tasks.list(list_all=True, **filters),
bulk_method=lambda ids: v2.tasks.bulk_fetch(ids),
)
return rows, len(rows)
return client.list_tasks(params=filters, list_all=True)
Observability
모든 v2 request 의 meta.request_id 는
client.v2_client.last_request_id 로 노출됩니다.
capture_request_ids() 로 block scope:
with client.v2_client.capture_request_ids():
rows = client.v2_client.tasks.bulk_fetch([1, 2, 3])
print(client.v2_client.last_request_id)
SDK 의 Sentry integration 이 켜진 경우 request_id 가 Sentry
breadcrumb 에 자동 첨부됩니다.
BC policy
- SYN-7082 narrowing:
to_task/steps/fetch_tasks.py와dataset/action.py_download_split는 v1 path 로 좁혀졌습니다 (더 이상use_v2에category를 전달하지 않음).EXPORTcategory 만 v2 에 남습니다. 이는 SYN-6919 대비 의도된 행동 변경이며 regression 버그가 아닙니다 — v2 분기는 명시적_V2_CATEGORY_POLICYopt-in 이 있을 때까지 dead code 로 소스에 남아 있습니다. - kill-switch (
SYNAPSE_FORCE_V1_EXPORT=1) 가EXPORT포함 모든 caller 를 v1 path 에 무기한 유지합니다. - handler
get_results(client, filters, *, ctx=None)시그니처는 kw-onlyctxparam 을Nonedefault 로 추가 —ctx없는 legacy 호출은 v1 동작을 그대로 유지합니다. _collect_then_bulk는 기본idextraction path 에서 dict 형태의 slim row 와 Pydantic list model 을 모두 처리합니다.- 새 v2 resource method 는 모두 additive — 어떤 v1 method 도 제거 / 재용도되지 않았습니다.
기존 surface 에 대해 작성된 plugin 은 수정 없이 계속 동작합니다. v2 path 를 opt-in 하려는 작성자는 위 cheat sheet 를 참고하세요.