본문으로 건너뛰기

Upload Actions

Upload Action은 단계 기반 오케스트레이션, 진행률 추적, 자동 롤백을 내장한 파일 업로드 워크플로우를 처리합니다.

개요

BaseUploadAction은 다단계 업로드 워크플로우를 위해 설계된 특수 액션 기반 클래스입니다. 업로드 프로세스의 각 단계를 별도의 Step으로 정의하는 단계 기반 아키텍처를 강제합니다.

한눈에 보기

기능설명
단계 기반setup_steps()를 오버라이드하여 워크플로우 단계를 등록해야 함
자동 롤백실패 시 완료된 단계의 rollback()을 역순으로 실행
진행률 추적가중치 기반으로 전체 단계의 진행률을 추적
업로드 컨텍스트UploadContext가 단계 간 상태를 전달

참고: 다른 액션 타입과 달리, BaseUploadAction은 워크플로우 단계를 정의해야 합니다. execute()를 직접 오버라이드하는 것은 지원되지 않습니다.

BaseUploadAction

# P = TypeVar('P', bound=BaseModel) - 파라미터 모델 타입
class BaseUploadAction(BaseAction[P]):
"""워크플로우 단계를 지원하는 업로드 액션 기반 클래스."""

category = PluginCategory.UPLOAD

@property
def client(self) -> BackendClient:
"""런타임 컨텍스트의 백엔드 클라이언트."""
...

def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
"""워크플로우 단계를 등록합니다. 이 메서드를 오버라이드하세요."""
pass

def create_context(self) -> UploadContext:
"""워크플로우용 업로드 컨텍스트를 생성합니다."""
...

def execute(self) -> dict[str, Any]:
"""업로드 워크플로우를 실행합니다. 오버라이드하지 마세요."""
...

클래스 속성

속성타입설명
categoryPluginCategory기본값 PluginCategory.UPLOAD

인스턴스 프로퍼티

프로퍼티타입설명
clientBackendClient런타임 컨텍스트의 백엔드 클라이언트

오버라이드 가능한 메서드

메서드필수설명
setup_steps(registry)레지스트리에 워크플로우 단계를 등록
create_context()아니오업로드 컨텍스트 생성 커스터마이징

주의: execute()를 직접 오버라이드하지 마세요. 오케스트레이터가 내부적으로 호출하여 단계 워크플로우를 실행합니다.

UploadContext

UploadContextBaseStepContext를 확장하여 업로드 전용 상태 필드를 추가합니다. 워크플로우가 진행되면서 각 단계가 컨텍스트를 읽고 씁니다.

@dataclass
class UploadContext(BaseStepContext):
"""업로드 워크플로우 단계 간 공유되는 컨텍스트."""

# 업로드 파라미터 (액션 params에서 전달)
params: dict[str, Any] = field(default_factory=dict)

# 처리 상태 (단계에서 채워짐)
storage: Any | None = None
pathlib_cwd: Any | None = None
organized_files: list[dict[str, Any]] = field(default_factory=list)
uploaded_files: list[dict[str, Any]] = field(default_factory=list)
data_units: list[dict[str, Any]] = field(default_factory=list)

필드

필드타입채워지는 단계설명
paramsdict[str, Any]Action액션 params의 업로드 파라미터
storageAny | NoneInit 단계스토리지 설정
pathlib_cwdAny | NoneInit 단계작업 디렉토리 경로
organized_fileslist[dict]Organize 단계업로드 준비된 파일 목록
uploaded_fileslist[dict]Upload 단계업로드 성공한 파일 목록
data_unitslist[dict]Generate 단계생성된 데이터 유닛 목록

BaseStepContext에서 상속

필드타입설명
runtime_ctxRuntimeContext부모 런타임 컨텍스트
step_resultslist[StepResult]각 실행된 단계의 결과
errorslist[str]누적된 에러 메시지
current_stepstr | None현재 실행 중인 단계 이름

컨텍스트 메서드 (BaseStepContext에서 상속)

메서드설명
log(event, data, file)런타임 컨텍스트를 통해 이벤트를 로깅
set_progress(current, total, step)진행률 설정 (step 미지정 시 current_step 자동 사용)
set_metrics(value: dict, step)메트릭 설정 (step 미지정 시 current_step 자동 사용)

UploadContext 프로퍼티

프로퍼티타입설명
clientBackendClient런타임 컨텍스트의 백엔드 클라이언트 (없을 경우 RuntimeError 발생)

DefaultUploadAction

DefaultUploadActionBaseUploadAction을 확장하여 사전 구성된 8단계 워크플로우를 제공합니다. 표준 업로드 파이프라인을 커스터마이징 없이 사용할 때 적합합니다.

from synapse_sdk.plugins.actions.upload import DefaultUploadAction, UploadParams

class MyUploadAction(DefaultUploadAction[UploadParams]):
action_name = 'upload'
params_model = UploadParams

# 8개 단계가 자동으로 등록됩니다:
# 1. InitializeStep (5%) - 스토리지 및 경로 설정
# 2. ProcessMetadataStep (10%) - Excel/CSV 메타데이터 로딩
# 3. AnalyzeCollectionStep (5%) - 파일 사양 로딩
# 4. OrganizeFilesStep (15%) - 파일 stem 기반 그룹핑
# 5. ValidateFilesStep (10%) - 사양 대비 검증
# 6. UploadFilesStep (30%) - 중복 제거 포함 파일 업로드
# 7. GenerateDataUnitsStep (20%) - 데이터 유닛 생성
# 8. CleanupStep (5%) - 최종 정리

커스텀 워크플로우가 필요한 경우, BaseUploadAction을 확장하고 setup_steps()를 오버라이드하세요.

업로드 설정

UploadConfig는 청크 업로드, Presigned URL, 중복 탐지를 포함한 파일 업로드 동작을 제어합니다.

@dataclass
class UploadConfig:
chunked_threshold_mb: int = 50 # 청크 업로드 임계값 (MB)
batch_size: int = 1 # 데이터 유닛 생성 배치 크기
max_workers: int = 10 # 동시 업로드 워커 수
use_presigned: bool = True # Presigned URL 업로드 사용
skip_local_duplicates: bool = True # 체크섬 기반 로컬 중복 파일 제거
skip_existing_checksums: bool = False # 서버에 이미 존재하는 파일 스킵
checksum_batch_size: int = 1000 # 서버 체크섬 검증 배치 크기

설정 파라미터

파라미터기본값설명
chunked_threshold_mb50이 크기보다 큰 파일은 청크 단위로 업로드
batch_size1API 호출당 생성하는 데이터 유닛 수
max_workers10동시 업로드 스레드 수
use_presignedTruePresigned URL 업로드 사용 (S3, MinIO, GCS, Azure)
skip_local_duplicatesTrueMD5 체크섬을 계산하여 업로드 전 로컬 중복 파일 제거
skip_existing_checksumsFalse서버에 이미 존재하는 체크섬을 조회하여 중복 업로드 방지
checksum_batch_size1000서버 체크섬 검증 API 호출당 체크섬 수

이 값들은 context.params를 통해 오버라이드할 수 있습니다:

# 플러그인의 params 또는 액션 설정에서
params = {
'skip_local_duplicates': True, # 기본값: True
'skip_existing_checksums': True, # 기본값: False
'checksum_batch_size': 500,
'max_file_size_mb': 100,
'upload_batch_size': 10,
'use_presigned': True,
}

중복 탐지

업로드 단계는 두 가지 수준의 중복 탐지를 지원하며, 모두 파일 업로드 이전에 실행됩니다:

로컬 중복 탐지 (skip_local_duplicates)

기본적으로 활성화됩니다. 병렬 워커를 사용하여 모든 파일의 MD5 체크섬을 계산한 후, 동일한 체크섬을 가진 중복 파일을 제거합니다. 첫 번째 파일만 유지되고 이후 중복 파일은 스킵됩니다.

30개 파일 (원본 15개 + 동일 내용의 복제본 15개)

├─ 병렬 MD5 계산 (ThreadPoolExecutor, 10 workers)
├─ deduplicate_by_checksum() → set 기반 O(1) 조회
│ └─ 15개 고유, 15개 중복 스킵

▼ 15개 파일만 업로드 진행

이를 통해 생성되는 데이터 유닛 수와 joblog 카운트가 정확히 일치하게 됩니다.

서버 측 중복 탐지 (skip_existing_checksums)

기본적으로 비활성화됩니다. 배치 단위로 서버에 이미 존재하는 체크섬을 조회한 후 해당 파일을 필터링합니다. 중단된 업로드를 재개하거나 폴더를 재임포트할 때 유용합니다.

로컬 중복 제거 후 15개 파일

├─ 배치 API 호출: verify_data_files_checksums (배치당 1000개)
├─ 서버가 기존 체크섬 반환
├─ filter_existing_checksums() → 이미 업로드된 파일 제거

▼ 실제로 새로운 파일만 업로드 진행

참고: 두 옵션이 모두 활성화된 경우, 로컬 중복 제거가 먼저 실행되어 서버로 전송되는 체크섬 수를 줄입니다.

단계 기반 워크플로우

Upload Action은 단계를 통해 워크플로우를 정의해야 합니다. setup_steps()를 오버라이드하여 실행 순서대로 단계를 등록하세요.

단계 생성

from synapse_sdk.plugins.steps import BaseStep, StepResult
from synapse_sdk.plugins.actions.upload import UploadContext

class ValidateFilesStep(BaseStep[UploadContext]):
"""업로드 전 파일 검증."""

@property
def name(self) -> str:
return 'validate'

@property
def progress_weight(self) -> float:
return 0.1 # 전체 진행률의 10%

def execute(self, context: UploadContext) -> StepResult:
files = context.organized_files
if not files:
return StepResult(success=False, error='업로드할 파일이 없습니다')

# 각 파일 검증
for file in files:
if not self._is_valid(file):
return StepResult(success=False, error=f"유효하지 않은 파일: {file['path']}")

return StepResult(success=True, data={'validated_count': len(files)})

def _is_valid(self, file: dict) -> bool:
# 검증 로직
return True

단계 등록

from pydantic import BaseModel
from synapse_sdk.plugins.actions.upload import BaseUploadAction, UploadContext
from synapse_sdk.plugins.steps import StepRegistry

from .steps import InitStep, ValidateFilesStep, UploadFilesStep, CleanupStep

class UploadParams(BaseModel):
storage_id: int
path: str

class MyUploadAction(BaseUploadAction[UploadParams]):
action_name = 'upload'

def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
registry.register(InitStep())
registry.register(ValidateFilesStep())
registry.register(UploadFilesStep())
registry.register(CleanupStep())

단계 실행 순서

┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
│ Init │───▶│ Validate │───▶│ Upload │───▶│ Cleanup │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
10% 10% 60% 20%

오케스트레이터는 등록 순서대로 단계를 실행합니다:

  1. Init: 스토리지 연결 및 경로 초기화
  2. Validate: 업로드 전 파일 검증
  3. Upload: 스토리지로 파일 전송
  4. Cleanup: 업로드 후 정리 작업

자동 롤백

단계가 실패하면 오케스트레이터가 이전에 완료된 모든 단계를 역순으로 자동 롤백합니다.

롤백 구현

from synapse_sdk.plugins.steps import BaseStep, StepResult
from synapse_sdk.plugins.actions.upload import UploadContext

class UploadFilesStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'upload'

@property
def progress_weight(self) -> float:
return 0.6

def execute(self, context: UploadContext) -> StepResult:
uploaded = []
for file in context.organized_files:
result = self._upload_file(file, context)
uploaded.append(result)
context.uploaded_files.append(result)

return StepResult(
success=True,
data={'uploaded_count': len(uploaded)},
rollback_data={'uploaded_files': uploaded}, # 롤백을 위해 저장
)

def rollback(self, context: UploadContext, result: StepResult) -> None:
"""실패 시 업로드된 파일 삭제."""
uploaded = result.rollback_data.get('uploaded_files', [])
for file in uploaded:
self._delete_file(file, context)

def _upload_file(self, file: dict, context: UploadContext) -> dict:
# 업로드 로직
return {'path': file['path'], 'storage_id': context.storage}

def _delete_file(self, file: dict, context: UploadContext) -> None:
# 롤백을 위한 삭제 로직
pass

롤백 흐름

┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
│ Init │───▶│ Validate │───▶│ Upload │──X─│ Cleanup │
│ (성공) │ │ (성공) │ │ (성공) │ │ (실패) │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
▲ ▲ ▲
│ │ │
└─────────────────┴─────────────────┘
롤백
(역순: Upload → Validate → Init)

참고: 롤백은 최선 노력(best-effort) 방식입니다. 롤백이 실패하면 에러가 로깅되지만 다른 롤백은 계속 진행됩니다. 항상 롤백 로직을 멱등성(idempotent)으로 설계하세요.

전체 예제

완전한 파일 업로드 플러그인 구현:

from dataclasses import dataclass
from pathlib import Path

from pydantic import BaseModel, Field
from synapse_sdk.plugins.actions.upload import BaseUploadAction, UploadContext
from synapse_sdk.plugins.steps import BaseStep, StepRegistry, StepResult

# 파라미터
class FileUploadParams(BaseModel):
storage_id: int = Field(description='대상 스토리지 ID')
source_path: str = Field(description='로컬 디렉토리 경로')
extensions: list[str] = Field(default=['.jpg', '.png'], description='업로드할 파일 확장자')

# 단계
class InitializeStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'initialize'

@property
def progress_weight(self) -> float:
return 0.1

def execute(self, context: UploadContext) -> StepResult:
storage_id = context.params['storage_id']
storage = context.client.get_storage(storage_id)
context.storage = storage
context.pathlib_cwd = Path(context.params['source_path'])
return StepResult(success=True)

class OrganizeFilesStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'organize'

@property
def progress_weight(self) -> float:
return 0.1

def execute(self, context: UploadContext) -> StepResult:
extensions = context.params.get('extensions', ['.jpg', '.png'])
source_dir = context.pathlib_cwd

files = []
for ext in extensions:
files.extend(source_dir.glob(f'**/*{ext}'))

context.organized_files = [{'path': str(f), 'name': f.name} for f in files]

context.log('files_organized', {'count': len(files)})
return StepResult(success=True, data={'file_count': len(files)})

class UploadFilesStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'upload'

@property
def progress_weight(self) -> float:
return 0.6

def execute(self, context: UploadContext) -> StepResult:
files = context.organized_files
total = len(files)

for i, file in enumerate(files):
# 파일 업로드 (스토리지 타입에 따라 구현)
result = self._upload_to_storage(file, context.storage)
context.uploaded_files.append(result)

# 진행률 업데이트 (현재 단계 이름에서 자동 추론)
context.set_progress(i + 1, total)

return StepResult(
success=True,
data={'uploaded_count': total},
rollback_data={'files': context.uploaded_files},
)

def rollback(self, context: UploadContext, result: StepResult) -> None:
for file in result.rollback_data.get('files', []):
self._delete_from_storage(file, context.storage)

def _upload_to_storage(self, file: dict, storage) -> dict:
# 업로드 구현
return {'path': file['path'], 'uploaded': True}

def _delete_from_storage(self, file: dict, storage) -> None:
# 롤백 구현
pass

class FinalizeStep(BaseStep[UploadContext]):
@property
def name(self) -> str:
return 'finalize'

@property
def progress_weight(self) -> float:
return 0.2

def execute(self, context: UploadContext) -> StepResult:
# 데이터 유닛 생성 또는 업로드 완료 처리
for file in context.uploaded_files:
data_unit = {'file': file['path'], 'status': 'complete'}
context.data_units.append(data_unit)

context.log('upload_complete', {
'uploaded': len(context.uploaded_files),
'data_units': len(context.data_units),
})

return StepResult(success=True)

# 액션
class FileUploadAction(BaseUploadAction[FileUploadParams]):
"""자동 롤백을 지원하는 파일 업로드 액션."""

action_name = 'upload'

def setup_steps(self, registry: StepRegistry[UploadContext]) -> None:
registry.register(InitializeStep())
registry.register(OrganizeFilesStep())
registry.register(UploadFilesStep())
registry.register(FinalizeStep())

모범 사례

단계 설계

  • 단계는 단일 책임 유지: 각 단계는 하나의 책임만 가져야 합니다
  • 적절한 가중치 설정: progress_weight는 실제 실행 시간을 반영해야 합니다
  • 롤백 구현: 상태를 변경하는 단계는 항상 rollback()을 구현하세요

대용량 파일 처리

def execute(self, context: UploadContext) -> StepResult:
files = context.organized_files

for i, file in enumerate(files):
# 대용량 파일은 청크 업로드 사용
if file['size'] > 100_000_000: # 100MB
self._chunked_upload(file, context)
else:
self._simple_upload(file, context)

context.set_progress(i + 1, len(files))

return StepResult(success=True)

재시도 로직

from tenacity import retry, stop_after_attempt, wait_exponential

class UploadFilesStep(BaseStep[UploadContext]):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def _upload_file(self, file: dict, storage) -> dict:
# 실패 시 자동 재시도로 업로드
return storage.upload(file['path'])

조건부 단계

class ValidateStep(BaseStep[UploadContext]):
def can_skip(self, context: UploadContext) -> bool:
"""skip_validation이 설정되면 검증을 건너뜁니다."""
return context.params.get('skip_validation', False)

def execute(self, context: UploadContext) -> StepResult:
# 검증 로직
return StepResult(success=True)

관련 문서