v2 Export Migration
This guide documents the SDK migration to the v2 backend's bulk-fetch
endpoints for the export pipeline. The migration was shipped as a single
cut-over PR (feat/SYN-6919-sdk-v2-export-migration) on top of the
Phase 1 wiring already on main (RuntimeContext.v2_client +
failure_classifier v2 error code support).
The change is additive for plugin authors — every legacy caller
continues to work via the SYNAPSE_FORCE_V1_EXPORT kill-switch or by
omitting the new v2_client from the runtime context.
SYN-6919 originally routed all three migrated callers to v2 whenever a
v2_client was wired. SYN-7082 narrowed this: v2 selection is now
gated per plugin action category. The default is v1; only the
EXPORT category opts into v2. As a result:
- export/handlers.py → still v2 (unchanged).
- to_task/steps/fetch_tasks.py → back to v1 (regressed under SYN-7082).
- dataset/action.py _download_split → back to v1 (regressed under SYN-7082).
See Per-category v2 opt-in
below for the authoritative policy. The SSOT is
synapse_sdk/plugins/actions/_v2_switch.py.
TL;DR
- 4 new v2 backend endpoints surfaced on the SDK client:
  - client.v2_client.tasks.bulk_fetch(ids) → POST /v2/tasks/bulk-fetch/
  - client.v2_client.assignments.bulk_fetch(ids) → POST /v2/assignments/bulk-fetch/
  - client.v2_client.ground_truths.bulk_data(ids) → POST /v2/ground-truths/bulk-data/
  - client.v2_client.ground_truth_events.list(...) + bulk_data(pgh_ids)
- 3 caller migrations behind the use_v2 switch (post-SYN-7082 routing):
  - export/handlers.py (Task / Assignment / GroundTruth handlers) → v2 (use_v2(ctx, category=PluginCategory.EXPORT))
  - to_task/steps/fetch_tasks.py → v1 (use_v2(ctx), no category; SYN-7082 regression)
  - dataset/action.py (_download_split) → v1 (use_v2(ctx), no category; SYN-7082 regression)
- 1 nested accessor:
client.v2_client.ground_truth_datasets.versions(ds).events(ver).list(...)
- 1 runtime rollback kill-switch:
SYNAPSE_FORCE_V1_EXPORT=1 → forces every caller (including EXPORT) back to v1.
v1 → v2 caller pattern
The v2 endpoints split the legacy single-call fetch into a 2-phase pattern:
| Phase | v1 (single call) | v2 (2-phase) |
|---|---|---|
| List | client.list_tasks(params, list_all=True) returns heavy rows | client.v2_client.tasks.list(..., list_all=True) returns slim rows |
| Hydrate | (included above) | client.v2_client.tasks.bulk_fetch(ids) per chunk |
The shared helper _collect_then_bulk codifies this:
```python
from synapse_sdk.plugins.actions._v2_switch import _collect_then_bulk

# Phase 1 lists slim rows; phase 2 hydrates them in chunks of at most 200 ids.
rows = _collect_then_bulk(
    list_method=lambda: client.v2_client.tasks.list(project=42, list_all=True),
    bulk_method=lambda ids: client.v2_client.tasks.bulk_fetch(ids),
    ids_per_batch=200,
)
```
The default extractor is compatible with both dict rows and v2
Pydantic list models. In other words, task and assignment list rows may
surface as {"id": ...} or as objects such as TaskV2List /
AssignmentV2List with an .id attribute, and the helper treats
both shapes the same before calling bulk_fetch.
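A minimal sketch of a dict/model-agnostic id extractor, assuming the only requirement is an id key or attribute. The names default_extract_id and FakeTaskRow are hypothetical illustrations, not the SDK's internal code; FakeTaskRow is a plain dataclass standing in for a Pydantic list model such as TaskV2List.

```python
from dataclasses import dataclass
from typing import Any, Mapping


def default_extract_id(item: Any) -> Any:
    """Return a slim row's id whether the row is a dict or a model object."""
    # Dict-shaped rows (raw JSON) carry the id as a key.
    if isinstance(item, Mapping):
        return item["id"]
    # Model rows (e.g. a Pydantic list model) carry it as an attribute.
    return getattr(item, "id")


@dataclass
class FakeTaskRow:
    """Hypothetical stand-in for a v2 list model such as TaskV2List."""

    id: int


ids = [default_extract_id(row) for row in ({"id": 1}, FakeTaskRow(id=2))]
print(ids)  # [1, 2]
```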
The backend caps each bulk_fetch / bulk_data call at 200 ids
(V2BulkFetchMixin.BULK_FETCH_MAX_IDS in
apps/shared/api/viewsets/bulk_fetch.py). The helper default
matches this cap; sending more ids per call surfaces as a backend 400
ids_limit_exceeded. Callers may pass a smaller ids_per_batch
to reduce per-call load, but never a larger one.
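The batching rule can be sketched as follows. chunk_ids is a hypothetical helper, not the SDK's implementation, and the local BULK_FETCH_MAX_IDS constant merely mirrors the 200-id backend cap so the guard can reject oversize batches before they reach the server. Because chunk_ids is a generator, the ValueError surfaces on the first iteration.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

# Local mirror of the backend cap (V2BulkFetchMixin.BULK_FETCH_MAX_IDS = 200).
BULK_FETCH_MAX_IDS = 200


def chunk_ids(ids: Sequence[T], ids_per_batch: int = BULK_FETCH_MAX_IDS) -> Iterator[List[T]]:
    """Yield consecutive slices of ids, each no longer than ids_per_batch."""
    # Smaller batches are fine; larger ones would come back as 400 ids_limit_exceeded.
    if not 1 <= ids_per_batch <= BULK_FETCH_MAX_IDS:
        raise ValueError(
            f"ids_per_batch must be between 1 and {BULK_FETCH_MAX_IDS}, got {ids_per_batch}"
        )
    for start in range(0, len(ids), ids_per_batch):
        yield list(ids[start:start + ids_per_batch])


batches = list(chunk_ids(list(range(450))))
print([len(batch) for batch in batches])  # [200, 200, 50]
```

Rejecting an oversize batch locally trades the backend's 400 for a ValueError raised at the call site, which is usually easier to trace.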
For ground_truth_events the primary key is pgh_id rather than
id, so pass an extract_id callable. Keep custom extractors
dict/model-compatible too:
```python
def item_field(item, field):
    # Slim rows may arrive as dicts or as Pydantic list models.
    if isinstance(item, dict):
        return item[field]
    return getattr(item, field)


rows = _collect_then_bulk(
    list_method=lambda: client.v2_client.ground_truth_events.list(
        ground_truth_dataset_version=99,
        list_all=True,
    ),
    bulk_method=lambda pgh_ids: client.v2_client.ground_truth_events.bulk_data(pgh_ids),
    extract_id=lambda item: item_field(item, 'pgh_id'),
)
```
Throttle / retry contract on the v2 path
The v1 export handlers tuned EXPORT_THROTTLE_SECONDS /
EXPORT_PAGE_RETRIES / EXPORT_READ_TIMEOUT through
_apply_export_params so the v1 cursor paginator paced its own page
fetches. The v2 path uses cursor pagination on the list endpoint
(handled by the resource layer) plus the new bulk-fetch sequence, so
_apply_export_params no longer applies wholesale.
The migrated handlers preserve the v1 inter-call throttle on the
bulk-fetch sequence by forwarding EXPORT_THROTTLE_SECONDS to
_collect_then_bulk(..., throttle_seconds=...). That keeps the
per-handler back-pressure characteristics observed in the v1 production
exporter (Task = 0.2s, GroundTruth = 0.1s, Assignment = 0.1s — all
sleeps are between successive bulk calls, never before the first or
after the last).
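The "sleep only between calls" contract is easy to get wrong by one. Here is a minimal sketch, assuming each batch is already capped; run_bulk_calls is a hypothetical name, and the injectable sleep parameter exists only so the pacing can be observed in a test. For N batches it sleeps exactly N - 1 times.

```python
import time
from typing import Callable, Iterable, List, Sequence


def run_bulk_calls(
    batches: Iterable[Sequence[int]],
    bulk_method: Callable[[Sequence[int]], List[dict]],
    throttle_seconds: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[dict]:
    """Call bulk_method once per batch, sleeping only between successive calls."""
    rows: List[dict] = []
    for index, batch in enumerate(batches):
        # No sleep before the first call; no trailing sleep, because the loop
        # ends immediately after the last call.
        if index > 0 and throttle_seconds > 0:
            sleep(throttle_seconds)
        rows.extend(bulk_method(batch))
    return rows


slept: List[float] = []
rows = run_bulk_calls(
    [[1, 2], [3], [4, 5]],
    bulk_method=lambda ids: [{"id": i} for i in ids],
    throttle_seconds=0.2,  # the Task handler's v1 pacing
    sleep=slept.append,
)
print(len(rows), slept)  # 5 [0.2, 0.2]
```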
EXPORT_READ_TIMEOUT is not plumbed through to the v2 client — the
v2 transport (BackendV2Client._request) applies its own timeout policy
based on the v2 cursor paginator and HTTPX defaults. If you need to
override v2 read-timeout semantics, set it at the BackendV2Client
construction site rather than on the handler.
The export handlers now forward page-level retry to the
list paginator via the pass-through (_paginator_pass_through()):
page_retries=3 and retry_on_status=(502, 503, 504, 500)
(EXPORT_LIST_RETRY_ON_STATUS; note the added 500 for transient
DB-connection-drop responses observed on slow large-project list queries).
The global DEFAULT_RETRY_ON_STATUS is unchanged, so non-export v2
consumers keep the conservative 502/503/504 default. This supersedes
earlier guidance that page-level retry (EXPORT_PAGE_RETRIES) was not
plumbed on the v2 path.
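A minimal sketch of page-level retry restricted to a status set, under stated assumptions: HttpStatusError and fetch_page_with_retry are hypothetical, page_retries is read as "retries after the first attempt", and the linear backoff is illustrative rather than the paginator's actual policy. Only the status tuple and the retry count come from the text above.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")

# Status set from the text above (EXPORT_LIST_RETRY_ON_STATUS); 500 is export-only.
EXPORT_LIST_RETRY_ON_STATUS = (502, 503, 504, 500)


class HttpStatusError(Exception):
    """Hypothetical error type carrying the HTTP status of a failed page fetch."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def fetch_page_with_retry(
    fetch_page: Callable[[], T],
    page_retries: int = 3,
    retry_on_status: Sequence[int] = EXPORT_LIST_RETRY_ON_STATUS,
    backoff_seconds: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Fetch one list page, retrying only on the configured transient statuses.

    Assumption: page_retries counts retries after the first attempt.
    """
    attempt = 0
    while True:
        try:
            return fetch_page()
        except HttpStatusError as exc:
            # Non-retryable statuses (e.g. 400) and an exhausted budget propagate.
            if exc.status_code not in retry_on_status or attempt >= page_retries:
                raise
            attempt += 1
            sleep(backoff_seconds * attempt)  # linear backoff, purely illustrative


# Usage: two transient failures (500, then 503) followed by a successful page.
responses = iter([HttpStatusError(500), HttpStatusError(503), ["row-1", "row-2"]])


def flaky_fetch():
    result = next(responses)
    if isinstance(result, Exception):
        raise result
    return result


page = fetch_page_with_retry(flaky_fetch, sleep=lambda _: None)
print(page)  # ['row-1', 'row-2']
```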
Large-volume resilience (keyset / presigned / retry)
The v2 export 2-phase path is hardened for large projects as follows. See Export Actions (the "Uninterrupted large-volume export" section, 대량 데이터 무중단 export) for the full methodology and the future-work TODO list. Summary:
- Keyset list ordering: Task / Assignment list calls add sort='-id' (unique PK keyset) to eliminate DRF CursorPagination offset degradation on the non-unique -created default ordering. GroundTruth already uses the -pgh_id event-PK keyset. Backed by a (project, id) index in synapse-backend.
- Presigned bulk: bulk calls use response_mode='presigned'; _resolve_data_payload() downloads externalized data from the presigned URL via download_json instead of streaming it through the API tier.
- Bounded retry: see the page-level retry note in the throttle / retry section above.
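Why a unique keyset helps: with a unique sort key, the cursor is simply the last id seen, and the next page is a seek ("id < cursor, newest first") rather than a skip over earlier rows. The following toy in-memory model illustrates the idea, with bisect over a sorted list standing in for the (project, id) index. It is not DRF's CursorPagination code, and keyset_pages_desc is a hypothetical name.

```python
from bisect import bisect_left
from typing import Iterator, List, Optional, Sequence


def keyset_pages_desc(ids: Sequence[int], page_size: int) -> Iterator[List[int]]:
    """Page through unique ids newest-first, using the last seen id as the cursor."""
    # Ascending copy stands in for the backend's (project, id) index.
    index = sorted(ids)
    cursor: Optional[int] = None
    while True:
        # Seek: count of ids strictly below the cursor (every id on the first page).
        end = len(index) if cursor is None else bisect_left(index, cursor)
        page = index[max(0, end - page_size):end][::-1]  # newest first, like sort='-id'
        if not page:
            return
        yield page
        cursor = page[-1]  # smallest id on this page; a unique PK means no ties


print(list(keyset_pages_desc([5, 1, 9, 3, 7, 2], page_size=4)))  # [[9, 7, 5, 3], [2, 1]]
```

Each page costs one binary search plus the page itself, independent of how deep the scan has gone; a non-unique key would need extra state to break ties at page boundaries.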
Next evolution — server-side async-job delegation (SYN-7104 P2)
The v2 2-phase path still has the SDK (ray job) pull every row over v2 REST and transform it in-process. For very large exports (100k+ rows with bulky originals) that pull is itself the cost/fragility hot-spot. SYN-7104 P2 layers backend async-job delegation on top of the v2 export path: when the target count exceeds a threshold, data acquisition is handed to a backend async job (server-side keyset scan → raw bundle) while the SDK enqueues the job, streams its progress, reads the bundle, and then runs the same shared transform tail as the non-delegated path. Delegation only changes how the data is acquired; the deliverable is identical (transparent delegation, FR-8 parity). This is the first landing of the "(Option A) server-side async export-to-storage" future-work item tracked in Export Actions.
This delegation is an additional layer above the 2-phase v2 path — it does not replace it. Exports below the threshold acquire data through the in-process head (v2 list → bulk-fetch); over-threshold exports acquire it through the delegated head. Either way the same shared tail (ConvertData → SaveFiles → Finalize) runs afterwards, so the plugin's custom logic applies on both paths.
The delegation gate is enabled by default. Because the delegated path
runs the same shared transform tail (ConvertData → SaveFiles → Finalize),
its deliverable is the same plugin format as the non-delegated path. The
backend-written raw bundle (JSONL + files + manifest) is only the
delegated head's intermediate artifact, which the SDK reads (BundleRead)
and feeds into the tail. To revert to in-process acquisition immediately,
use the kill-switch below. Staging verification of the delegated path is
recommended before production rollout.
Gate, threshold, kill-switch
setup_steps() does not branch on the delegation decision; it always
registers a single fixed 9-step registry (see Fixed step registry with
shared tail below). Delegation is selected only when all of the following
hold: delegation is enabled (kill-switch off), ctx.v2_client is wired,
count > threshold (strictly greater), and a project_id is resolved. The
decision is evaluated once (_resolve_delegation_decision()) and seeded onto
context.delegation_decision; each head step's can_skip consults it, so the
inactive head completes as a pass. The count comes from a v1 single-page
count probe (no full scan); if the probe cannot determine a count, the
export falls back to the non-delegated (in-process head) path.
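The four-way AND gate, including the strict comparison and the inconclusive-probe fallback, can be sketched as a pure function. This is a hypothetical simplification: resolve_delegation_decision and DelegationDecision are illustrative names (the SDK's own resolver is _resolve_delegation_decision()), and the inputs are passed in explicitly instead of being read from a context. The default threshold of 1000 matches the table in this section.

```python
from dataclasses import dataclass
from typing import Any, Optional

DEFAULT_SIZE_THRESHOLD = 1000  # documented default for export_async_job_size_threshold


@dataclass(frozen=True)
class DelegationDecision:
    delegate: bool
    reason: str


def resolve_delegation_decision(
    *,
    delegation_disabled: bool,
    v2_client: Optional[Any],
    count: Optional[int],
    project_id: Optional[int],
    threshold: int = DEFAULT_SIZE_THRESHOLD,
) -> DelegationDecision:
    """Delegate only when every precondition holds; otherwise stay in-process."""
    if delegation_disabled:
        return DelegationDecision(False, "kill-switch on")
    if v2_client is None:
        return DelegationDecision(False, "no v2_client wired")
    if project_id is None:
        return DelegationDecision(False, "project_id not resolved")
    if count is None:
        # Inconclusive count probe: fall back to the in-process head.
        return DelegationDecision(False, "count probe inconclusive")
    if count > threshold:  # strict: exactly `threshold` targets stays in-process
        return DelegationDecision(True, f"count {count} > threshold {threshold}")
    return DelegationDecision(False, f"count {count} <= threshold {threshold}")


decision = resolve_delegation_decision(
    delegation_disabled=False, v2_client=object(), count=1001, project_id=42
)
print(decision.delegate)  # True
```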
| Concern | env | config | Default |
|---|---|---|---|
| Size threshold | SYNAPSE_EXPORT_ASYNC_JOB_SIZE_THRESHOLD | export_async_job_size_threshold | 1000 |
| Kill-switch (force in-process) | SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1 | export_disable_async_delegate | off (delegation on) |
This kill-switch is independent of SYNAPSE_FORCE_V1_EXPORT. The two
compose:
| SYNAPSE_FORCE_V1_EXPORT | SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE | Export path |
|---|---|---|
| off | off | v2 2-phase, delegated when count > threshold |
| off | on | v2 2-phase in-process (delegation disabled) |
| on | (either) | v1 export path (whole v2 layer rolled back) |
```bash
# Disable only the delegation layer (keep v2 2-phase in-process):
export SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE=1

# Or roll the entire export back to v1 (also bypasses delegation):
export SYNAPSE_FORCE_V1_EXPORT=1
```
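The precedence between the two switches can be expressed as a small resolver. A sketch, assuming the v2_client and project_id preconditions are already met and that SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE accepts the same truthy spellings documented for SYNAPSE_FORCE_V1_EXPORT (an assumption); env_flag and export_path are hypothetical names, and the environment is passed as a plain mapping so it can be tested without touching os.environ.

```python
from typing import Mapping

# Truthy spellings documented for SYNAPSE_FORCE_V1_EXPORT; applying the same
# parsing to SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE is an assumption.
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(env: Mapping[str, str], name: str) -> bool:
    """Case-insensitive truthy check; unset or empty means off."""
    return env.get(name, "").lower() in _TRUTHY


def export_path(env: Mapping[str, str], *, count: int, threshold: int = 1000) -> str:
    """Resolve the export path for a run whose v2_client and project_id are available."""
    # The v1 rollback wins outright: the whole v2 layer, delegation included, is bypassed.
    if env_flag(env, "SYNAPSE_FORCE_V1_EXPORT"):
        return "v1"
    # The delegation kill-switch only removes the server-side layer.
    if env_flag(env, "SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE"):
        return "v2 in-process"
    return "v2 delegated" if count > threshold else "v2 in-process"


print(export_path({"SYNAPSE_EXPORT_DISABLE_ASYNC_DELEGATE": "1"}, count=50_000))  # v2 in-process
```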
Fixed step registry with shared tail
setup_steps() does not branch: it always registers the same fixed
9-step registry (two acquisition heads plus one shared transform tail)
rather than a delegation-dependent step set. The delegation decision
(probed once) only flips each head step's can_skip, so the inactive head
completes as a pass (skip → 100%) while the shared tail runs on both paths,
producing the same plugin-format deliverable (FR-8 parity):
| Area | Steps | Active when delegated | Active when not |
|---|---|---|---|
| In-process head | Initialize → FetchResults → PrepareExport (v2 list → bulk-fetch) | skip (pass) | run |
| Delegated head | DelegateEnqueue (plugin_exports.create) → DelegateStreamProgress (async_jobs.stream_progress SSE) → BundleRead (read raw bundle + url→bundle_path rewrite) | run | skip (pass) |
| Shared tail | ConvertData (plugin transform) → SaveFiles → Finalize | run | run |
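The "fixed registry, flip only can_skip" design can be sketched with plain data. Step, setup_steps and active_steps here are hypothetical stand-ins (the real can_skip consults context.delegation_decision; this version receives the decision as a boolean), but the step names and their grouping come from the table above.

```python
from dataclasses import dataclass
from typing import Callable, List

IN_PROCESS_HEAD = ("Initialize", "FetchResults", "PrepareExport")
DELEGATED_HEAD = ("DelegateEnqueue", "DelegateStreamProgress", "BundleRead")
SHARED_TAIL = ("ConvertData", "SaveFiles", "Finalize")


@dataclass(frozen=True)
class Step:
    name: str
    can_skip: Callable[[bool], bool]  # receives the seeded delegation decision


def setup_steps() -> List[Step]:
    """Always the same 9 steps in the same order; no branching on delegation."""
    steps = [Step(name, lambda delegated: delegated) for name in IN_PROCESS_HEAD]
    steps += [Step(name, lambda delegated: not delegated) for name in DELEGATED_HEAD]
    steps += [Step(name, lambda delegated: False) for name in SHARED_TAIL]
    return steps


def active_steps(delegated: bool) -> List[str]:
    """Names of the steps that actually run; the rest complete as a skip."""
    return [step.name for step in setup_steps() if not step.can_skip(delegated)]


print(len(setup_steps()), active_steps(delegated=True)[:3])
# 9 ['DelegateEnqueue', 'DelegateStreamProgress', 'BundleRead']
```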
The raw bundle (JSONL + files + manifest) the backend writes is the delegated
head's intermediate artifact: BundleRead reads
{prefix}/{tasks|assignments|ground_truths}.jsonl (each line
{id, data, files_manifest:{spec:{file_name_original, url, bundle_path}}}),
rewrites url → {prefix}/{bundle_path} via map_files_to_bundle_local
(traversal-guarded, output filename kept as file_name_original), and feeds
the items into the shared tail. So the delegated export still produces the
plugin format — not a raw bundle.
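A minimal sketch of the url → {prefix}/{bundle_path} rewrite with a traversal guard, assuming the JSONL line shape quoted above. rewrite_to_bundle_local is a hypothetical stand-in for map_files_to_bundle_local: it normalizes the joined path with posixpath, rejects absolute bundle_path values or any '..' that would escape the prefix, and leaves file_name_original untouched for use as the output filename. The presigned URL in the sample line is a placeholder.

```python
import json
import posixpath
from typing import Any, Dict


def rewrite_to_bundle_local(
    prefix: str, files_manifest: Dict[str, Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """Point each spec's url at its file inside the bundle, refusing path traversal."""
    root = posixpath.normpath(prefix)
    rewritten: Dict[str, Dict[str, Any]] = {}
    for spec_name, spec in files_manifest.items():
        bundle_path = spec["bundle_path"]
        local = posixpath.normpath(posixpath.join(root, bundle_path))
        # Reject absolute paths and any '..' segments that climb out of the prefix.
        if posixpath.isabs(bundle_path) or not local.startswith(root + "/"):
            raise ValueError(f"bundle_path escapes the bundle prefix: {bundle_path!r}")
        # Keep file_name_original (and every other key); only url changes.
        rewritten[spec_name] = {**spec, "url": local}
    return rewritten


line = (
    '{"id": 1, "data": {}, "files_manifest": {"image_1": {'
    '"file_name_original": "cat.jpg", '
    '"url": "https://example.invalid/presigned", '
    '"bundle_path": "files/1/image_1.jpg"}}}'
)
item = json.loads(line)
item["files_manifest"] = rewrite_to_bundle_local("/tmp/bundle", item["files_manifest"])
print(item["files_manifest"]["image_1"]["url"])  # /tmp/bundle/files/1/image_1.jpg
```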
The delegated head makes three BackendV2Client calls across two resources:
v2_client.plugin_exports.create(...), v2_client.async_jobs.stream_progress(job_id)
(SSE), and v2_client.async_jobs.retrieve(job_id). The backend streams only
progress (processed/total); the SDK owns event, metric, and user-facing logs
(set_progress step server_export, set_metrics,
export_info/export_completed/export_failed,
EXPORT_DELEGATED_TO_SERVER/EXPORT_DELEGATED_COMPLETED). See
Export Actions, section "Server-side delegation of large exports" (대량 export 의 서버사이드 위임),
and Steps Workflow, section "fixed step registry",
for the full contract.
Nested accessor
The events endpoint is available both as a flat resource and as a nested accessor under the owning dataset version. Both surfaces hit the same backend filter contract:
```python
# Flat
client.v2_client.ground_truth_events.list(ground_truth_dataset_version=99)

# Nested (RPC-faithful URL)
client.v2_client.ground_truth_datasets.versions(7).events(99).list()
```
Use the nested form when the owning dataset / version is already part of the calling context; use the flat form for cross-dataset analytics.
Per-category v2 opt-in
SYN-7082 replaced the env-only global switch with a per-action-category
policy. The decision is centralised in use_v2 and the
_V2_CATEGORY_POLICY table
(synapse_sdk/plugins/actions/_v2_switch.py — the SSOT for this guide).
Signature change
```python
# Before (SYN-6919)
def use_v2(ctx) -> bool: ...

# After (SYN-7082)
def use_v2(ctx, *, category: PluginCategory | None = None) -> bool: ...
```
Policy table
```python
_V2_CATEGORY_POLICY: dict[PluginCategory, bool] = {
    PluginCategory.EXPORT: True,  # export action is the only v2 category.
}
```
The default is v1. A category routes to v2 only if it has an explicit
True entry here and the callsite passes the matching category=.
A None / unregistered category always falls back to v1. To roll another
category onto v2: add an explicit PluginCategory.X: True entry and
pass category=PluginCategory.X at that callsite.
Override priority (highest → lowest)
| # | Rule | Result |
|---|---|---|
| 1 | SYNAPSE_FORCE_V1_EXPORT truthy (1 / true / yes / on, case-insensitive) | v1 — runtime rollback for every caller |
| 2 | ctx.v2_client missing / None | v1 — no v2 client wired |
| 3 | _V2_CATEGORY_POLICY lookup on category | default v1; EXPORT → v2 |
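The three-tier priority can be sketched as follows. use_v2_sketch is a hypothetical restatement of the rules in the table above, not the SDK's use_v2; the PluginCategory enum is a local stand-in (OTHER is a placeholder for any non-EXPORT category), and env defaults to os.environ so the kill-switch is re-read on every call.

```python
import os
from enum import Enum
from types import SimpleNamespace
from typing import Any, Dict, Mapping, Optional


class PluginCategory(str, Enum):
    """Local stand-in for the SDK enum; OTHER is a placeholder category."""

    EXPORT = "export"
    OTHER = "other"


_V2_CATEGORY_POLICY: Dict[PluginCategory, bool] = {PluginCategory.EXPORT: True}
_TRUTHY = {"1", "true", "yes", "on"}


def use_v2_sketch(
    ctx: Any,
    *,
    category: Optional[PluginCategory] = None,
    env: Mapping[str, str] = os.environ,
) -> bool:
    # Priority 1: the runtime kill-switch overrides everything, EXPORT included.
    if env.get("SYNAPSE_FORCE_V1_EXPORT", "").lower() in _TRUTHY:
        return False
    # Priority 2: no v2 client wired (ctx may itself be None for legacy callers).
    if getattr(ctx, "v2_client", None) is None:
        return False
    # Priority 3: explicit opt-in only; None or unregistered categories stay on v1.
    if category is None:
        return False
    return _V2_CATEGORY_POLICY.get(category, False)


ctx = SimpleNamespace(v2_client=object())
print(use_v2_sketch(ctx, category=PluginCategory.EXPORT, env={}))  # True
print(use_v2_sketch(ctx, env={}))  # False
```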
Per-caller routing
| Caller | Call | Route |
|---|---|---|
| export/handlers.py | use_v2(ctx, category=PluginCategory.EXPORT) | v2 (unchanged) |
| to_task/steps/fetch_tasks.py | use_v2(ctx) (no category) | v1 (regressed from SYN-6919) |
| dataset/action.py _download_split | use_v2(ctx) (no category) | v1 (regressed from SYN-6919) |
The v2 branch code in to_task and dataset is preserved (dead),
not deleted — re-activating it only requires adding the relevant category
to _V2_CATEGORY_POLICY and threading category= through the callsite.
Both callsites carry a SYN-7082 source comment to that effect.
Kill-switch operation
SYNAPSE_FORCE_V1_EXPORT is the top-priority runtime rollback path —
it overrides the category policy entirely:
| SYNAPSE_FORCE_V1_EXPORT | Effect |
|---|---|
| unset / "" / "0" / "false" | category policy applies; EXPORT → v2 (when v2_client wired), all other callers → v1 |
| "1" / "true" / "yes" / "on" (case-insensitive) | v1 fallback for every caller, including EXPORT |
Setting the variable requires no plugin code changes; the SDK reads it on
each use_v2(ctx, ...) check (priority 1, before the category lookup).
Workers spawned by the ray executor inherit the env from their driver; no
per-task plumbing is required.
For runtime rollback during an incident:
```bash
export SYNAPSE_FORCE_V1_EXPORT=1  # all migrated callers go back to v1
```
Then restart workers / drivers to pick up the change. No PR revert is required.
Error envelope
The v2 backend wraps every failure in
{"error": {"code": "...", "detail": "..."}}. The SDK's
classify_export_error consults this envelope before the legacy
v1 regex fallthrough, so callers see specific ExportLogMessageCode
values (and retry policies) for known v2 codes:
| error.code | ExportLogMessageCode |
|---|---|
| invalid_ids | EXPORT_FAILED_BAD_REQUEST |
| partial_failure | EXPORT_FAILED_BATCH_PARTIAL |
| rate_limited | EXPORT_FAILED_RATE_LIMITED |
Unknown v2 codes fall through to v1 regex patterns; v1 regression behaviour is preserved.
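A minimal sketch of the envelope-first lookup, assuming the response body has already been parsed into a dict. classify_v2_envelope is a hypothetical name, and it returns the ExportLogMessageCode member names as plain strings rather than importing the SDK enum; a None result is the signal to fall through to the v1 regex patterns.

```python
from typing import Any, Mapping, Optional

# Mapping from the table above; member names are returned as plain strings here.
_V2_CODE_TO_LOG_CODE = {
    "invalid_ids": "EXPORT_FAILED_BAD_REQUEST",
    "partial_failure": "EXPORT_FAILED_BATCH_PARTIAL",
    "rate_limited": "EXPORT_FAILED_RATE_LIMITED",
}


def classify_v2_envelope(body: Any) -> Optional[str]:
    """Map a v2 {"error": {"code": ...}} envelope to a log code, or None to fall through."""
    if not isinstance(body, Mapping):
        return None
    error = body.get("error")
    if not isinstance(error, Mapping):
        return None
    return _V2_CODE_TO_LOG_CODE.get(error.get("code"))


body = {"error": {"code": "rate_limited", "detail": "Too many requests"}}
print(classify_v2_envelope(body) or "fall through to v1 regex")  # EXPORT_FAILED_RATE_LIMITED
```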
Partial-failure semantics
The backend V2BulkFetchMixin silently drops unresolvable ids and
returns a 200 OK with the resolved subset (specs Q2 resolution). The SDK
returns whatever the backend returns — no client-side error, no
exception. Callers needing strict resolution should compare
len(returned) to len(requested):
```python
ids = [1, 2, 999_999]  # 999_999 doesn't exist
rows = client.v2_client.tasks.bulk_fetch(ids)
# len(rows) == 2: the backend silently dropped the missing id.
missing = set(ids) - {row['id'] for row in rows}
```
files_manifest nullability
The v2 task / assignment payload exposes files_manifest as
nullable per backend PR-A. Plugin code that expects file enumeration
must guard against None:
```python
def export_files(row):
    # files_manifest may be None on v2 task / assignment payloads.
    manifest = row.get('files_manifest') or {}
    for key, info in manifest.items():
        ...
```
GroundTruth payloads always carry a non-null files_manifest dict
(backend PR-C contract), so no guard is needed for the
ground_truths.bulk_data path.
Plugin author migration cheat sheet
Old (v1):
```python
def get_results(self, client, filters):
    return client.list_tasks(params=filters, list_all=True)
```
New (v2-capable with v1 fallback):
```python
from synapse_sdk.plugins.actions._v2_switch import _collect_then_bulk, use_v2
from synapse_sdk.plugins.enums import PluginCategory


def get_results(self, client, filters, *, ctx=None):
    # Pass the action category so the per-category policy can opt this
    # caller into v2. EXPORT is the only v2 category today (SYN-7082);
    # omitting ``category`` (or passing a non-EXPORT one) routes to v1.
    if use_v2(ctx, category=PluginCategory.EXPORT):
        v2 = ctx.v2_client
        rows = _collect_then_bulk(
            list_method=lambda: v2.tasks.list(list_all=True, **filters),
            bulk_method=lambda ids: v2.tasks.bulk_fetch(ids),
        )
        return rows, len(rows)
    return client.list_tasks(params=filters, list_all=True)
```
Observability
Every v2 request exposes its meta.request_id via
client.v2_client.last_request_id. Use capture_request_ids() to
scope the slot to a block:
```python
with client.v2_client.capture_request_ids():
    rows = client.v2_client.tasks.bulk_fetch([1, 2, 3])
    print(client.v2_client.last_request_id)
```
The request_id is automatically threaded through Sentry breadcrumbs
when the SDK's Sentry integration is enabled.
BC policy
- SYN-7082 narrowing: to_task/steps/fetch_tasks.py and dataset/action.py _download_split were narrowed back to the v1 path (they no longer pass a category to use_v2). Only the EXPORT category remains on v2. This is a deliberate behaviour change vs. SYN-6919, not a regression bug; the v2 branches stay in source as dead code pending an explicit _V2_CATEGORY_POLICY opt-in.
- The kill-switch (SYNAPSE_FORCE_V1_EXPORT=1) keeps every caller, including EXPORT, on the v1 path indefinitely.
- The handler get_results(client, filters, *, ctx=None) signature gains a keyword-only ctx param with a None default; legacy call sites without ctx keep the v1 behaviour.
- _collect_then_bulk accepts both dict-shaped slim rows and Pydantic list models for the default id extraction path.
- All new v2 resource methods are additive: no v1 method was removed or repurposed.
Plugins built against the existing surface will continue to work without modification. Authors who want to opt into the v2 path can update their callers using the cheat sheet above.