Upload Actions
Upload Action은 단계 기반 오케스트레이션, 진행률 추적, 자동 롤백을 내장한 파일 업로드 워크플로우를 처리합니다.
개요
BaseUploadAction은 다단계 업로드 워크플로우를 위해 설계된 특수 액션 기반 클래스입니다. 업로드 프로세스의 각 단계를 별도의 Step으로 정의하는 단계 기반 아키텍처를 강제합니다.
한눈에 보기
| 기능 | 설명 |
|---|---|
| 단계 기반 | setup_steps()를 오버라이드하여 워크플로우 단계를 등록해야 함 |
| 자동 롤백 | 실패 시 완료된 단계의 rollback()을 역순으로 실행 |
| 진행률 추적 | 가중치 기반으로 전체 단계의 진행률을 추적 |
| 업로드 컨텍스트 | UploadContext가 단계 간 상태를 전달 |
참고: 다른 액션 타입과 달리,
BaseUploadAction은 워크플로우 단계를 정의해야 합니다.execute()를 직접 오버라이드하는 것은 지원되지 않습니다.
BaseUploadAction
# P = TypeVar('P', bound=BaseModel) - 파라미터 모델 타입
class BaseUploadAction(BaseAction[P]):
"""워크플로우 단계를 지원하는 업로드 액션 기반 클래스."""
category = PluginCategory.UPLOAD
@property
def client(self) -> BackendClient:
"""런타임 컨텍스트의 백엔드 클라이언트."""
...
def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
"""워크플로우 단계를 등록합니다. 이 메서드를 오버라이드하세요."""
pass
def create_context(self) -> UploadContext:
"""워크플로우용 업로드 컨텍스트를 생성합니다."""
...
def execute(self) -> dict[str, Any]:
"""업로드 워크플로우를 실행합니다. 오버라이드하지 마세요."""
...
클래스 속성
| 속성 | 타입 | 설명 |
|---|---|---|
category | PluginCategory | 기본값 PluginCategory.UPLOAD |
인스턴스 프로퍼티
| 프로퍼티 | 타입 | 설명 |
|---|---|---|
client | BackendClient | 런타임 컨텍스트의 백엔드 클라이언트 |
오버라이드 가능한 메서드
| 메서드 | 필수 | 설명 |
|---|---|---|
setup_steps(registry) | 예 | 레지스트리에 워크플로우 단계를 등록 |
create_context() | 아니오 | 업로드 컨텍스트 생성 커스터마이징 |
주의:
execute()를 직접 오버라이드하지 마세요. 오케스트레이터가 내부적으로 호출하여 단계 워크플로우를 실행합니다.
UploadContext
UploadContext는 BaseStepContext를 확장하여 업로드 전용 상태 필드를 추가합니다. 워크플로우가 진행되면서 각 단계가 컨텍스트를 읽고 씁니다.
@dataclass
class UploadContext(BaseStepContext):
"""업로드 워크플로우 단계 간 공유되는 컨텍스트."""
# 업로드 파라미터 (액션 params에서 전달)
params: dict[str, Any] = field(default_factory=dict)
# 처리 상태 (단계에서 채워짐)
storage: Any | None = None
pathlib_cwd: Any | None = None
organized_files: list[dict[str, Any]] = field(default_factory=list)
uploaded_files: list[dict[str, Any]] = field(default_factory=list)
data_units: list[dict[str, Any]] = field(default_factory=list)
필드
| 필드 | 타입 | 채워지는 단계 | 설명 |
|---|---|---|---|
params | dict[str, Any] | Action | 액션 params의 업로드 파라미터 |
storage | Any | None | Init 단계 | 스토리지 설정 |
pathlib_cwd | Any | None | Init 단계 | 작업 디렉토리 경로 |
organized_files | list[dict] | Organize 단계 | 업로드 준비된 파일 목록 |
uploaded_files | list[dict] | Upload 단계 | 업로드 성공한 파일 목록 |
data_units | list[dict] | Generate 단계 | 생성된 데이터 유닛 목록 |
BaseStepContext에서 상속
| 필드 | 타입 | 설명 |
|---|---|---|
runtime_ctx | RuntimeContext | 부모 런타임 컨텍스트 |
step_results | list[StepResult] | 각 실행된 단계의 결과 |
errors | list[str] | 누적된 에러 메시지 |
current_step | str | None | 현재 실행 중인 단계 이름 |
컨텍스트 메서드 (BaseStepContext에서 상속)
| 메서드 | 설명 |
|---|---|
log(event, data, file) | 런타임 컨텍스트를 통해 이벤트를 로깅 |
set_progress(current, total, step) | 진행률 설정 (step 미지정 시 current_step 자동 사용) |
set_metrics(value: dict, step) | 메트릭 설정 (step 미지정 시 current_step 자동 사용) |
UploadContext 프로퍼티
| 프로퍼티 | 타입 | 설명 |
|---|---|---|
client | BackendClient | 런타임 컨텍스트의 백엔드 클라이언트 (없을 경우 RuntimeError 발생) |
DefaultUploadAction
DefaultUploadAction은 BaseUploadAction을 확장하여 사전 구성된 8단계 워크플로우를 제공합니다. 표준 업로드 파이프라인을 커스터마이징 없이 사용할 때 적합합니다.
from synapse_sdk.plugins.actions.upload import DefaultUploadAction, UploadParams
class MyUploadAction(DefaultUploadAction[UploadParams]):
action_name = 'upload'
params_model = UploadParams
# 8개 단계가 자동으로 등록됩니다:
# 1. InitializeStep (5%) - 스토리지 및 경로 설정
# 2. ProcessMetadataStep (10%) - Excel/CSV 메타데이터 로딩
# 3. AnalyzeCollectionStep (5%) - 파일 사양 로딩
# 4. OrganizeFilesStep (15%) - 파일 stem 기반 그룹핑
# 5. ValidateFilesStep (10%) - 사양 대비 검증
# 6. UploadFilesStep (30%) - 중복 제거 포함 파일 업로드
# 7. GenerateDataUnitsStep (20%) - 데이터 유닛 생성
# 8. CleanupStep (5%) - 최종 정리
커스텀 워크플로우가 필요한 경우, BaseUploadAction을 확장하고 setup_steps()를 오버라이드하세요.
업로드 설정
UploadConfig는 청크 업로드, Presigned URL, 중복 탐지를 포함한 파일 업로드 동작을 제어합니다.
@dataclass
class UploadConfig:
chunked_threshold_mb: int = 50 # 청크 업로드 임계값 (MB)
batch_size: int = 1 # 데이터 유닛 생성 배치 크기
max_workers: int = 10 # 동시 업로드 워커 수
use_presigned: bool = True # Presigned URL 업로드 사용
skip_local_duplicates: bool = True # 체크섬 기반 로컬 중복 파일 제거
skip_existing_checksums: bool = False # 서버에 이미 존재하는 파일 스킵
checksum_batch_size: int = 1000 # 서버 체크섬 검증 배치 크기
설정 파라미터
| 파라미터 | 기본값 | 설명 |
|---|---|---|
chunked_threshold_mb | 50 | 이 크기보다 큰 파일은 청크 단위로 업로드 |
batch_size | 1 | API 호출당 생성하는 데이터 유닛 수 |
max_workers | 10 | 동시 업로드 스레드 수 |
use_presigned | True | Presigned URL 업로드 사용 (S3, MinIO, GCS, Azure) |
skip_local_duplicates | True | MD5 체크섬을 계산하여 업로드 전 로컬 중복 파일 제거 |
skip_existing_checksums | False | 서버에 이미 존재하는 체크섬을 조회하여 중복 업로드 방지 |
checksum_batch_size | 1000 | 서버 체크섬 검증 API 호출당 체크섬 수 |
이 값들은 context.params를 통해 오버라이드할 수 있습니다:
# 플러그인의 params 또는 액션 설정에서
params = {
'skip_local_duplicates': True, # 기본값: True
'skip_existing_checksums': True, # 기본값: False
'checksum_batch_size': 500,
'max_file_size_mb': 100,
'upload_batch_size': 10,
'use_presigned': True,
}
중복 탐지
업로드 단계는 두 가지 수준의 중복 탐지를 지원하며, 모두 파일 업로드 이전에 실행됩니다:
로컬 중복 탐지 (skip_local_duplicates)
기본적으로 활성화됩니다. 병렬 워커를 사용하여 모든 파일의 MD5 체크섬을 계산한 후, 동일한 체크섬을 가진 중복 파일을 제거합니다. 첫 번째 파일만 유지되고 이후 중복 파일은 스킵됩니다.
30개 파일 (원본 15개 + 동일 내용의 복제본 15개)
│
├─ 병렬 MD5 계산 (ThreadPoolExecutor, 10 workers)
├─ deduplicate_by_checksum() → set 기반 O(1) 조회
│ └─ 15개 고유, 15개 중복 스킵
│
▼ 15개 파일만 업로드 진행
이를 통해 생성되는 데이터 유닛 수와 joblog 카운트가 정확히 일치하게 됩니다.
서버 측 중복 탐지 (skip_existing_checksums)
기본적으로 비활성화됩니다. 배치 단위로 서버에 이미 존재하는 체크섬을 조회한 후 해당 파일을 필터링합니다. 중단된 업로드를 재개하거나 폴더를 재임포트할 때 유용합니다.
로컬 중복 제거 후 15개 파일
│
├─ 배치 API 호출: verify_data_files_checksums (배치당 1000개)
├─ 서버가 기존 체크섬 반환
├─ filter_existing_checksums() → 이미 업로드된 파일 제거
│
▼ 실제로 새로운 파일만 업로드 진행
참고: 두 옵션이 모두 활성화된 경우, 로컬 중복 제거가 먼저 실행되어 서버로 전송되는 체크섬 수를 줄입니다.
단계 기반 워크플로우
Upload Action은 단계를 통해 워크플로우를 정의해야 합니다. setup_steps()를 오버라이드하여 실행 순서대로 단계를 등록하세요.
단계 생성
from synapse_sdk.plugins.steps import BaseStep, StepResult
from synapse_sdk.plugins.actions.upload import UploadContext
class ValidateFilesStep(BaseStep[UploadContext]):
"""업로드 전 파일 검증."""
@property
def name(self) -> str:
return 'validate'
@property
def progress_weight(self) -> float:
return 0.1 # 전체 진행률의 10%
def execute(self, context: UploadContext) -> StepResult:
files = context.organized_files
if not files:
return StepResult(success=False, error='업로드할 파일이 없습니다')
# 각 파일 검증
for file in files:
if not self._is_valid(file):
return StepResult(success=False, error=f"유효하지 않은 파일: {file['path']}")
return StepResult(success=True, data={'validated_count': len(files)})
def _is_valid(self, file: dict) -> bool:
# 검증 로직
return True
단계 등록
from pydantic import BaseModel
from synapse_sdk.plugins.actions.upload import BaseUploadAction, UploadContext
from synapse_sdk.plugins.steps import StepRegistry
from .steps import InitStep, ValidateFilesStep, UploadFilesStep, CleanupStep
class UploadParams(BaseModel):
storage_id: int
path: str
class MyUploadAction(BaseUploadAction[UploadParams]):
action_name = 'upload'
def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
registry.register(InitStep())
registry.register(ValidateFilesStep())
registry.register(UploadFilesStep())
registry.register(CleanupStep())
단계 실행 순서
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ Init │───▶│ Validate │───▶│ Upload │───▶│ Cleanup │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
10% 10% 60% 20%
오케스트레이터는 등록 순서대로 단계를 실행합니다:
- Init: 스토리지 연결 및 경로 초기화
- Validate: 업로드 전 파일 검증
- Upload: 스토리지로 파일 전송
- Cleanup: 업로드 후 정리 작업
자동 롤백
단계가 실패하면 오케스트레이터가 이전에 완료된 모든 단계를 역순으로 자동 롤백합니다.
롤백 구현
from synapse_sdk.plugins.steps import BaseStep, StepResult
from synapse_sdk.plugins.actions.upload import UploadContext
class UploadFilesStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'upload'
@property
def progress_weight(self) -> float:
return 0.6
def execute(self, context: UploadContext) -> StepResult:
uploaded = []
for file in context.organized_files:
result = self._upload_file(file, context)
uploaded.append(result)
context.uploaded_files.append(result)
return StepResult(
success=True,
data={'uploaded_count': len(uploaded)},
rollback_data={'uploaded_files': uploaded}, # 롤백을 위해 저장
)
def rollback(self, context: UploadContext, result: StepResult) -> None:
"""실패 시 업로드된 파일 삭제."""
uploaded = result.rollback_data.get('uploaded_files', [])
for file in uploaded:
self._delete_file(file, context)
def _upload_file(self, file: dict, context: UploadContext) -> dict:
# 업로드 로직
return {'path': file['path'], 'storage_id': context.storage}
def _delete_file(self, file: dict, context: UploadContext) -> None:
# 롤백을 위한 삭제 로직
pass
롤백 흐름
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ Init │───▶│ Validate │───▶│ Upload │──X─│ Cleanup │
│ (성공) │ │ (성공) │ │ (성공) │ │ (실패) │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
▲ ▲ ▲
│ │ │
└─────────────────┴─────────────────┘
롤백
(역순: Upload → Validate → Init)
참고: 롤백은 최선 노력(best-effort) 방식입니다. 롤백이 실패하면 에러가 로깅되지만 다른 롤백은 계속 진행됩니다. 항상 롤백 로직을 멱등성(idempotent)으로 설계하세요.
전체 예제
완전한 파일 업로드 플러그인 구현:
from dataclasses import dataclass
from pathlib import Path
from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.upload import BaseUploadAction, UploadContext
from synapse_sdk.plugins.steps import BaseStep, StepRegistry, StepResult
# 파라미터
class FileUploadParams(BaseModel):
storage_id: int = Field(description='대상 스토리지 ID')
source_path: str = Field(description='로컬 디렉토리 경로')
extensions: list[str] = Field(default=['.jpg', '.png'], description='업로드할 파일 확장자')
# 단계
class InitializeStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'initialize'
@property
def progress_weight(self) -> float:
return 0.1
def execute(self, context: UploadContext) -> StepResult:
storage_id = context.params['storage_id']
storage = context.client.get_storage(storage_id)
context.storage = storage
context.pathlib_cwd = Path(context.params['source_path'])
return StepResult(success=True)
class OrganizeFilesStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'organize'
@property
def progress_weight(self) -> float:
return 0.1
def execute(self, context: UploadContext) -> StepResult:
extensions = context.params.get('extensions', ['.jpg', '.png'])
source_dir = context.pathlib_cwd
files = []
for ext in extensions:
files.extend(source_dir.glob(f'**/*{ext}'))
context.organized_files = [{'path': str(f), 'name': f.name} for f in files]
context.log('files_organized', {'count': len(files)})
return StepResult(success=True, data={'file_count': len(files)})
class UploadFilesStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'upload'
@property
def progress_weight(self) -> float:
return 0.6
def execute(self, context: UploadContext) -> StepResult:
files = context.organized_files
total = len(files)
for i, file in enumerate(files):
# 파일 업로드 (스토리지 타입에 따라 구현)
result = self._upload_to_storage(file, context.storage)
context.uploaded_files.append(result)
# 진행률 업데이트 (현재 단계 이름에서 자동 추론)
context.set_progress(i + 1, total)
return StepResult(
success=True,
data={'uploaded_count': total},
rollback_data={'files': context.uploaded_files},
)
def rollback(self, context: UploadContext, result: StepResult) -> None:
for file in result.rollback_data.get('files', []):
self._delete_from_storage(file, context.storage)
def _upload_to_storage(self, file: dict, storage) -> dict:
# 업로드 구현
return {'path': file['path'], 'uploaded': True}
def _delete_from_storage(self, file: dict, storage) -> None:
# 롤백 구현
pass
class FinalizeStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'finalize'
@property
def progress_weight(self) -> float:
return 0.2
def execute(self, context: UploadContext) -> StepResult:
# 데이터 유닛 생성 또는 업로드 완료 처리
for file in context.uploaded_files:
data_unit = {'file': file['path'], 'status': 'complete'}
context.data_units.append(data_unit)
context.log('upload_complete', {
'uploaded': len(context.uploaded_files),
'data_units': len(context.data_units),
})
return StepResult(success=True)
# 액션
class FileUploadAction(BaseUploadAction[FileUploadParams]):
"""자동 롤백을 지원하는 파일 업로드 액션."""
action_name = 'upload'
def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
registry.register(InitializeStep())
registry.register(OrganizeFilesStep())
registry.register(UploadFilesStep())
registry.register(FinalizeStep())
모범 사례
단계 설계
- 단계는 단일 책임 유지: 각 단계는 하나의 책임만 가져야 합니다
- 적절한 가중치 설정:
progress_weight는 실제 실행 시간을 반영해야 합니다 - 롤백 구현: 상태를 변경하는 단계는 항상
rollback()을 구현하세요
대용량 파일 처리
def execute(self, context: UploadContext) -> StepResult:
files = context.organized_files
for i, file in enumerate(files):
# 대용량 파일은 청크 업로드 사용
if file['size'] > 100_000_000: # 100MB
self._chunked_upload(file, context)
else:
self._simple_upload(file, context)
context.set_progress(i + 1, len(files))
return StepResult(success=True)
재시도 로직
from tenacity import retry, stop_after_attempt, wait_exponential
class UploadFilesStep(BaseStep[UploadContext]):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def _upload_file(self, file: dict, storage) -> dict:
# 실패 시 자동 재시도로 업로드
return storage.upload(file['path'])
조건부 단계
class ValidateStep(BaseStep[UploadContext]):
def can_skip(self, context: UploadContext) -> bool:
"""skip_validation이 설정되면 검증을 건너뜁니다."""
return context.params.get('skip_validation', False)
def execute(self, context: UploadContext) -> StepResult:
# 검증 로직
return StepResult(success=True)
관련 문서
- Steps & Workflow - 단계 인프라 상세
- Defining Actions - 액션 정의 패턴
- RuntimeContext - 컨텍스트 API 레퍼런스