본문으로 건너뛰기

Steps & Workflow

Step 프레임워크는 복잡한 워크플로우를 재사용 가능한 단위로 분리하고, 자동 progress 추적과 실패 rollback 을 제공합니다.

개요

Step 기반 워크플로우는 다음 이점을 제공합니다:

  • 모듈성: 복잡한 작업을 독립적인 step 으로 분할
  • Progress 추적: progress_weight 기반 자동 진행률 계산
  • 에러 복구: 실패 시 실행된 step 의 자동 rollback
  • 재사용성: 서로 다른 워크플로우에서 step 재사용

한눈에 보기

컴포넌트역할Import Path
BaseStep[C]추상 step 클래스synapse_sdk.plugins.steps
StepResultstep 실행 결과synapse_sdk.plugins.steps
BaseStepContextstep 간 공유 상태synapse_sdk.plugins.steps
StepRegistry[C]step 등록 및 순서 관리synapse_sdk.plugins.steps
Orchestrator[C]순차 step 실행 엔진synapse_sdk.plugins.steps
LoggingStep[C]step 로깅 wrappersynapse_sdk.plugins.steps
TimingStep[C]시간 측정 wrappersynapse_sdk.plugins.steps
ValidationStep[C]context 검증 stepsynapse_sdk.plugins.steps
ExportDatasetStepSynapse 에서 데이터셋 다운로드synapse_sdk.plugins.steps
ConvertDatasetStep데이터셋 포맷 변환synapse_sdk.plugins.steps

핵심 컴포넌트

BaseStep

BaseStep[C] 는 모든 step 의 추상 base 클래스입니다. 타입 파라미터 CBaseStepContext 를 확장하는 context 타입입니다.

class BaseStep[C: BaseStepContext](ABC):
@property
@abstractmethod
def name(self) -> str:
"""Unique step identifier."""
...

@property
@abstractmethod
def progress_weight(self) -> float:
"""Progress weight (0.0-1.0)."""
...

@abstractmethod
def execute(self, context: C) -> StepResult:
"""Execute the step."""
...

def can_skip(self, context: C) -> bool:
"""Skip condition (default: False)."""
return False

def rollback(self, context: C, result: StepResult) -> None:
"""Rollback logic (default: no-op)."""
pass

필수 구현:

프로퍼티/메서드타입설명
namestr고유 step 식별자
progress_weightfloat전체 progress 중 이 step 의 비중 (0.0~1.0)
execute(context)StepResultstep 로직 실행

선택적 오버라이드:

메서드기본값설명
can_skip(context)False조건부 skip 로직
rollback(context, result)No-op실패 시 정리 로직

StepResult

StepResult 는 step 실행 결과를 담는 dataclass 입니다.

@dataclass
class StepResult:
success: bool = True
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
rollback_data: dict[str, Any] = field(default_factory=dict)
skipped: bool = False
timestamp: datetime = field(default_factory=datetime.now)
필드타입기본값설명
successboolTruestep 성공 여부
datadict[str, Any]{}결과 데이터
errorstr | NoneNone실패 시 에러 메시지
rollback_datadict[str, Any]{}rollback 에 필요한 데이터
skippedboolFalsestep skip 여부
timestampdatetimenow()완료 시각

BaseStepContext

BaseStepContext 는 step 간 공유 상태를 위한 base 클래스입니다.

@dataclass
class BaseStepContext:
runtime_ctx: RuntimeContext
step_results: list[StepResult] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
current_step: str | None = field(default=None, init=False)
current_step_order: int | None = field(default=None, init=False)

def log(self, event: str, data: dict[str, Any], file: str | None = None) -> None:
"""Log an event."""
...

def set_progress(self, current: int, total: int, step: str | None = None) -> None:
"""Set progress. Uses current_step as step if not provided."""
...

def set_metrics(self, value: dict[str, Any], step: str | None = None) -> None:
"""Set metrics. Uses current_step as step if not provided."""
...

알아두기: step 내부에서 step 을 지정하지 않고 set_progress() 또는 set_metrics() 를 호출하면, 현재 step 의 name 이 자동으로 step 으로 사용됩니다.

StepRegistry

StepRegistry[C] 는 step 등록과 실행 순서를 관리합니다.

class StepRegistry[C: BaseStepContext]:
def register(self, step: BaseStep[C]) -> None:
"""Add step to the end."""

def unregister(self, name: str) -> None:
"""Remove step by name."""

def get_steps(self) -> list[BaseStep[C]]:
"""Get registered step list."""

def insert_after(self, after_name: str, step: BaseStep[C]) -> None:
"""Insert step after another step."""

def insert_before(self, before_name: str, step: BaseStep[C]) -> None:
"""Insert step before another step."""

def get_step_orders(self) -> dict[str, int]:
"""Get 0-based order mapping for all steps."""

@property
def total_weight(self) -> float:
"""Sum of all step progress_weights."""

Orchestrator

Orchestrator[C] 는 등록된 step 을 순차 실행하고 실패 시 rollback 을 처리합니다.

progress_callback 은 필수입니다

progress_callback 파라미터는 올바른 job status 관리를 위해 필수 입니다. 이것이 없으면 backend 가 progress 업데이트를 받지 못하고, 완료 시 job status 가 'success' 로 전이되지 않습니다.

class Orchestrator[C: BaseStepContext]:
def __init__(
self,
registry: StepRegistry[C],
context: C,
progress_callback: Callable[[int, int], None], # Required!
) -> None:
"""Initialize orchestrator.

Args:
registry: StepRegistry with steps to execute.
context: Shared context for steps.
progress_callback: Callback(current, total) for progress updates.
Required for proper job status management.
Typically: lambda curr, total: self.set_progress(curr, total)
"""

def execute(self) -> dict[str, Any]:
"""Execute all steps.

Returns:
Dict with success status and execution info.

Raises:
RuntimeError: On step failure (after rollback).
"""

Step 생성

기본 Step

BaseStep 을 서브클래싱해 커스텀 step 을 정의합니다.

from dataclasses import dataclass, field
from synapse_sdk.plugins.steps import BaseStep, BaseStepContext, StepResult


@dataclass
class MyContext(BaseStepContext):
"""Custom context."""
data: list[str] = field(default_factory=list)
processed_count: int = 0


class LoadDataStep(BaseStep[MyContext]):
"""Data loading step."""

@property
def name(self) -> str:
return 'load_data'

@property
def progress_weight(self) -> float:
return 0.3 # 30% of total progress

def execute(self, context: MyContext) -> StepResult:
# Update progress (automatically uses 'load_data' as step)
context.set_progress(0, 100)

# Load data logic
context.data = ['item1', 'item2', 'item3']

context.set_progress(100, 100)
return StepResult(success=True, data={'count': len(context.data)})

Rollback 이 있는 Step

실패 시 정리가 필요한 step 은 rollback() 을 구현하세요.

from pathlib import Path
from synapse_sdk.plugins.steps import BaseStep, StepResult


class CreateFilesStep(BaseStep[MyContext]):
"""File creation step (with rollback support)."""

@property
def name(self) -> str:
return 'create_files'

@property
def progress_weight(self) -> float:
return 0.2

def execute(self, context: MyContext) -> StepResult:
created_files = []

for item in context.data:
path = Path(f'/tmp/{item}.txt')
path.write_text(item)
created_files.append(str(path))

# Store info needed for rollback
return StepResult(
success=True,
data={'files': created_files},
rollback_data={'created_files': created_files},
)

def rollback(self, context: MyContext, result: StepResult) -> None:
"""Delete created files."""
for file_path in result.rollback_data.get('created_files', []):
Path(file_path).unlink(missing_ok=True)
context.log('rollback', {'step': self.name, 'files_removed': len(result.rollback_data.get('created_files', []))})

조건부 Skip

조건부 step 실행은 can_skip() 을 구현하세요.

class CacheStep(BaseStep[MyContext]):
"""Cache loading step (skip if cache exists)."""

@property
def name(self) -> str:
return 'load_cache'

@property
def progress_weight(self) -> float:
return 0.1

def can_skip(self, context: MyContext) -> bool:
# Skip if data already loaded
return len(context.data) > 0

def execute(self, context: MyContext) -> StepResult:
# Cache loading logic (implement your own cache loading)
context.data = ['cached_item1', 'cached_item2']
return StepResult(success=True)

워크플로우 구축

기본 워크플로우

from synapse_sdk.plugins.steps import Orchestrator, StepRegistry


# 1. Create context
context = MyContext(runtime_ctx=runtime_ctx)

# 2. Register steps
registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(CreateFilesStep())
registry.register(ProcessStep())

# 3. Execute with orchestrator (progress_callback is required!)
orchestrator = Orchestrator(
registry=registry,
context=context,
progress_callback=lambda curr, total: context.set_progress(curr, total),
)
result = orchestrator.execute()

print(result)
# {'success': True, 'steps_executed': 3, 'steps_total': 3}

동적 Step 삽입

기존 워크플로우에 step 을 동적으로 삽입합니다.

registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(ProcessStep())
registry.register(SaveStep())

# Insert validation step before 'process'
registry.insert_before('process', ValidateStep())

# Insert audit step after 'process'
registry.insert_after('process', AuditStep())

# Execution order: load_data -> validate -> process -> audit -> save

기존 step 을 로깅으로 감싸려면 처음부터 LoggingStep 으로 등록하세요:

registry = StepRegistry[MyContext]()
registry.register(LoggingStep(LoadDataStep()))
registry.register(LoggingStep(ProcessStep()))
registry.register(LoggingStep(SaveStep()))

# All steps are wrapped with logging from registration

알아두기: LoggingStep 은 step 이름에 logged_ 접두사를 붙입니다 (예: load_datalogged_load_data 가 됨). wrapping 된 step 으로 동적 삽입을 할 때는 접두사가 붙은 이름을 참조하세요: registry.insert_before('logged_process', ValidateStep()).

Fixed Step Registry (export 위임)

setup_steps() 는 위임 여부로 분기하지 않습니다항상 단일 고정 9-step registry 를 등록합니다. 9-step 은 두 acquisition head + 하나의 공유 transform tail 로 구성됩니다:

  • In-process head: Initialize → FetchResults → PrepareExport — v2 list → bulk-fetch 경로로 데이터를 취득합니다.
  • Delegated head: DelegateEnqueue → DelegateStreamProgress → BundleRead — backend async-job 을 enqueue 하고 SSE 진행률을 중계한 뒤 raw bundle 을 읽습니다.
  • Shared tail (plugin transform): ConvertData → SaveFiles → Finalize두 head 가 합류하는 동일 step 클래스로, 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행됩니다(FR-8 parity).

위임 결정은 run() 시점에 _resolve_delegation_decision() 으로 한 번 평가되어 context.delegation_decision 에 seed 됩니다. 게이트는 다음 조건을 모두 만족할 때만 발동합니다: kill-switch 미설정, v2_client 존재, 프로브한 count 가 임계를 엄격 초과(> 1000), project_id 가용. 각 head step 의 can_skip 이 이 seed 된 결정을 참조하므로, 비활성 head 는 완료-pass (skip → 100%, orchestrator 가 보고)되고 활성 head 만 실행됩니다.

def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
# 항상 동일한 고정 9-step registry 를 등록 — 분기 없음.
# _resolve_delegation_decision() 은 run() 시점에 1회 평가되어
# context.delegation_decision 에 seed 되고, 각 head step 의 can_skip()
# 이 이를 참조해 비활성 head 를 완료-pass(skip → 100%) 처리한다.

# In-process acquisition head — 비위임 시 활성
registry.register(InitializeStep()) # 위임 시 can_skip
registry.register(FetchResultsStep()) # v2 list → bulk-fetch
registry.register(PrepareExportStep())

# Delegated acquisition head — 위임 시 활성
registry.register(DelegateEnqueueStep()) # plugin_exports.create
registry.register(DelegateStreamProgressStep()) # async_jobs.stream_progress (SSE)
registry.register(BundleReadStep()) # raw bundle 읽기 → tail

# Shared transform tail — 양쪽 경로 공통 실행 (FR-8 parity)
registry.register(ConvertDataStep()) # plugin 커스텀 transform
registry.register(SaveFilesStep())
registry.register(FinalizeStep())

두 head 는 하나의 공유 transform tail 로 합류하므로, 위임/비위임 export 는 동일한 plugin 포맷 산출물을 만듭니다 — 데이터 취득 경로만 다릅니다. raw bundle 은 delegated head 의 중간 산출물이며, BundleReadStep 이 이를 읽어 공유 tail 로 전달하면 ConvertData/SaveFiles 가 in-process 경로와 동일하게 처리합니다.

알아두기: 단일 고정 registry 는 모드와 무관하게 job 의 step 목록을 안정적으로 유지합니다. 종전의 "분기형 registry" 모델은 경로별로 다른 step 집합을 등록해 긴 acquisition 이 job 전체 진행률을 정체시켰는데, 고정 registry 는 비활성 head 의 step 만 skip(완료-pass)함으로써 이를 제거합니다.

전체 게이트(임계, kill-switch), 위임 raw-bundle 산출물, 관측성 계약은 Export Actions — 대량 export 의 서버사이드 위임 을 참고하세요.

Progress Callback

progress_callback필수 이며, job status 관리를 위해 backend 에 progress 를 리포팅합니다.

# In an action class, use self.set_progress for proper backend reporting
orchestrator = Orchestrator(
registry=registry,
context=context,
progress_callback=lambda curr, total: self.set_progress(curr, total),
)
orchestrator.execute()

# Progress is reported to backend at each step completion:
# 30% (LoadDataStep complete, weight=0.3)
# 50% (CreateFilesStep complete, weight=0.2)
# 100% (ProcessStep complete, weight=0.5)

action 인스턴스가 없는 standalone 워크플로우에서는 context 의 set_progress 를 사용할 수 있습니다:

orchestrator = Orchestrator(
registry=registry,
context=context,
progress_callback=lambda curr, total: context.set_progress(curr, total),
)

Action 에서 Step 사용

BaseTrainActionsetup_steps() 메서드를 통해 step 기반 실행을 지원합니다.

BaseTrainAction 통합

from synapse_sdk.plugins import BaseTrainAction
from synapse_sdk.plugins.actions.train import TrainContext
from synapse_sdk.plugins.steps import StepRegistry


class MyTrainAction(BaseTrainAction[MyTrainParams]):
"""Step-based training action."""

def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
"""Register workflow steps."""
registry.register(LoadDatasetStep())
registry.register(PreprocessStep())
registry.register(TrainModelStep())
registry.register(UploadModelStep())

# Don't implement execute() - step-based execution is automatic when steps are registered

알아두기: setup_steps() 가 step 을 하나라도 등록하면, execute() 호출 대신 step 기반 실행이 자동으로 활성화됩니다.

커스텀 Context

추가 상태 필드를 더하려면 TrainContext 를 확장하세요.

from dataclasses import dataclass, field
from synapse_sdk.plugins.actions.train import TrainContext


@dataclass
class MyTrainContext(TrainContext):
"""Extended training context."""
augmentation_config: dict = field(default_factory=dict)
validation_results: list = field(default_factory=list)

내장 Step 유틸리티

LoggingStep

step 실행을 타이밍과 함께 자동으로 로깅합니다.

from synapse_sdk.plugins.steps import LoggingStep

# Wrap existing step with logging
logged_step = LoggingStep(ProcessStep())
registry.register(logged_step)

# Log output:
# step_start: {'step': 'process'}
# step_end: {'step': 'process', 'elapsed': 1.234, 'success': True}

LoggingStep 이 로깅하는 이벤트:

이벤트타이밍데이터
step_start실행 전{'step': name}
step_end실행 후{'step': name, 'elapsed': seconds, 'success': bool, 'skipped': bool}
step_rollbackrollback 시{'step': name}

TimingStep

실행 시간을 측정해 step 의 결과 데이터에 추가합니다.

from synapse_sdk.plugins.steps import TimingStep

timed_step = TimingStep(ProcessStep())
registry.register(timed_step)

# After execution, the StepResult.data includes duration_seconds
# Access via context.step_results[-1].data['duration_seconds']

알아두기: TimingStep 은 step 이름에 timed_ 접두사를 붙입니다 (예: processtimed_process 가 됨).

ValidationStep

context 검증 step 을 손쉽게 생성합니다.

from synapse_sdk.plugins.steps import ValidationStep


def check_data_loaded(ctx: MyContext) -> tuple[bool, str | None]:
"""Validate data is loaded."""
if not ctx.data:
return False, 'No data loaded'
return True, None


# Create ValidationStep
validation = ValidationStep(
validator=check_data_loaded,
name='validate_data',
progress_weight=0.05,
)
registry.register(validation)

내장 Dataset Step

SDK 는 공통 데이터셋 작업을 위한 재사용 가능한 step 을 제공합니다. 이들은 target_format 이 설정되면 BaseTrainAction 이 자동으로 사용하지만, 커스텀 워크플로우에서 직접 사용할 수도 있습니다.

ExportDatasetStep

Synapse backend 에서 데이터셋을 다운로드합니다. context.params 에서 dataset 을 읽어 결과를 context.dataset 에 저장합니다.

from synapse_sdk.plugins.steps import ExportDatasetStep

registry.register(ExportDatasetStep())

동작:

  • context.params 에서 dataset, splits, output_dir 을 읽음
  • 데이터셋을 로컬 파일시스템으로 다운로드
  • 결과를 context.datasetpath, format, is_categorized, count 키로 저장

ConvertDatasetStep

데이터셋을 포맷 간 변환합니다. (ExportDatasetStep 이 설정한) context.dataset 에서 읽어 변환된 path 로 갱신합니다.

from synapse_sdk.plugins.steps import ConvertDatasetStep

# Convert to YOLO format
registry.register(ConvertDatasetStep(target_format='yolo'))

# Convert to COCO format
registry.register(ConvertDatasetStep(target_format='coco'))

생성자 인자:

인자타입기본값설명
target_formatstr'yolo'타깃 포맷 (yolo, coco 등)
source_formatstr'dm_v2'소스 포맷

동작:

  • 이전 step 의 context.dataset['path'] 를 읽음
  • 타깃 포맷으로 변환
  • context.datasetpath, config_path, format, is_categorized 로 갱신

커스텀 워크플로우에서 사용

from synapse_sdk.plugins.steps import ExportDatasetStep, ConvertDatasetStep, StepRegistry
from synapse_sdk.plugins.actions.train import TrainContext

def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
# Use SDK steps for data handling
registry.register(ExportDatasetStep())
registry.register(ConvertDatasetStep(target_format='yolo'))

# Add custom preprocessing
registry.register(MyAugmentationStep())

# Custom training step
registry.register(MyTrainStep())

알아두기: target_format 이 설정된 BaseTrainAction 을 사용하면 이 step 들이 자동으로 등록됩니다. 커스텀 동작이 필요할 때만 setup_steps() 를 오버라이드하세요.

Progress 추적

Weight 기반 Progress

progress_weight 는 각 step 이 전체 progress 에서 차지하는 비중을 나타냅니다.

class LoadStep(BaseStep[MyContext]):
@property
def progress_weight(self) -> float:
return 0.2 # 20%

class TrainStep(BaseStep[MyContext]):
@property
def progress_weight(self) -> float:
return 0.7 # 70%

class SaveStep(BaseStep[MyContext]):
@property
def progress_weight(self) -> float:
return 0.1 # 10%

Orchestrator 는 각 step 완료 후 누적 weight 를 기반으로 progress 를 계산합니다.

Step 내부 Progress

step 내부에서 상세 progress 를 리포팅합니다.

class TrainStep(BaseStep[TrainContext]):
def execute(self, context: TrainContext) -> StepResult:
epochs = context.params.get('epochs', 10)

for epoch in range(epochs):
train_one_epoch()
# Per-step progress (automatically uses 'train' as step)
context.set_progress(epoch + 1, epochs)
context.set_metrics({'loss': 0.1, 'epoch': epoch})

return StepResult(success=True)

에러 처리 & Rollback

자동 Rollback

step 실패 시 Orchestrator 는 이전에 실행된 모든 step 에 대해 역순으로 rollback() 을 호출합니다.

Execution order:  Step1 -> Step2 -> Step3 (failed!)
Rollback order: Step2.rollback() -> Step1.rollback()
class Step1(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
context.log('step1', {'action': 'created resource A'})
return StepResult(success=True, rollback_data={'resource': 'A'})

def rollback(self, context: MyContext, result: StepResult) -> None:
resource = result.rollback_data.get('resource')
context.log('rollback', {'action': f'deleted resource {resource}'})


class Step2(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
context.log('step2', {'action': 'created resource B'})
return StepResult(success=True, rollback_data={'resource': 'B'})

def rollback(self, context: MyContext, result: StepResult) -> None:
resource = result.rollback_data.get('resource')
context.log('rollback', {'action': f'deleted resource {resource}'})


class Step3(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
# Failure!
return StepResult(success=False, error='Something went wrong')

실행 결과:

step1: {'action': 'created resource A'}
step2: {'action': 'created resource B'}
step3: failed
rollback: {'action': 'deleted resource B'} # Step2 rollback
rollback: {'action': 'deleted resource A'} # Step1 rollback
RuntimeError: Step 'step3' failed: Something went wrong

알아두기: rollback 중 발생한 에러는 다른 step 의 rollback 을 막지 않습니다 (best-effort rollback).

예외 처리

execute() 내부의 예외는 자동으로 StepResult(success=False, error=str(e)) 로 변환됩니다.

class RiskyStep(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
# Exception automatically converted to failure
raise ValueError('Invalid input')

# Result: StepResult(success=False, error='Invalid input')
# Then rollback proceeds

예제: 다단계 학습

완전한 학습 워크플로우 예제입니다.

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from synapse_sdk.plugins import BaseTrainAction
from synapse_sdk.plugins.actions.train import BaseTrainParams, TrainContext
from synapse_sdk.plugins.steps import BaseStep, StepRegistry, StepResult, LoggingStep


class MyTrainParams(BaseTrainParams):
"""Training parameters."""
dataset: int
epochs: int = 100
batch_size: int = 32


# Step definitions
class LoadDatasetStep(BaseStep[TrainContext]):
@property
def name(self) -> str:
return 'load_dataset'

@property
def progress_weight(self) -> float:
return 0.1

def execute(self, context: TrainContext) -> StepResult:
dataset = context.params.get('dataset')
context.log('dataset_load_start', {'dataset': dataset})

# Load dataset
context.dataset = context.client.get_data_collection(dataset)

context.log('dataset_load_complete', {'count': len(context.dataset)})
return StepResult(success=True, data={'dataset_size': len(context.dataset)})


class TrainModelStep(BaseStep[TrainContext]):
@property
def name(self) -> str:
return 'train'

@property
def progress_weight(self) -> float:
return 0.7

def execute(self, context: TrainContext) -> StepResult:
epochs = context.params.get('epochs', 100)

for epoch in range(epochs):
# Train one epoch (implement your own training logic)
loss = 1.0 / (epoch + 1) # Example: decreasing loss
context.set_progress(epoch + 1, epochs)
context.set_metrics({'loss': loss, 'epoch': epoch})

context.model_path = '/tmp/model.pt'
return StepResult(
success=True,
data={'final_loss': loss},
rollback_data={'model_path': context.model_path},
)

def rollback(self, context: TrainContext, result: StepResult) -> None:
# Clean up temporary files created during training
model_path = result.rollback_data.get('model_path')
if model_path:
Path(model_path).unlink(missing_ok=True)


class UploadModelStep(BaseStep[TrainContext]):
@property
def name(self) -> str:
return 'upload_model'

@property
def progress_weight(self) -> float:
return 0.2

def execute(self, context: TrainContext) -> StepResult:
context.set_progress(0, 100)

model = context.client.create_model({
'file': context.model_path,
'name': 'trained-model',
})
context.model = model

context.set_progress(100, 100)
return StepResult(success=True, data={'model_id': model['id']})


# Action definition
class MyTrainAction(BaseTrainAction[MyTrainParams]):
"""Step-based training action."""

def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
# Wrap all steps with logging
registry.register(LoggingStep(LoadDatasetStep()))
registry.register(LoggingStep(TrainModelStep()))
registry.register(LoggingStep(UploadModelStep()))

모범 사례

Step 설계

  • 단일 책임: 각 step 은 하나의 명확한 작업만 수행해야 합니다
  • 재사용성 고려: 다른 워크플로우에서도 쓸 수 있도록 step 을 설계하세요
  • 적절한 단위: 너무 작거나 너무 큰 step 을 피하세요

Progress Weight

  • 실제 시간 반영: 실제 실행 시간 비율에 맞춰 weight 를 설정하세요
  • 합계 고려: 모든 step weight 의 합이 1.0 에 가깝도록 설계하세요
  • 빠른 step: 검증/초기화처럼 빠른 step 에는 작은 weight (0.05~0.1) 를 사용하세요

Rollback 구현

  • 멱등성 보장: rollback 은 여러 번 호출해도 안전해야 합니다
  • 부분 실패 처리: 실행이 중간에 실패한 경우를 고려하세요
  • 필요 정보 저장: 필요한 모든 정보를 rollback_data 에 포함하세요
# Good: Idempotent rollback
def rollback(self, context: MyContext, result: StepResult) -> None:
path = Path(result.rollback_data.get('file_path', ''))
path.unlink(missing_ok=True) # No error if file doesn't exist

Context 상태 관리

  • 최소 상태: 필요한 상태만 context 에 저장하세요
  • 불변 params: step 에서 params 를 수정하지 마세요
  • 로깅 활용: 중요한 상태 변화를 log() 로 기록하세요

관련 문서