# Steps & Workflow

Step 프레임워크는 복잡한 워크플로우를 재사용 가능한 단위로 분리하고, 자동 progress 추적과 실패 rollback 을 제공합니다.

## 개요[​](#개요 "개요에 대한 직접 링크")

Step 기반 워크플로우는 다음 이점을 제공합니다:

* **모듈성**: 복잡한 작업을 독립적인 step 으로 분할
* **Progress 추적**: `progress_weight` 기반 자동 진행률 계산
* **에러 복구**: 실패 시 실행된 step 의 자동 rollback
* **재사용성**: 서로 다른 워크플로우에서 step 재사용

### 한눈에 보기[​](#한눈에-보기 "한눈에 보기에 대한 직접 링크")

| 컴포넌트             | 역할                           | Import Path                 |
| -------------------- | ------------------------------ | --------------------------- |
| `BaseStep[C]`        | 추상 step 클래스               | `synapse_sdk.plugins.steps` |
| `StepResult`         | step 실행 결과                 | `synapse_sdk.plugins.steps` |
| `BaseStepContext`    | step 간 공유 상태              | `synapse_sdk.plugins.steps` |
| `StepRegistry[C]`    | step 등록 및 순서 관리         | `synapse_sdk.plugins.steps` |
| `Orchestrator[C]`    | 순차 step 실행 엔진            | `synapse_sdk.plugins.steps` |
| `LoggingStep[C]`     | step 로깅 wrapper              | `synapse_sdk.plugins.steps` |
| `TimingStep[C]`      | 시간 측정 wrapper              | `synapse_sdk.plugins.steps` |
| `ValidationStep[C]`  | context 검증 step              | `synapse_sdk.plugins.steps` |
| `ExportDatasetStep`  | Synapse 에서 데이터셋 다운로드 | `synapse_sdk.plugins.steps` |
| `ConvertDatasetStep` | 데이터셋 포맷 변환             | `synapse_sdk.plugins.steps` |

## 핵심 컴포넌트[​](#핵심-컴포넌트 "핵심 컴포넌트에 대한 직접 링크")

### BaseStep[​](#basestep "BaseStep에 대한 직접 링크")

`BaseStep[C]` 는 모든 step 의 추상 base 클래스입니다. 타입 파라미터 `C` 는 `BaseStepContext` 를 확장하는 context 타입입니다.

```python
class BaseStep[C: BaseStepContext](ABC):
    @property
    @abstractmethod
    def name(self) -> str:
        """Unique step identifier."""
        ...

    @property
    @abstractmethod
    def progress_weight(self) -> float:
        """Progress weight (0.0-1.0)."""
        ...

    @abstractmethod
    def execute(self, context: C) -> StepResult:
        """Execute the step."""
        ...

    def can_skip(self, context: C) -> bool:
        """Skip condition (default: False)."""
        return False

    def rollback(self, context: C, result: StepResult) -> None:
        """Rollback logic (default: no-op)."""
        pass

```

**필수 구현:**

| 프로퍼티/메서드    | 타입         | 설명                                        |
| ------------------ | ------------ | ------------------------------------------- |
| `name`             | `str`        | 고유 step 식별자                            |
| `progress_weight`  | `float`      | 전체 progress 중 이 step 의 비중 (0.0\~1.0) |
| `execute(context)` | `StepResult` | step 로직 실행                              |

**선택적 오버라이드:**

| 메서드                      | 기본값  | 설명              |
| --------------------------- | ------- | ----------------- |
| `can_skip(context)`         | `False` | 조건부 skip 로직  |
| `rollback(context, result)` | No-op   | 실패 시 정리 로직 |

### StepResult[​](#stepresult "StepResult에 대한 직접 링크")

`StepResult` 는 step 실행 결과를 담는 dataclass 입니다.

```python
@dataclass
class StepResult:
    success: bool = True
    data: dict[str, Any] = field(default_factory=dict)
    error: str | None = None
    rollback_data: dict[str, Any] = field(default_factory=dict)
    skipped: bool = False
    timestamp: datetime = field(default_factory=datetime.now)

```

| 필드            | 타입             | 기본값  | 설명                      |
| --------------- | ---------------- | ------- | ------------------------- |
| `success`       | `bool`           | `True`  | step 성공 여부            |
| `data`          | `dict[str, Any]` | `{}`    | 결과 데이터               |
| `error`         | `str \| None`    | `None`  | 실패 시 에러 메시지       |
| `rollback_data` | `dict[str, Any]` | `{}`    | rollback 에 필요한 데이터 |
| `skipped`       | `bool`           | `False` | step skip 여부            |
| `timestamp`     | `datetime`       | `now()` | 완료 시각                 |

### BaseStepContext[​](#basestepcontext "BaseStepContext에 대한 직접 링크")

`BaseStepContext` 는 step 간 공유 상태를 위한 base 클래스입니다.

```python
@dataclass
class BaseStepContext:
    runtime_ctx: RuntimeContext
    step_results: list[StepResult] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    current_step: str | None = field(default=None, init=False)
    current_step_order: int | None = field(default=None, init=False)

    def log(self, event: str, data: dict[str, Any], file: str | None = None) -> None:
        """Log an event."""
        ...

    def set_progress(self, current: int, total: int, step: str | None = None) -> None:
        """Set progress. Uses current_step as step if not provided."""
        ...

    def set_metrics(self, value: dict[str, Any], step: str | None = None) -> None:
        """Set metrics. Uses current_step as step if not provided."""
        ...

```

> **알아두기**: step 내부에서 `step` 을 지정하지 않고 `set_progress()` 또는 `set_metrics()` 를 호출하면, 현재 step 의 `name` 이 자동으로 step 으로 사용됩니다.

### StepRegistry[​](#stepregistry "StepRegistry에 대한 직접 링크")

`StepRegistry[C]` 는 step 등록과 실행 순서를 관리합니다.

```python
class StepRegistry[C: BaseStepContext]:
    def register(self, step: BaseStep[C]) -> None:
        """Add step to the end."""

    def unregister(self, name: str) -> None:
        """Remove step by name."""

    def get_steps(self) -> list[BaseStep[C]]:
        """Get registered step list."""

    def insert_after(self, after_name: str, step: BaseStep[C]) -> None:
        """Insert step after another step."""

    def insert_before(self, before_name: str, step: BaseStep[C]) -> None:
        """Insert step before another step."""

    def get_step_orders(self) -> dict[str, int]:
        """Get 0-based order mapping for all steps."""

    @property
    def total_weight(self) -> float:
        """Sum of all step progress_weights."""

```

### Orchestrator[​](#orchestrator "Orchestrator에 대한 직접 링크")

`Orchestrator[C]` 는 등록된 step 을 순차 실행하고 실패 시 rollback 을 처리합니다.

progress\_callback 은 필수입니다

`progress_callback` 파라미터는 올바른 job status 관리를 위해 **필수** 입니다. 이것이 없으면 backend 가 progress 업데이트를 받지 못하고, 완료 시 **job status 가 'success' 로 전이되지 않습니다**.

```python
class Orchestrator[C: BaseStepContext]:
    def __init__(
        self,
        registry: StepRegistry[C],
        context: C,
        progress_callback: Callable[[int, int], None],  # Required!
    ) -> None:
        """Initialize orchestrator.

        Args:
            registry: StepRegistry with steps to execute.
            context: Shared context for steps.
            progress_callback: Callback(current, total) for progress updates.
                Required for proper job status management.
                Typically: lambda curr, total: self.set_progress(curr, total)
        """

    def execute(self) -> dict[str, Any]:
        """Execute all steps.

        Returns:
            Dict with success status and execution info.

        Raises:
            RuntimeError: On step failure (after rollback).
        """

```

## Step 생성[​](#step-생성 "Step 생성에 대한 직접 링크")

### 기본 Step[​](#기본-step "기본 Step에 대한 직접 링크")

`BaseStep` 을 서브클래싱해 커스텀 step 을 정의합니다.

```python
from dataclasses import dataclass, field
from synapse_sdk.plugins.steps import BaseStep, BaseStepContext, StepResult


@dataclass
class MyContext(BaseStepContext):
    """Custom context."""
    data: list[str] = field(default_factory=list)
    processed_count: int = 0


class LoadDataStep(BaseStep[MyContext]):
    """Data loading step."""

    @property
    def name(self) -> str:
        return 'load_data'

    @property
    def progress_weight(self) -> float:
        return 0.3  # 30% of total progress

    def execute(self, context: MyContext) -> StepResult:
        # Update progress (automatically uses 'load_data' as step)
        context.set_progress(0, 100)

        # Load data logic
        context.data = ['item1', 'item2', 'item3']

        context.set_progress(100, 100)
        return StepResult(success=True, data={'count': len(context.data)})

```

### Rollback 이 있는 Step[​](#rollback-이-있는-step "Rollback 이 있는 Step에 대한 직접 링크")

실패 시 정리가 필요한 step 은 `rollback()` 을 구현하세요.

```python
from pathlib import Path
from synapse_sdk.plugins.steps import BaseStep, StepResult


class CreateFilesStep(BaseStep[MyContext]):
    """File creation step (with rollback support)."""

    @property
    def name(self) -> str:
        return 'create_files'

    @property
    def progress_weight(self) -> float:
        return 0.2

    def execute(self, context: MyContext) -> StepResult:
        created_files = []

        for item in context.data:
            path = Path(f'/tmp/{item}.txt')
            path.write_text(item)
            created_files.append(str(path))

        # Store info needed for rollback
        return StepResult(
            success=True,
            data={'files': created_files},
            rollback_data={'created_files': created_files},
        )

    def rollback(self, context: MyContext, result: StepResult) -> None:
        """Delete created files."""
        for file_path in result.rollback_data.get('created_files', []):
            Path(file_path).unlink(missing_ok=True)
        context.log('rollback', {'step': self.name, 'files_removed': len(result.rollback_data.get('created_files', []))})

```

### 조건부 Skip[​](#조건부-skip "조건부 Skip에 대한 직접 링크")

조건부 step 실행은 `can_skip()` 을 구현하세요.

```python
class CacheStep(BaseStep[MyContext]):
    """Cache loading step (skip if cache exists)."""

    @property
    def name(self) -> str:
        return 'load_cache'

    @property
    def progress_weight(self) -> float:
        return 0.1

    def can_skip(self, context: MyContext) -> bool:
        # Skip if data already loaded
        return len(context.data) > 0

    def execute(self, context: MyContext) -> StepResult:
        # Cache loading logic (implement your own cache loading)
        context.data = ['cached_item1', 'cached_item2']
        return StepResult(success=True)

```

## 워크플로우 구축[​](#워크플로우-구축 "워크플로우 구축에 대한 직접 링크")

### 기본 워크플로우[​](#기본-워크플로우 "기본 워크플로우에 대한 직접 링크")

```python
from synapse_sdk.plugins.steps import Orchestrator, StepRegistry


# 1. Create context
context = MyContext(runtime_ctx=runtime_ctx)

# 2. Register steps
registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(CreateFilesStep())
registry.register(ProcessStep())

# 3. Execute with orchestrator (progress_callback is required!)
orchestrator = Orchestrator(
    registry=registry,
    context=context,
    progress_callback=lambda curr, total: context.set_progress(curr, total),
)
result = orchestrator.execute()

print(result)
# {'success': True, 'steps_executed': 3, 'steps_total': 3}

```

### 동적 Step 삽입[​](#동적-step-삽입 "동적 Step 삽입에 대한 직접 링크")

기존 워크플로우에 step 을 동적으로 삽입합니다.

```python
registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(ProcessStep())
registry.register(SaveStep())

# Insert validation step before 'process'
registry.insert_before('process', ValidateStep())

# Insert audit step after 'process'
registry.insert_after('process', AuditStep())

# Execution order: load_data -> validate -> process -> audit -> save

```

기존 step 을 로깅으로 감싸려면 처음부터 `LoggingStep` 으로 등록하세요:

```python
registry = StepRegistry[MyContext]()
registry.register(LoggingStep(LoadDataStep()))
registry.register(LoggingStep(ProcessStep()))
registry.register(LoggingStep(SaveStep()))

# All steps are wrapped with logging from registration

```

> **알아두기**: `LoggingStep` 은 step 이름에 `logged_` 접두사를 붙입니다 (예: `load_data` 는 `logged_load_data` 가 됨). wrapping 된 step 으로 동적 삽입을 할 때는 접두사가 붙은 이름을 참조하세요: `registry.insert_before('logged_process', ValidateStep())`.

### Fixed Step Registry (export 위임)[​](#fixed-step-registry-export-delegation "Fixed Step Registry (export 위임)에 대한 직접 링크")

`setup_steps()` 는 위임 여부로 **분기하지 않습니다** — **항상 단일 고정 9-step registry** 를 등록합니다. 9-step 은 두 acquisition head + 하나의 공유 transform tail 로 구성됩니다:

* **In-process head**: `Initialize → FetchResults → PrepareExport` — v2 list → bulk-fetch 경로로 데이터를 취득합니다.
* **Delegated head**: `DelegateEnqueue → DelegateStreamProgress → BundleRead` — backend async-job 을 enqueue 하고 SSE 진행률을 중계한 뒤 raw bundle 을 읽습니다.
* **Shared tail (plugin transform)**: `ConvertData → SaveFiles → Finalize` — **두 head 가 합류하는 동일 step 클래스**로, 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행됩니다(FR-8 parity).

위임 결정은 `run()` 시점에 `_resolve_delegation_decision()` 으로 **한 번** 평가되어 `context.delegation_decision` 에 seed 됩니다. 게이트는 다음 조건을 **모두** 만족할 때만 발동합니다: kill-switch 미설정, `v2_client` 존재, 프로브한 `count` 가 임계를 **엄격 초과**(`> 1000`), `project_id` 가용. 각 head step 의 `can_skip` 이 이 seed 된 결정을 참조하므로, **비활성 head** 는 완료-pass (skip → 100%, orchestrator 가 보고)되고 **활성 head** 만 실행됩니다.

```python
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
    # 항상 동일한 고정 9-step registry 를 등록 — 분기 없음.
    # _resolve_delegation_decision() 은 run() 시점에 1회 평가되어
    # context.delegation_decision 에 seed 되고, 각 head step 의 can_skip()
    # 이 이를 참조해 비활성 head 를 완료-pass(skip → 100%) 처리한다.

    # In-process acquisition head — 비위임 시 활성
    registry.register(InitializeStep())            # 위임 시 can_skip
    registry.register(FetchResultsStep())          # v2 list → bulk-fetch
    registry.register(PrepareExportStep())

    # Delegated acquisition head — 위임 시 활성
    registry.register(DelegateEnqueueStep())       # plugin_exports.create
    registry.register(DelegateStreamProgressStep())  # async_jobs.stream_progress (SSE)
    registry.register(BundleReadStep())            # raw bundle 읽기 → tail

    # Shared transform tail — 양쪽 경로 공통 실행 (FR-8 parity)
    registry.register(ConvertDataStep())           # plugin 커스텀 transform
    registry.register(SaveFilesStep())
    registry.register(FinalizeStep())

```

두 head 는 **하나의 공유 transform tail 로 합류**하므로, 위임/비위임 export 는 **동일한 plugin 포맷 산출물**을 만듭니다 — *데이터 취득 경로*만 다릅니다. raw bundle 은 delegated head 의 중간 산출물이며, `BundleReadStep` 이 이를 읽어 공유 tail 로 전달하면 `ConvertData`/`SaveFiles` 가 in-process 경로와 동일하게 처리합니다.

> **알아두기**: 단일 고정 registry 는 모드와 무관하게 job 의 step 목록을 안정적으로 유지합니다. 종전의 "분기형 registry" 모델은 경로별로 다른 step 집합을 등록해 긴 acquisition 이 job 전체 진행률을 정체시켰는데, 고정 registry 는 비활성 head 의 step 만 skip(완료-pass)함으로써 이를 제거합니다.

전체 게이트(임계, kill-switch), 위임 raw-bundle 산출물, 관측성 계약은 [Export Actions — 대량 export 의 서버사이드 위임](/ko/docs/plugins/action-types/export-actions#%EB%8C%80%EB%9F%89-export-%EC%9D%98-%EC%84%9C%EB%B2%84%EC%82%AC%EC%9D%B4%EB%93%9C-%EC%9C%84%EC%9E%84-backend-async-job) 을 참고하세요.

### Progress Callback[​](#progress-callback "Progress Callback에 대한 직접 링크")

`progress_callback` 은 **필수** 이며, job status 관리를 위해 backend 에 progress 를 리포팅합니다.

```python
# In an action class, use self.set_progress for proper backend reporting
orchestrator = Orchestrator(
    registry=registry,
    context=context,
    progress_callback=lambda curr, total: self.set_progress(curr, total),
)
orchestrator.execute()

# Progress is reported to backend at each step completion:
# 30%   (LoadDataStep complete, weight=0.3)
# 50%   (CreateFilesStep complete, weight=0.2)
# 100%  (ProcessStep complete, weight=0.5)

```

action 인스턴스가 없는 standalone 워크플로우에서는 context 의 `set_progress` 를 사용할 수 있습니다:

```python
orchestrator = Orchestrator(
    registry=registry,
    context=context,
    progress_callback=lambda curr, total: context.set_progress(curr, total),
)

```

## Action 에서 Step 사용[​](#action-에서-step-사용 "Action 에서 Step 사용에 대한 직접 링크")

`BaseTrainAction` 은 `setup_steps()` 메서드를 통해 step 기반 실행을 지원합니다.

### BaseTrainAction 통합[​](#basetrainaction-통합 "BaseTrainAction 통합에 대한 직접 링크")

```python
from synapse_sdk.plugins import BaseTrainAction
from synapse_sdk.plugins.actions.train import TrainContext
from synapse_sdk.plugins.steps import StepRegistry


class MyTrainAction(BaseTrainAction[MyTrainParams]):
    """Step-based training action."""

    def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
        """Register workflow steps."""
        registry.register(LoadDatasetStep())
        registry.register(PreprocessStep())
        registry.register(TrainModelStep())
        registry.register(UploadModelStep())

    # Don't implement execute() - step-based execution is automatic when steps are registered

```

> **알아두기**: `setup_steps()` 가 step 을 하나라도 등록하면, `execute()` 호출 대신 step 기반 실행이 자동으로 활성화됩니다.

### 커스텀 Context[​](#커스텀-context "커스텀 Context에 대한 직접 링크")

추가 상태 필드를 더하려면 `TrainContext` 를 확장하세요.

```python
from dataclasses import dataclass, field
from synapse_sdk.plugins.actions.train import TrainContext


@dataclass
class MyTrainContext(TrainContext):
    """Extended training context."""
    augmentation_config: dict = field(default_factory=dict)
    validation_results: list = field(default_factory=list)

```

## 내장 Step 유틸리티[​](#내장-step-유틸리티 "내장 Step 유틸리티에 대한 직접 링크")

### LoggingStep[​](#loggingstep "LoggingStep에 대한 직접 링크")

step 실행을 타이밍과 함께 자동으로 로깅합니다.

```python
from synapse_sdk.plugins.steps import LoggingStep

# Wrap existing step with logging
logged_step = LoggingStep(ProcessStep())
registry.register(logged_step)

# Log output:
# step_start: {'step': 'process'}
# step_end: {'step': 'process', 'elapsed': 1.234, 'success': True}

```

**LoggingStep 이 로깅하는 이벤트:**

| 이벤트          | 타이밍      | 데이터                                                                 |
| --------------- | ----------- | ---------------------------------------------------------------------- |
| `step_start`    | 실행 전     | `{'step': name}`                                                       |
| `step_end`      | 실행 후     | `{'step': name, 'elapsed': seconds, 'success': bool, 'skipped': bool}` |
| `step_rollback` | rollback 시 | `{'step': name}`                                                       |

### TimingStep[​](#timingstep "TimingStep에 대한 직접 링크")

실행 시간을 측정해 step 의 결과 데이터에 추가합니다.

```python
from synapse_sdk.plugins.steps import TimingStep

timed_step = TimingStep(ProcessStep())
registry.register(timed_step)

# After execution, the StepResult.data includes duration_seconds
# Access via context.step_results[-1].data['duration_seconds']

```

> **알아두기**: `TimingStep` 은 step 이름에 `timed_` 접두사를 붙입니다 (예: `process` 는 `timed_process` 가 됨).

### ValidationStep[​](#validationstep "ValidationStep에 대한 직접 링크")

context 검증 step 을 손쉽게 생성합니다.

```python
from synapse_sdk.plugins.steps import ValidationStep


def check_data_loaded(ctx: MyContext) -> tuple[bool, str | None]:
    """Validate data is loaded."""
    if not ctx.data:
        return False, 'No data loaded'
    return True, None


# Create ValidationStep
validation = ValidationStep(
    validator=check_data_loaded,
    name='validate_data',
    progress_weight=0.05,
)
registry.register(validation)

```

## 내장 Dataset Step[​](#내장-dataset-step "내장 Dataset Step에 대한 직접 링크")

SDK 는 공통 데이터셋 작업을 위한 재사용 가능한 step 을 제공합니다. 이들은 `target_format` 이 설정되면 `BaseTrainAction` 이 자동으로 사용하지만, 커스텀 워크플로우에서 직접 사용할 수도 있습니다.

### ExportDatasetStep[​](#exportdatasetstep "ExportDatasetStep에 대한 직접 링크")

Synapse backend 에서 데이터셋을 다운로드합니다. `context.params` 에서 `dataset` 을 읽어 결과를 `context.dataset` 에 저장합니다.

```python
from synapse_sdk.plugins.steps import ExportDatasetStep

registry.register(ExportDatasetStep())

```

**동작:**

* `context.params` 에서 `dataset`, `splits`, `output_dir` 을 읽음
* 데이터셋을 로컬 파일시스템으로 다운로드
* 결과를 `context.dataset` 에 `path`, `format`, `is_categorized`, `count` 키로 저장

### ConvertDatasetStep[​](#convertdatasetstep "ConvertDatasetStep에 대한 직접 링크")

데이터셋을 포맷 간 변환합니다. (`ExportDatasetStep` 이 설정한) `context.dataset` 에서 읽어 변환된 path 로 갱신합니다.

```python
from synapse_sdk.plugins.steps import ConvertDatasetStep

# Convert to YOLO format
registry.register(ConvertDatasetStep(target_format='yolo'))

# Convert to COCO format
registry.register(ConvertDatasetStep(target_format='coco'))

```

**생성자 인자:**

| 인자            | 타입  | 기본값    | 설명                      |
| --------------- | ----- | --------- | ------------------------- |
| `target_format` | `str` | `'yolo'`  | 타깃 포맷 (yolo, coco 등) |
| `source_format` | `str` | `'dm_v2'` | 소스 포맷                 |

**동작:**

* 이전 step 의 `context.dataset['path']` 를 읽음
* 타깃 포맷으로 변환
* `context.dataset` 을 `path`, `config_path`, `format`, `is_categorized` 로 갱신

### 커스텀 워크플로우에서 사용[​](#커스텀-워크플로우에서-사용 "커스텀 워크플로우에서 사용에 대한 직접 링크")

```python
from synapse_sdk.plugins.steps import ExportDatasetStep, ConvertDatasetStep, StepRegistry
from synapse_sdk.plugins.actions.train import TrainContext

def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
    # Use SDK steps for data handling
    registry.register(ExportDatasetStep())
    registry.register(ConvertDatasetStep(target_format='yolo'))
    
    # Add custom preprocessing
    registry.register(MyAugmentationStep())
    
    # Custom training step
    registry.register(MyTrainStep())

```

> **알아두기**: `target_format` 이 설정된 `BaseTrainAction` 을 사용하면 이 step 들이 자동으로 등록됩니다. 커스텀 동작이 필요할 때만 `setup_steps()` 를 오버라이드하세요.

## Progress 추적[​](#progress-추적 "Progress 추적에 대한 직접 링크")

### Weight 기반 Progress[​](#weight-기반-progress "Weight 기반 Progress에 대한 직접 링크")

`progress_weight` 는 각 step 이 전체 progress 에서 차지하는 비중을 나타냅니다.

```python
class LoadStep(BaseStep[MyContext]):
    @property
    def progress_weight(self) -> float:
        return 0.2  # 20%

class TrainStep(BaseStep[MyContext]):
    @property
    def progress_weight(self) -> float:
        return 0.7  # 70%

class SaveStep(BaseStep[MyContext]):
    @property
    def progress_weight(self) -> float:
        return 0.1  # 10%

```

Orchestrator 는 각 step 완료 후 누적 weight 를 기반으로 progress 를 계산합니다.

<!-- -->

### Step 내부 Progress[​](#step-내부-progress "Step 내부 Progress에 대한 직접 링크")

step 내부에서 상세 progress 를 리포팅합니다.

```python
class TrainStep(BaseStep[TrainContext]):
    def execute(self, context: TrainContext) -> StepResult:
        epochs = context.params.get('epochs', 10)

        for epoch in range(epochs):
            train_one_epoch()
            # Per-step progress (automatically uses 'train' as step)
            context.set_progress(epoch + 1, epochs)
            context.set_metrics({'loss': 0.1, 'epoch': epoch})

        return StepResult(success=True)

```

## 에러 처리 & Rollback[​](#에러-처리--rollback "에러 처리 & Rollback에 대한 직접 링크")

### 자동 Rollback[​](#자동-rollback "자동 Rollback에 대한 직접 링크")

step 실패 시 Orchestrator 는 이전에 실행된 모든 step 에 대해 역순으로 `rollback()` 을 호출합니다.

```text
Execution order:  Step1 -> Step2 -> Step3 (failed!)
Rollback order:   Step2.rollback() -> Step1.rollback()

```

```python
class Step1(BaseStep[MyContext]):
    def execute(self, context: MyContext) -> StepResult:
        context.log('step1', {'action': 'created resource A'})
        return StepResult(success=True, rollback_data={'resource': 'A'})

    def rollback(self, context: MyContext, result: StepResult) -> None:
        resource = result.rollback_data.get('resource')
        context.log('rollback', {'action': f'deleted resource {resource}'})


class Step2(BaseStep[MyContext]):
    def execute(self, context: MyContext) -> StepResult:
        context.log('step2', {'action': 'created resource B'})
        return StepResult(success=True, rollback_data={'resource': 'B'})

    def rollback(self, context: MyContext, result: StepResult) -> None:
        resource = result.rollback_data.get('resource')
        context.log('rollback', {'action': f'deleted resource {resource}'})


class Step3(BaseStep[MyContext]):
    def execute(self, context: MyContext) -> StepResult:
        # Failure!
        return StepResult(success=False, error='Something went wrong')

```

**실행 결과:**

```text
step1: {'action': 'created resource A'}
step2: {'action': 'created resource B'}
step3: failed
rollback: {'action': 'deleted resource B'}  # Step2 rollback
rollback: {'action': 'deleted resource A'}  # Step1 rollback
RuntimeError: Step 'step3' failed: Something went wrong

```

> **알아두기**: rollback 중 발생한 에러는 다른 step 의 rollback 을 막지 않습니다 (best-effort rollback).

### 예외 처리[​](#예외-처리 "예외 처리에 대한 직접 링크")

`execute()` 내부의 예외는 자동으로 `StepResult(success=False, error=str(e))` 로 변환됩니다.

```python
class RiskyStep(BaseStep[MyContext]):
    def execute(self, context: MyContext) -> StepResult:
        # Exception automatically converted to failure
        raise ValueError('Invalid input')

# Result: StepResult(success=False, error='Invalid input')
# Then rollback proceeds

```

## 예제: 다단계 학습[​](#예제-다단계-학습 "예제: 다단계 학습에 대한 직접 링크")

완전한 학습 워크플로우 예제입니다.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from synapse_sdk.plugins import BaseTrainAction
from synapse_sdk.plugins.actions.train import BaseTrainParams, TrainContext
from synapse_sdk.plugins.steps import BaseStep, StepRegistry, StepResult, LoggingStep


class MyTrainParams(BaseTrainParams):
    """Training parameters."""
    dataset: int
    epochs: int = 100
    batch_size: int = 32


# Step definitions
class LoadDatasetStep(BaseStep[TrainContext]):
    @property
    def name(self) -> str:
        return 'load_dataset'

    @property
    def progress_weight(self) -> float:
        return 0.1

    def execute(self, context: TrainContext) -> StepResult:
        dataset = context.params.get('dataset')
        context.log('dataset_load_start', {'dataset': dataset})

        # Load dataset
        context.dataset = context.client.get_data_collection(dataset)

        context.log('dataset_load_complete', {'count': len(context.dataset)})
        return StepResult(success=True, data={'dataset_size': len(context.dataset)})


class TrainModelStep(BaseStep[TrainContext]):
    @property
    def name(self) -> str:
        return 'train'

    @property
    def progress_weight(self) -> float:
        return 0.7

    def execute(self, context: TrainContext) -> StepResult:
        epochs = context.params.get('epochs', 100)

        for epoch in range(epochs):
            # Train one epoch (implement your own training logic)
            loss = 1.0 / (epoch + 1)  # Example: decreasing loss
            context.set_progress(epoch + 1, epochs)
            context.set_metrics({'loss': loss, 'epoch': epoch})

        context.model_path = '/tmp/model.pt'
        return StepResult(
            success=True,
            data={'final_loss': loss},
            rollback_data={'model_path': context.model_path},
        )

    def rollback(self, context: TrainContext, result: StepResult) -> None:
        # Clean up temporary files created during training
        model_path = result.rollback_data.get('model_path')
        if model_path:
            Path(model_path).unlink(missing_ok=True)


class UploadModelStep(BaseStep[TrainContext]):
    @property
    def name(self) -> str:
        return 'upload_model'

    @property
    def progress_weight(self) -> float:
        return 0.2

    def execute(self, context: TrainContext) -> StepResult:
        context.set_progress(0, 100)

        model = context.client.create_model({
            'file': context.model_path,
            'name': 'trained-model',
        })
        context.model = model

        context.set_progress(100, 100)
        return StepResult(success=True, data={'model_id': model['id']})


# Action definition
class MyTrainAction(BaseTrainAction[MyTrainParams]):
    """Step-based training action."""

    def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
        # Wrap all steps with logging
        registry.register(LoggingStep(LoadDatasetStep()))
        registry.register(LoggingStep(TrainModelStep()))
        registry.register(LoggingStep(UploadModelStep()))

```

## 모범 사례[​](#모범-사례 "모범 사례에 대한 직접 링크")

### Step 설계[​](#step-설계 "Step 설계에 대한 직접 링크")

* **단일 책임**: 각 step 은 하나의 명확한 작업만 수행해야 합니다
* **재사용성 고려**: 다른 워크플로우에서도 쓸 수 있도록 step 을 설계하세요
* **적절한 단위**: 너무 작거나 너무 큰 step 을 피하세요

### Progress Weight[​](#progress-weight "Progress Weight에 대한 직접 링크")

* **실제 시간 반영**: 실제 실행 시간 비율에 맞춰 weight 를 설정하세요
* **합계 고려**: 모든 step weight 의 합이 1.0 에 가깝도록 설계하세요
* **빠른 step**: 검증/초기화처럼 빠른 step 에는 작은 weight (0.05\~0.1) 를 사용하세요

### Rollback 구현[​](#rollback-구현 "Rollback 구현에 대한 직접 링크")

* **멱등성 보장**: rollback 은 여러 번 호출해도 안전해야 합니다
* **부분 실패 처리**: 실행이 중간에 실패한 경우를 고려하세요
* **필요 정보 저장**: 필요한 모든 정보를 `rollback_data` 에 포함하세요

```python
# Good: Idempotent rollback
def rollback(self, context: MyContext, result: StepResult) -> None:
    path = Path(result.rollback_data.get('file_path', ''))
    path.unlink(missing_ok=True)  # No error if file doesn't exist

```

### Context 상태 관리[​](#context-상태-관리 "Context 상태 관리에 대한 직접 링크")

* **최소 상태**: 필요한 상태만 context 에 저장하세요
* **불변 params**: step 에서 `params` 를 수정하지 마세요
* **로깅 활용**: 중요한 상태 변화를 `log()` 로 기록하세요

## 관련 문서[​](#관련-문서 "관련 문서에 대한 직접 링크")

* [Defining Actions](/ko/plugins/defining-actions.md) - Action 정의 방법
* [RuntimeContext](/ko/plugins/runtime-context.md) - RuntimeContext API
* [Pipelines](/ko/plugins/pipelines.md) - 파이프라인 설정
