Steps & Workflow
Step 프레임워크는 복잡한 워크플로우를 재사용 가능한 단위로 분리하고, 자동 progress 추적과 실패 rollback 을 제공합니다.
개요
Step 기반 워크플로우는 다음 이점을 제공합니다:
- 모듈성: 복잡한 작업을 독립적인 step 으로 분할
- Progress 추적:
progress_weight기반 자동 진행률 계산 - 에러 복구: 실패 시 실행된 step 의 자동 rollback
- 재사용성: 서로 다른 워크플로우에서 step 재사용
한눈에 보기
| 컴포넌트 | 역할 | Import Path |
|---|---|---|
BaseStep[C] | 추상 step 클래스 | synapse_sdk.plugins.steps |
StepResult | step 실행 결과 | synapse_sdk.plugins.steps |
BaseStepContext | step 간 공유 상태 | synapse_sdk.plugins.steps |
StepRegistry[C] | step 등록 및 순서 관리 | synapse_sdk.plugins.steps |
Orchestrator[C] | 순차 step 실행 엔진 | synapse_sdk.plugins.steps |
LoggingStep[C] | step 로깅 wrapper | synapse_sdk.plugins.steps |
TimingStep[C] | 시간 측정 wrapper | synapse_sdk.plugins.steps |
ValidationStep[C] | context 검증 step | synapse_sdk.plugins.steps |
ExportDatasetStep | Synapse 에서 데이터셋 다운로드 | synapse_sdk.plugins.steps |
ConvertDatasetStep | 데이터셋 포맷 변환 | synapse_sdk.plugins.steps |
핵심 컴포넌트
BaseStep
BaseStep[C] 는 모든 step 의 추상 base 클래스입니다. 타입 파라미터 C 는 BaseStepContext 를 확장하는 context 타입입니다.
class BaseStep[C: BaseStepContext](ABC):
@property
@abstractmethod
def name(self) -> str:
"""Unique step identifier."""
...
@property
@abstractmethod
def progress_weight(self) -> float:
"""Progress weight (0.0-1.0)."""
...
@abstractmethod
def execute(self, context: C) -> StepResult:
"""Execute the step."""
...
def can_skip(self, context: C) -> bool:
"""Skip condition (default: False)."""
return False
def rollback(self, context: C, result: StepResult) -> None:
"""Rollback logic (default: no-op)."""
pass
필수 구현:
| 프로퍼티/메서드 | 타입 | 설명 |
|---|---|---|
name | str | 고유 step 식별자 |
progress_weight | float | 전체 progress 중 이 step 의 비중 (0.0~1.0) |
execute(context) | StepResult | step 로직 실행 |
선택적 오버라이드:
| 메서드 | 기본값 | 설명 |
|---|---|---|
can_skip(context) | False | 조건부 skip 로직 |
rollback(context, result) | No-op | 실패 시 정리 로직 |
StepResult
StepResult 는 step 실행 결과를 담는 dataclass 입니다.
@dataclass
class StepResult:
success: bool = True
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
rollback_data: dict[str, Any] = field(default_factory=dict)
skipped: bool = False
timestamp: datetime = field(default_factory=datetime.now)
| 필드 | 타입 | 기본값 | 설명 |
|---|---|---|---|
success | bool | True | step 성공 여부 |
data | dict[str, Any] | {} | 결과 데이터 |
error | str | None | None | 실패 시 에러 메시지 |
rollback_data | dict[str, Any] | {} | rollback 에 필요한 데이터 |
skipped | bool | False | step skip 여부 |
timestamp | datetime | now() | 완료 시각 |
BaseStepContext
BaseStepContext 는 step 간 공유 상태를 위한 base 클래스입니다.
@dataclass
class BaseStepContext:
runtime_ctx: RuntimeContext
step_results: list[StepResult] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
current_step: str | None = field(default=None, init=False)
current_step_order: int | None = field(default=None, init=False)
def log(self, event: str, data: dict[str, Any], file: str | None = None) -> None:
"""Log an event."""
...
def set_progress(self, current: int, total: int, step: str | None = None) -> None:
"""Set progress. Uses current_step as step if not provided."""
...
def set_metrics(self, value: dict[str, Any], step: str | None = None) -> None:
"""Set metrics. Uses current_step as step if not provided."""
...
알아두기: step 내부에서
step을 지정하지 않고set_progress()또는set_metrics()를 호출하면, 현재 step 의name이 자동으로 step 으로 사용됩니다.
StepRegistry
StepRegistry[C] 는 step 등록과 실행 순서를 관리합니다.
class StepRegistry[C: BaseStepContext]:
def register(self, step: BaseStep[C]) -> None:
"""Add step to the end."""
def unregister(self, name: str) -> None:
"""Remove step by name."""
def get_steps(self) -> list[BaseStep[C]]:
"""Get registered step list."""
def insert_after(self, after_name: str, step: BaseStep[C]) -> None:
"""Insert step after another step."""
def insert_before(self, before_name: str, step: BaseStep[C]) -> None:
"""Insert step before another step."""
def get_step_orders(self) -> dict[str, int]:
"""Get 0-based order mapping for all steps."""
@property
def total_weight(self) -> float:
"""Sum of all step progress_weights."""
Orchestrator
Orchestrator[C] 는 등록된 step 을 순차 실행하고 실패 시 rollback 을 처리합니다.
progress_callback 파라미터는 올바른 job status 관리를 위해 필수 입니다. 이것이 없으면 backend 가 progress 업데이트를 받지 못하고, 완료 시 job status 가 'success' 로 전이되지 않습니다.
class Orchestrator[C: BaseStepContext]:
def __init__(
self,
registry: StepRegistry[C],
context: C,
progress_callback: Callable[[int, int], None], # Required!
) -> None:
"""Initialize orchestrator.
Args:
registry: StepRegistry with steps to execute.
context: Shared context for steps.
progress_callback: Callback(current, total) for progress updates.
Required for proper job status management.
Typically: lambda curr, total: self.set_progress(curr, total)
"""
def execute(self) -> dict[str, Any]:
"""Execute all steps.
Returns:
Dict with success status and execution info.
Raises:
RuntimeError: On step failure (after rollback).
"""
Step 생성
기본 Step
BaseStep 을 서브클래싱해 커스텀 step 을 정의합니다.
from dataclasses import dataclass, field
from synapse_sdk.plugins.steps import BaseStep, BaseStepContext, StepResult
@dataclass
class MyContext(BaseStepContext):
"""Custom context."""
data: list[str] = field(default_factory=list)
processed_count: int = 0
class LoadDataStep(BaseStep[MyContext]):
"""Data loading step."""
@property
def name(self) -> str:
return 'load_data'
@property
def progress_weight(self) -> float:
return 0.3 # 30% of total progress
def execute(self, context: MyContext) -> StepResult:
# Update progress (automatically uses 'load_data' as step)
context.set_progress(0, 100)
# Load data logic
context.data = ['item1', 'item2', 'item3']
context.set_progress(100, 100)
return StepResult(success=True, data={'count': len(context.data)})
Rollback 이 있는 Step
실패 시 정리가 필요한 step 은 rollback() 을 구현하세요.
from pathlib import Path
from synapse_sdk.plugins.steps import BaseStep, StepResult
class CreateFilesStep(BaseStep[MyContext]):
"""File creation step (with rollback support)."""
@property
def name(self) -> str:
return 'create_files'
@property
def progress_weight(self) -> float:
return 0.2
def execute(self, context: MyContext) -> StepResult:
created_files = []
for item in context.data:
path = Path(f'/tmp/{item}.txt')
path.write_text(item)
created_files.append(str(path))
# Store info needed for rollback
return StepResult(
success=True,
data={'files': created_files},
rollback_data={'created_files': created_files},
)
def rollback(self, context: MyContext, result: StepResult) -> None:
"""Delete created files."""
for file_path in result.rollback_data.get('created_files', []):
Path(file_path).unlink(missing_ok=True)
context.log('rollback', {'step': self.name, 'files_removed': len(result.rollback_data.get('created_files', []))})
조건부 Skip
조건부 step 실행은 can_skip() 을 구현하세요.
class CacheStep(BaseStep[MyContext]):
"""Cache loading step (skip if cache exists)."""
@property
def name(self) -> str:
return 'load_cache'
@property
def progress_weight(self) -> float:
return 0.1
def can_skip(self, context: MyContext) -> bool:
# Skip if data already loaded
return len(context.data) > 0
def execute(self, context: MyContext) -> StepResult:
# Cache loading logic (implement your own cache loading)
context.data = ['cached_item1', 'cached_item2']
return StepResult(success=True)
워크플로우 구축
기본 워크플로우
from synapse_sdk.plugins.steps import Orchestrator, StepRegistry
# 1. Create context
context = MyContext(runtime_ctx=runtime_ctx)
# 2. Register steps
registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(CreateFilesStep())
registry.register(ProcessStep())
# 3. Execute with orchestrator (progress_callback is required!)
orchestrator = Orchestrator(
registry=registry,
context=context,
progress_callback=lambda curr, total: context.set_progress(curr, total),
)
result = orchestrator.execute()
print(result)
# {'success': True, 'steps_executed': 3, 'steps_total': 3}
동적 Step 삽입
기존 워크플로우에 step 을 동적으로 삽입합니다.
registry = StepRegistry[MyContext]()
registry.register(LoadDataStep())
registry.register(ProcessStep())
registry.register(SaveStep())
# Insert validation step before 'process'
registry.insert_before('process', ValidateStep())
# Insert audit step after 'process'
registry.insert_after('process', AuditStep())
# Execution order: load_data -> validate -> process -> audit -> save
기존 step 을 로깅으로 감싸려면 처음부터 LoggingStep 으로 등록하세요:
registry = StepRegistry[MyContext]()
registry.register(LoggingStep(LoadDataStep()))
registry.register(LoggingStep(ProcessStep()))
registry.register(LoggingStep(SaveStep()))
# All steps are wrapped with logging from registration
알아두기:
LoggingStep은 step 이름에logged_접두사를 붙입니다 (예:load_data는logged_load_data가 됨). wrapping 된 step 으로 동적 삽입을 할 때는 접두사가 붙은 이름을 참조하세요:registry.insert_before('logged_process', ValidateStep()).
Fixed Step Registry (export 위임)
setup_steps() 는 위임 여부로 분기하지 않습니다 — 항상 단일 고정
9-step registry 를 등록합니다. 9-step 은 두 acquisition head + 하나의 공유
transform tail 로 구성됩니다:
- In-process head:
Initialize → FetchResults → PrepareExport— v2 list → bulk-fetch 경로로 데이터를 취득합니다. - Delegated head:
DelegateEnqueue → DelegateStreamProgress → BundleRead— backend async-job 을 enqueue 하고 SSE 진행률을 중계한 뒤 raw bundle 을 읽습니다. - Shared tail (plugin transform):
ConvertData → SaveFiles → Finalize— 두 head 가 합류하는 동일 step 클래스로, 플러그인 커스텀 transform 이 위임/비위임 양쪽에서 동일하게 실행됩니다(FR-8 parity).
위임 결정은 run() 시점에 _resolve_delegation_decision() 으로 한 번
평가되어 context.delegation_decision 에 seed 됩니다. 게이트는 다음 조건을
모두 만족할 때만 발동합니다: kill-switch 미설정, v2_client 존재, 프로브한
count 가 임계를 엄격 초과(> 1000), project_id 가용. 각 head step 의
can_skip 이 이 seed 된 결정을 참조하므로, 비활성 head 는 완료-pass
(skip → 100%, orchestrator 가 보고)되고 활성 head 만 실행됩니다.
def setup_steps(self, registry: StepRegistry[ExportContext]) -> None:
# 항상 동일한 고정 9-step registry 를 등록 — 분기 없음.
# _resolve_delegation_decision() 은 run() 시점에 1회 평가되어
# context.delegation_decision 에 seed 되고, 각 head step 의 can_skip()
# 이 이를 참조해 비활성 head 를 완료-pass(skip → 100%) 처리한다.
# In-process acquisition head — 비위임 시 활성
registry.register(InitializeStep()) # 위임 시 can_skip
registry.register(FetchResultsStep()) # v2 list → bulk-fetch
registry.register(PrepareExportStep())
# Delegated acquisition head — 위임 시 활성
registry.register(DelegateEnqueueStep()) # plugin_exports.create
registry.register(DelegateStreamProgressStep()) # async_jobs.stream_progress (SSE)
registry.register(BundleReadStep()) # raw bundle 읽기 → tail
# Shared transform tail — 양쪽 경로 공통 실행 (FR-8 parity)
registry.register(ConvertDataStep()) # plugin 커스텀 transform
registry.register(SaveFilesStep())
registry.register(FinalizeStep())
두 head 는 하나의 공유 transform tail 로 합류하므로, 위임/비위임 export 는
동일한 plugin 포맷 산출물을 만듭니다 — 데이터 취득 경로만 다릅니다. raw
bundle 은 delegated head 의 중간 산출물이며, BundleReadStep 이 이를 읽어 공유
tail 로 전달하면 ConvertData/SaveFiles 가 in-process 경로와 동일하게
처리합니다.
알아두기: 단일 고정 registry 는 모드와 무관하게 job 의 step 목록을 안정적으로 유지합니다. 종전의 "분기형 registry" 모델은 경로별로 다른 step 집합을 등록해 긴 acquisition 이 job 전체 진행률을 정체시켰는데, 고정 registry 는 비활성 head 의 step 만 skip(완료-pass)함으로써 이를 제거합니다.
전체 게이트(임계, kill-switch), 위임 raw-bundle 산출물, 관측성 계약은 Export Actions — 대량 export 의 서버사이드 위임 을 참고하세요.
Progress Callback
progress_callback 은 필수 이며, job status 관리를 위해 backend 에 progress 를 리포팅합니다.
# In an action class, use self.set_progress for proper backend reporting
orchestrator = Orchestrator(
registry=registry,
context=context,
progress_callback=lambda curr, total: self.set_progress(curr, total),
)
orchestrator.execute()
# Progress is reported to backend at each step completion:
# 30% (LoadDataStep complete, weight=0.3)
# 50% (CreateFilesStep complete, weight=0.2)
# 100% (ProcessStep complete, weight=0.5)
action 인스턴스가 없는 standalone 워크플로우에서는 context 의 set_progress 를 사용할 수 있습니다:
orchestrator = Orchestrator(
registry=registry,
context=context,
progress_callback=lambda curr, total: context.set_progress(curr, total),
)
Action 에서 Step 사용
BaseTrainAction 은 setup_steps() 메서드를 통해 step 기반 실행을 지원합니다.
BaseTrainAction 통합
from synapse_sdk.plugins import BaseTrainAction
from synapse_sdk.plugins.actions.train import TrainContext
from synapse_sdk.plugins.steps import StepRegistry
class MyTrainAction(BaseTrainAction[MyTrainParams]):
"""Step-based training action."""
def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
"""Register workflow steps."""
registry.register(LoadDatasetStep())
registry.register(PreprocessStep())
registry.register(TrainModelStep())
registry.register(UploadModelStep())
# Don't implement execute() - step-based execution is automatic when steps are registered
알아두기:
setup_steps()가 step 을 하나라도 등록하면,execute()호출 대신 step 기반 실행이 자동으로 활성화됩니다.
커스텀 Context
추가 상태 필드를 더하려면 TrainContext 를 확장하세요.
from dataclasses import dataclass, field
from synapse_sdk.plugins.actions.train import TrainContext
@dataclass
class MyTrainContext(TrainContext):
"""Extended training context."""
augmentation_config: dict = field(default_factory=dict)
validation_results: list = field(default_factory=list)
내장 Step 유틸리티
LoggingStep
step 실행을 타이밍과 함께 자동으로 로깅합니다.
from synapse_sdk.plugins.steps import LoggingStep
# Wrap existing step with logging
logged_step = LoggingStep(ProcessStep())
registry.register(logged_step)
# Log output:
# step_start: {'step': 'process'}
# step_end: {'step': 'process', 'elapsed': 1.234, 'success': True}
LoggingStep 이 로깅하는 이벤트:
| 이벤트 | 타이밍 | 데이터 |
|---|---|---|
step_start | 실행 전 | {'step': name} |
step_end | 실행 후 | {'step': name, 'elapsed': seconds, 'success': bool, 'skipped': bool} |
step_rollback | rollback 시 | {'step': name} |
TimingStep
실행 시간을 측정해 step 의 결과 데이터에 추가합니다.
from synapse_sdk.plugins.steps import TimingStep
timed_step = TimingStep(ProcessStep())
registry.register(timed_step)
# After execution, the StepResult.data includes duration_seconds
# Access via context.step_results[-1].data['duration_seconds']
알아두기:
TimingStep은 step 이름에timed_접두사를 붙입니다 (예:process는timed_process가 됨).
ValidationStep
context 검증 step 을 손쉽게 생성합니다.
from synapse_sdk.plugins.steps import ValidationStep
def check_data_loaded(ctx: MyContext) -> tuple[bool, str | None]:
"""Validate data is loaded."""
if not ctx.data:
return False, 'No data loaded'
return True, None
# Create ValidationStep
validation = ValidationStep(
validator=check_data_loaded,
name='validate_data',
progress_weight=0.05,
)
registry.register(validation)
내장 Dataset Step
SDK 는 공통 데이터셋 작업을 위한 재사용 가능한 step 을 제공합니다. 이들은 target_format 이 설정되면 BaseTrainAction 이 자동으로 사용하지만, 커스텀 워크플로우에서 직접 사용할 수도 있습니다.
ExportDatasetStep
Synapse backend 에서 데이터셋을 다운로드합니다. context.params 에서 dataset 을 읽어 결과를 context.dataset 에 저장합니다.
from synapse_sdk.plugins.steps import ExportDatasetStep
registry.register(ExportDatasetStep())
동작:
context.params에서dataset,splits,output_dir을 읽음- 데이터셋을 로컬 파일시스템으로 다운로드
- 결과를
context.dataset에path,format,is_categorized,count키로 저장
ConvertDatasetStep
데이터셋을 포맷 간 변환합니다. (ExportDatasetStep 이 설정한) context.dataset 에서 읽어 변환된 path 로 갱신합니다.
from synapse_sdk.plugins.steps import ConvertDatasetStep
# Convert to YOLO format
registry.register(ConvertDatasetStep(target_format='yolo'))
# Convert to COCO format
registry.register(ConvertDatasetStep(target_format='coco'))
생성자 인자:
| 인자 | 타입 | 기본값 | 설명 |
|---|---|---|---|
target_format | str | 'yolo' | 타깃 포맷 (yolo, coco 등) |
source_format | str | 'dm_v2' | 소스 포맷 |
동작:
- 이전 step 의
context.dataset['path']를 읽음 - 타깃 포맷으로 변환
context.dataset을path,config_path,format,is_categorized로 갱신
커스텀 워크플로우에서 사용
from synapse_sdk.plugins.steps import ExportDatasetStep, ConvertDatasetStep, StepRegistry
from synapse_sdk.plugins.actions.train import TrainContext
def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
# Use SDK steps for data handling
registry.register(ExportDatasetStep())
registry.register(ConvertDatasetStep(target_format='yolo'))
# Add custom preprocessing
registry.register(MyAugmentationStep())
# Custom training step
registry.register(MyTrainStep())
알아두기:
target_format이 설정된BaseTrainAction을 사용하면 이 step 들이 자동으로 등록됩니다. 커스텀 동작이 필요할 때만setup_steps()를 오버라이드하세요.
Progress 추적
Weight 기반 Progress
progress_weight 는 각 step 이 전체 progress 에서 차지하는 비중을 나타냅니다.
class LoadStep(BaseStep[MyContext]):
@property
def progress_weight(self) -> float:
return 0.2 # 20%
class TrainStep(BaseStep[MyContext]):
@property
def progress_weight(self) -> float:
return 0.7 # 70%
class SaveStep(BaseStep[MyContext]):
@property
def progress_weight(self) -> float:
return 0.1 # 10%
Orchestrator 는 각 step 완료 후 누적 weight 를 기반으로 progress 를 계산합니다.
Step 내부 Progress
step 내부에서 상세 progress 를 리포팅합니다.
class TrainStep(BaseStep[TrainContext]):
def execute(self, context: TrainContext) -> StepResult:
epochs = context.params.get('epochs', 10)
for epoch in range(epochs):
train_one_epoch()
# Per-step progress (automatically uses 'train' as step)
context.set_progress(epoch + 1, epochs)
context.set_metrics({'loss': 0.1, 'epoch': epoch})
return StepResult(success=True)
에러 처리 & Rollback
자동 Rollback
step 실패 시 Orchestrator 는 이전에 실행된 모든 step 에 대해 역순으로 rollback() 을 호출합니다.
Execution order: Step1 -> Step2 -> Step3 (failed!)
Rollback order: Step2.rollback() -> Step1.rollback()
class Step1(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
context.log('step1', {'action': 'created resource A'})
return StepResult(success=True, rollback_data={'resource': 'A'})
def rollback(self, context: MyContext, result: StepResult) -> None:
resource = result.rollback_data.get('resource')
context.log('rollback', {'action': f'deleted resource {resource}'})
class Step2(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
context.log('step2', {'action': 'created resource B'})
return StepResult(success=True, rollback_data={'resource': 'B'})
def rollback(self, context: MyContext, result: StepResult) -> None:
resource = result.rollback_data.get('resource')
context.log('rollback', {'action': f'deleted resource {resource}'})
class Step3(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
# Failure!
return StepResult(success=False, error='Something went wrong')
실행 결과:
step1: {'action': 'created resource A'}
step2: {'action': 'created resource B'}
step3: failed
rollback: {'action': 'deleted resource B'} # Step2 rollback
rollback: {'action': 'deleted resource A'} # Step1 rollback
RuntimeError: Step 'step3' failed: Something went wrong
알아두기: rollback 중 발생한 에러는 다른 step 의 rollback 을 막지 않습니다 (best-effort rollback).
예외 처리
execute() 내부의 예외는 자동으로 StepResult(success=False, error=str(e)) 로 변환됩니다.
class RiskyStep(BaseStep[MyContext]):
def execute(self, context: MyContext) -> StepResult:
# Exception automatically converted to failure
raise ValueError('Invalid input')
# Result: StepResult(success=False, error='Invalid input')
# Then rollback proceeds
예제: 다단계 학습
완전한 학습 워크플로우 예제입니다.
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from synapse_sdk.plugins import BaseTrainAction
from synapse_sdk.plugins.actions.train import BaseTrainParams, TrainContext
from synapse_sdk.plugins.steps import BaseStep, StepRegistry, StepResult, LoggingStep
class MyTrainParams(BaseTrainParams):
"""Training parameters."""
dataset: int
epochs: int = 100
batch_size: int = 32
# Step definitions
class LoadDatasetStep(BaseStep[TrainContext]):
@property
def name(self) -> str:
return 'load_dataset'
@property
def progress_weight(self) -> float:
return 0.1
def execute(self, context: TrainContext) -> StepResult:
dataset = context.params.get('dataset')
context.log('dataset_load_start', {'dataset': dataset})
# Load dataset
context.dataset = context.client.get_data_collection(dataset)
context.log('dataset_load_complete', {'count': len(context.dataset)})
return StepResult(success=True, data={'dataset_size': len(context.dataset)})
class TrainModelStep(BaseStep[TrainContext]):
@property
def name(self) -> str:
return 'train'
@property
def progress_weight(self) -> float:
return 0.7
def execute(self, context: TrainContext) -> StepResult:
epochs = context.params.get('epochs', 100)
for epoch in range(epochs):
# Train one epoch (implement your own training logic)
loss = 1.0 / (epoch + 1) # Example: decreasing loss
context.set_progress(epoch + 1, epochs)
context.set_metrics({'loss': loss, 'epoch': epoch})
context.model_path = '/tmp/model.pt'
return StepResult(
success=True,
data={'final_loss': loss},
rollback_data={'model_path': context.model_path},
)
def rollback(self, context: TrainContext, result: StepResult) -> None:
# Clean up temporary files created during training
model_path = result.rollback_data.get('model_path')
if model_path:
Path(model_path).unlink(missing_ok=True)
class UploadModelStep(BaseStep[TrainContext]):
@property
def name(self) -> str:
return 'upload_model'
@property
def progress_weight(self) -> float:
return 0.2
def execute(self, context: TrainContext) -> StepResult:
context.set_progress(0, 100)
model = context.client.create_model({
'file': context.model_path,
'name': 'trained-model',
})
context.model = model
context.set_progress(100, 100)
return StepResult(success=True, data={'model_id': model['id']})
# Action definition
class MyTrainAction(BaseTrainAction[MyTrainParams]):
"""Step-based training action."""
def setup_steps(self, registry: StepRegistry[TrainContext]) -> None:
# Wrap all steps with logging
registry.register(LoggingStep(LoadDatasetStep()))
registry.register(LoggingStep(TrainModelStep()))
registry.register(LoggingStep(UploadModelStep()))
모범 사례
Step 설계
- 단일 책임: 각 step 은 하나의 명확한 작업만 수행해야 합니다
- 재사용성 고려: 다른 워크플로우에서도 쓸 수 있도록 step 을 설계하세요
- 적절한 단위: 너무 작거나 너무 큰 step 을 피하세요
Progress Weight
- 실제 시간 반영: 실제 실행 시간 비율에 맞춰 weight 를 설정하세요
- 합계 고려: 모든 step weight 의 합이 1.0 에 가깝도록 설계하세요
- 빠른 step: 검증/초기화처럼 빠른 step 에는 작은 weight (0.05~0.1) 를 사용하세요
Rollback 구현
- 멱등성 보장: rollback 은 여러 번 호출해도 안전해야 합니다
- 부분 실패 처리: 실행이 중간에 실패한 경우를 고려하세요
- 필요 정보 저장: 필요한 모든 정보를
rollback_data에 포함하세요
# Good: Idempotent rollback
def rollback(self, context: MyContext, result: StepResult) -> None:
path = Path(result.rollback_data.get('file_path', ''))
path.unlink(missing_ok=True) # No error if file doesn't exist
Context 상태 관리
- 최소 상태: 필요한 상태만 context 에 저장하세요
- 불변 params: step 에서
params를 수정하지 마세요 - 로깅 활용: 중요한 상태 변화를
log()로 기록하세요
관련 문서
- Defining Actions - Action 정의 방법
- RuntimeContext - RuntimeContext API
- Pipelines - 파이프라인 설정