Inference Actions
모델 추론 워크플로우를 구축하려면 inference action 을 사용하세요. 배치 추론에는 BaseInferenceAction, Ray Serve 를 통한 REST API 서빙에는 BaseDeploymentAction 을 사용합니다.
개요
Synapse SDK 는 추론 워크플로우를 위한 두 가지 기반 클래스를 제공합니다.
| 기반 클래스 | 목적 | 사용 사례 |
|---|---|---|
BaseInferenceAction | 배치 추론 | 데이터셋 처리, 오프라인 예측 |
BaseDeploymentAction | REST API 서빙 | Ray Serve 를 통한 실시간 추론 엔드포인트 |
두 클래스 모두 두 가지 실행 모드를 지원합니다.
- Simple Mode: 단순한 워크플로우에서
execute()를 직접 오버라이드 - Step-Based Mode: 복잡한 파이프라인에서
setup_steps()로 워크플로우 단계를 등록
BaseInferenceAction
추론 액션의 기반 클래스입니다. 모델 로딩과 추론 워크플로우를 위한 헬퍼 메서드를 제공합니다.
class BaseInferenceAction(BaseAction[P]):
category = PluginCategory.NEURAL_NET
progress = InferenceProgressCategories()
진행률 카테고리
다음 표준 카테고리로 추론 진행률을 추적합니다.
| 카테고리 | 값 | 설명 |
|---|---|---|
MODEL_LOAD | 'model_load' | 모델 로딩 및 초기화 |
INFERENCE | 'inference' | 입력에 대한 추론 실행 |
POSTPROCESS | 'postprocess' | 결과 후처리 |
self.set_progress(1, 3, self.progress.MODEL_LOAD)
self.set_progress(2, 3, self.progress.INFERENCE)
self.set_progress(3, 3, self.progress.POSTPROCESS)
헬퍼 메서드
get_model
ID 로 모델 메타데이터를 조회합니다.
def get_model(self, model_id: int) -> dict[str, Any]
| 파라미터 | 타입 | 필수 | 설명 |
|---|---|---|---|
model_id | int | 예 | 모델 식별자 |
반환값: 파일 URL 을 포함한 모델 메타데이터 딕셔너리.
model = self.get_model(123)
print(model['name'], model['file'])
download_model
모델 아티팩트를 다운로드하고 추출합니다.
def download_model(
self,
model_id: int,
output_dir: str | Path | None = None,
) -> Path
| 파라미터 | 타입 | 필수 | 기본값 | 설명 |
|---|---|---|---|---|
model_id | int | 예 | - | 모델 식별자 |
output_dir | str | Path | None | 아니오 | None | 모델을 추출할 디렉토리. None 이면 임시 디렉토리 사용 |
반환값: 추출된 모델 디렉토리의 Path.
model_path = self.download_model(123)
# model_path 에 추출된 모델 아티팩트가 위치합니다
load_model
추론을 위한 모델을 로드합니다. 아티팩트를 다운로드하고 로컬 경로가 포함된 모델 정보를 반환합니다.
def load_model(self, model_id: int) -> dict[str, Any]
| 파라미터 | 타입 | 필수 | 설명 |
|---|---|---|---|
model_id | int | 예 | 모델 식별자 |
반환값: 로컬 아티팩트를 가리키는 'path' 키를 포함한 모델 메타데이터 딕셔너리.
model_info = self.load_model(123)
model_path = model_info['path']
# 여기서 프레임워크 모델을 로드합니다:
# model = torch.load(model_path / 'model.pt')
infer
입력에 대해 추론을 실행합니다. 이 메서드를 오버라이드하여 추론 로직을 구현하세요.
def infer(
self,
model: Any,
inputs: list[dict[str, Any]],
) -> list[dict[str, Any]]
| 파라미터 | 타입 | 필수 | 설명 |
|---|---|---|---|
model | Any | 예 | 로드된 모델 (프레임워크별) |
inputs | list[dict[str, Any]] | 예 | 입력 딕셔너리의 리스트 |
반환값: 결과 딕셔너리의 리스트.
def infer(self, model, inputs):
results = []
for inp in inputs:
prediction = model.predict(inp['image'])
results.append({'prediction': prediction})
return results
실행 모드
Simple Mode
단순한 워크플로우에서는 execute() 를 직접 오버라이드합니다.
from synapse_sdk.plugins.actions.inference import BaseInferenceAction
from pydantic import BaseModel
class InferenceParams(BaseModel):
model_id: int
inputs: list[dict]
class MyInferenceAction(BaseInferenceAction[InferenceParams]):
action_name = 'inference'
def execute(self) -> dict[str, Any]:
# 모델 로드
model_info = self.load_model(self.params.model_id)
self.set_progress(1, 3, self.progress.MODEL_LOAD)
# 추론 실행
results = self.infer(model_info, self.params.inputs)
self.set_progress(2, 3, self.progress.INFERENCE)
# 후처리
processed = self._postprocess(results)
self.set_progress(3, 3, self.progress.POSTPROCESS)
return {'results': processed}
def infer(self, model, inputs):
import torch
model_obj = torch.load(model['path'] + '/model.pt')
results = []
for inp in inputs:
pred = model_obj(inp['tensor'])
results.append({'prediction': pred.tolist()})
return results
Step-Based Mode
복잡한 파이프라인에서는 setup_steps() 로 워크플로우 단계를 등록합니다.
from synapse_sdk.plugins.actions.inference import (
BaseInferenceAction,
InferenceContext,
)
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry
class LoadModelStep(BaseStep[InferenceContext]):
@property
def name(self) -> str:
return 'load_model'
@property
def progress_weight(self) -> float:
return 0.3
def execute(self, context: InferenceContext) -> StepResult:
# 컨텍스트를 사용한 모델 로드
import torch
model_path = context.model_path
context.model = torch.load(f'{model_path}/model.pt')
return StepResult(success=True)
class InferenceStep(BaseStep[InferenceContext]):
@property
def name(self) -> str:
return 'inference'
@property
def progress_weight(self) -> float:
return 0.7
def execute(self, context: InferenceContext) -> StepResult:
for request in context.requests:
prediction = context.model(request['input'])
context.results.append({'prediction': prediction})
context.processed_count += 1
return StepResult(success=True)
class MyInferenceAction(BaseInferenceAction[InferenceParams]):
action_name = 'inference'
def setup_steps(self, registry: StepRegistry[InferenceContext]) -> None:
registry.register(LoadModelStep())
registry.register(InferenceStep())
InferenceContext
추론 액션의 단계 기반 워크플로우를 위한 컨텍스트입니다. BaseStepContext 를 확장하여 추론 전용 상태를 추가합니다.
@dataclass
class InferenceContext(BaseStepContext):
params: dict[str, Any] = field(default_factory=dict)
model_id: int | None = None
model: dict[str, Any] | None = None
model_path: str | None = None
requests: list[dict[str, Any]] = field(default_factory=list)
results: list[dict[str, Any]] = field(default_factory=list)
batch_size: int = 1
processed_count: int = 0
| 속성 | 타입 | 설명 |
|---|---|---|
params | dict[str, Any] | 액션 파라미터 |
model_id | int | None | 사용 중인 모델 ID |
model | dict[str, Any] | None | 백엔드에서 로드한 모델 정보 |
model_path | str | None | 다운로드된 모델의 로컬 경로 |
requests | list[dict[str, Any]] | 처리할 입력 요청 |
results | list[dict[str, Any]] | 추론 결과 |
batch_size | int | 처리 배치 크기 |
processed_count | int | 처리된 아이템 수 |
예제: 배치 추론
PyTorch 를 사용한 배치 추론 액션의 완성된 예제입니다.
from pathlib import Path
from typing import Any
import torch
from pydantic import BaseModel
from synapse_sdk.plugins.actions.inference import BaseInferenceAction
class BatchInferenceParams(BaseModel):
model_id: int
inputs: list[dict[str, Any]]
batch_size: int = 32
class BatchInferenceAction(BaseInferenceAction[BatchInferenceParams]):
"""PyTorch 모델용 배치 추론 액션."""
action_name = 'batch_inference'
def execute(self) -> dict[str, Any]:
# Step 1: 모델 로드
model_info = self.load_model(self.params.model_id)
model_path = Path(model_info['path'])
model = torch.load(model_path / 'model.pt')
model.eval()
self.set_progress(1, 3, self.progress.MODEL_LOAD)
# Step 2: 배치 단위로 추론 실행
results = []
inputs = self.params.inputs
batch_size = self.params.batch_size
for i in range(0, len(inputs), batch_size):
batch = inputs[i:i + batch_size]
batch_tensors = torch.stack([
torch.tensor(inp['data']) for inp in batch
])
with torch.no_grad():
predictions = model(batch_tensors)
for pred in predictions:
results.append({'prediction': pred.tolist()})
self.set_progress(2, 3, self.progress.INFERENCE)
# Step 3: 후처리
self.set_progress(3, 3, self.progress.POSTPROCESS)
return {
'results': results,
'processed_count': len(results),
}
def infer(self, model, inputs):
# 선택사항: 커스텀 추론 로직을 위해 오버라이드
pass
Deployment Actions
REST API 를 Ray Serve 를 통해 서빙하려면 deployment action 을 사용합니다. 추론 엔드포인트를 배포하려면 BaseDeploymentAction 을 사용하세요.
BaseDeploymentAction
Ray Serve 배포 액션의 기반 클래스입니다. Ray 초기화, 배포 생성, 백엔드 등록을 처리합니다.
class BaseDeploymentAction(BaseAction[P]):
progress = DeploymentProgressCategories()
entrypoint: type | None = None # serve 배포 클래스를 지정
DeploymentProgressCategories
| 카테고리 | 값 | 설명 |
|---|---|---|
INITIALIZE | 'initialize' | Ray 클러스터 초기화 |
DEPLOY | 'deploy' | Ray Serve 에 배포 |
REGISTER | 'register' | 백엔드에 등록 |
배포 메서드
ray_init
Ray 클러스터 연결을 초기화합니다.
def ray_init(self, **kwargs: Any) -> None
deploy
추론 엔드포인트를 Ray Serve 에 배포합니다.
def deploy(self) -> None
내부적으로 serve.run(...) 직전에 사전(pre-flight) 클러스터 용량 게이트(_check_serve_capacity)를 호출합니다. 아래의 Capacity Gate 섹션을 참고하세요.
register_serve_application
서브 애플리케이션을 백엔드에 등록합니다.
def register_serve_application(self) -> int | None
반환값: 생성된 경우 serve 애플리케이션 ID, 그렇지 않으면 None.
구성 메서드
다음 메서드를 오버라이드하여 배포를 커스터마이징합니다.
| 메서드 | 기본값 | 설명 |
|---|---|---|
get_serve_app_name() | SYNAPSE_PLUGIN_RELEASE_CODE 환경 변수 | Serve 애플리케이션 이름 |
get_route_prefix() | SYNAPSE_PLUGIN_RELEASE_CHECKSUM 환경 변수 | URL 라우트 prefix |
get_ray_actor_options() | params 에서 추출 | Ray actor 옵션 (num_cpus, num_gpus, memory) |
get_runtime_env() | {} | Ray runtime 환경 |
BaseServeDeployment
Ray Serve 추론 배포의 기반 클래스입니다. BaseAction 을 상속하며 multiplexing 을 지원하는 모델 로딩을 제공합니다.
class BaseServeDeployment(BaseAction):
def __init__(self, backend_url: str) -> None:
self.backend_url = backend_url
self._model_cache: dict[str, Any] = {}
구현해야 할 추상 메서드:
| 메서드 | 설명 |
|---|---|
async _get_model(model_info: dict) -> Any | 추출된 아티팩트에서 모델 로드 |
async infer(*args, **kwargs) -> Any | 입력에 대해 추론 실행 |
클래스 속성:
| 속성 | 설명 |
|---|---|
action_name | 액션 디스커버리용 이름 (예: 'inference') |
app | FastAPI 앱 인스턴스 (데코레이터는 deploy() 에서 자동 적용) |
infer_remote
배포된 serve 엔드포인트로 추론을 호출합니다. inference action 이 실행될 때 entrypoint 에서 사용됩니다.
@classmethod
def infer_remote(cls, params: dict[str, Any], ctx: Any) -> Any
Params 형식:
| 키 | 타입 | 필수 | 설명 |
|---|---|---|---|
model | int | str | 아니오 | Multiplexing 용 모델 ID |
method | str | 아니오 | HTTP 메서드 (기본값: 'post') |
json | dict | 예 | serve 엔드포인트로 전송할 요청 본문 |
# Inference params payload
params = {
"model": 34,
"method": "post",
"json": {
"image_path": "https://example.com/image.jpg",
"threshold": 0.5,
}
}
DeploymentContext
배포 액션의 단계 기반 워크플로우를 위한 컨텍스트입니다.
@dataclass
class DeploymentContext(BaseStepContext):
params: dict[str, Any] = field(default_factory=dict)
model_id: int | None = None
model: dict[str, Any] | None = None
model_path: str | None = None
serve_app_name: str | None = None
serve_app_id: int | None = None
route_prefix: str | None = None
ray_actor_options: dict[str, Any] = field(default_factory=dict)
deployed: bool = False
| 속성 | 타입 | 설명 |
|---|---|---|
serve_app_name | str | None | Ray Serve 애플리케이션 이름 |
serve_app_id | int | None | 생성된 serve 애플리케이션 ID |
route_prefix | str | None | 배포의 URL 라우트 prefix |
ray_actor_options | dict[str, Any] | Ray actor 구성 |
deployed | bool | 배포 성공 여부 |
create_serve_multiplexed_model_id
Serve multiplexing 용 JWT 인코딩 모델 ID 를 생성합니다.
def create_serve_multiplexed_model_id(
model_id: int | str,
token: str,
backend_url: str,
tenant: str | None = None,
) -> str
| 파라미터 | 타입 | 필수 | 설명 |
|---|---|---|---|
model_id | int | str | 예 | 인코딩할 모델 ID |
token | str | 예 | 사용자 액세스 토큰 |
backend_url | str | 예 | 백엔드 URL (JWT 시크릿으로 사용) |
tenant | str | None | 아니오 | 테넌트 식별자 |
반환값: JWT 인코딩된 모델 토큰 문자열.
from synapse_sdk.plugins.actions.inference import create_serve_multiplexed_model_id
model_token = create_serve_multiplexed_model_id(
model_id=123,
token='user_access_token',
backend_url='https://api.example.com',
tenant='my-tenant',
)
# 요청 헤더에 사용:
headers = {'serve_multiplexed_model_id': model_token}
예제: 모델 배포
Ray Serve 로 PyTorch 모델을 배포하는 완성된 예제입니다.
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
from synapse_sdk.plugins.actions.inference import (
BaseDeploymentAction,
BaseServeDeployment,
)
app = FastAPI()
class PyTorchInference(BaseServeDeployment):
"""PyTorch 추론 배포.
@serve.deployment 와 @serve.ingress(app) 데코레이터는
BaseDeploymentAction.deploy() 가 자동으로 적용합니다.
"""
action_name = 'inference'
app = app
async def _get_model(self, model_info: dict[str, Any]) -> Any:
import torch
model_path = model_info['path'] / 'model.pt'
model = torch.load(model_path)
model.eval()
return model
@app.post('/')
async def infer(self, inputs: list[dict]) -> list[dict]:
model = await self.get_model()
import torch
results = []
for inp in inputs:
tensor = torch.tensor(inp['data'])
with torch.no_grad():
prediction = model(tensor)
results.append({'prediction': prediction.tolist()})
return results
class DeploymentParams(BaseModel):
model: int
num_gpus: int = 1
class MyDeploymentAction(BaseDeploymentAction[DeploymentParams]):
"""PyTorch 모델을 Ray Serve 에 배포."""
action_name = 'deployment'
entrypoint = PyTorchInference
def execute(self) -> dict[str, Any]:
# Ray 초기화
self.ray_init()
self.set_progress(1, 3, self.progress.INITIALIZE)
# Ray Serve 에 배포
self.deploy()
self.set_progress(2, 3, self.progress.DEPLOY)
# 백엔드에 등록
app_id = self.register_serve_application()
self.set_progress(3, 3, self.progress.REGISTER)
return {'serve_application': app_id}
Capacity Gate (SYN-7005)
BaseDeploymentAction.deploy() 는 serve.run(...) 을 호출하기 직전에 사전(pre-flight) 클러스터 용량 게이트를 실행합니다. 이 게이트는 agent 의 POST /resources/feasibility/ 엔드포인트를 ResourceClientMixin.check_feasibility 를 통해 호출하며, 클러스터가 배포를 수용할 수 없는 경우 fail-closed(차단) 동작합니다.
분기
| 조건 | 동작 | 예외 / 로그 |
|---|---|---|
ctx.agent_client is None | 우아한 스킵 — serve.run 진행 | WARNING: serve capacity gate skipped: agent_client not provided |
Agent 응답 allowed=True | serve.run 으로 진행 | — |
Agent 응답 allowed=False | 배포 차단 | RuntimeError('Serve deploy denied: insufficient cluster capacity ([reasons])') |
Agent 도달 불가 (ClientError / ClientTimeoutError / TimeoutError / ConnectionError / OSError) | 배포 차단 (fail-closed) | RuntimeError('Serve deploy capacity check failed: agent unreachable ({error_type})') |
형식 오류 응답 (dict 아님, allowed 없음) | 배포 차단 (fail-closed) | RuntimeError('Serve deploy capacity check failed: agent unreachable (MalformedResponse)') |
원본 예외는 진단을 위해 __cause__ 로 보존되지만, agent URL 이나 토큰이 사용자 메시지에 노출되지 않도록 래퍼 메시지에서는 cause 문자열을 의도적으로 생략합니다.
요청 페이로드
게이트는 ray_actor_options 와 serve_options 에서 페이로드를 파생합니다.
| 키 | 출처 | 기본값 | 비고 |
|---|---|---|---|
kind | 하드코딩 | 'serve' | agent 엔드포인트가 요구 |
num_cpus | ray_actor_options['num_cpus'] | 1.0 | float 로 강제 변환 |
num_gpus | ray_actor_options['num_gpus'] | 0.0 | float 로 강제 변환 |
memory_bytes | ray_actor_options['memory'] | None | 존재할 때 int 로 강제 변환 |
replicas | serve_options['num_replicas'] | 1 | int 로 강제 변환 |
metadata.plugin_code | get_serve_app_name() | — | 예: '[email protected]' |
metadata.agent | SYNAPSE_AGENT_ID 환경 변수 | '' | — |
진단
"Serve deploy denied" 오류를 트러블슈팅할 때는 오류 메시지의 reasons 를 확인하고 get_gatekeeping_policy 로 agent 의 gatekeeping 정책과 교차 검증하세요. "agent unreachable" 오류는 네트워크 문제와 agent-side 문제를 구분할 수 있도록 원인 예외 타입(ClientError / TimeoutError 등)이 포함됩니다.
try:
self.deploy()
except RuntimeError as exc:
if 'insufficient cluster capacity' in str(exc):
# 클러스터 포화 — 사용자에게 표시하거나 재시도
self.log('capacity_denied', {'error': str(exc)})
elif 'agent unreachable' in str(exc):
# 일시적 네트워크 / agent 이슈 — 원본 예외는 __cause__ 에 있음
self.log('agent_unreachable', {'cause': type(exc.__cause__).__name__})
raise
SDK 는 단일 영문 메시지를 발생시킵니다. 백엔드는 to_task 시리얼라이저 경로를 통해 deploy 가 디스패치될 때 이를 지역화된 ValidationError (ko / en) 로 변환합니다.
단계 기반 배포 경로(setup_steps 레지스트리)와 async 클라이언트(AsyncAgentClient) 용량 게이트는 후속 작업으로 미뤄집니다. 본 릴리스에서는 단순 execute() → deploy() 경로만 게이팅됩니다.
추론 실행
배포 후 inference 액션을 통해 serve 엔드포인트를 호출합니다.
synapse plugin run inference --mode job --params '{"model": 34, "method": "post", "json": {"inputs": [{"data": [1, 2, 3]}]}}'
엔트리포인트는 BaseServeDeployment 서브클래스를 감지하고 infer_remote() 를 호출하며, 배포된 엔드포인트의 라우트 prefix 를 확인하여 요청을 전달합니다.
모범 사례
모델 캐싱
반복 다운로드를 피하기 위해 로드된 모델을 캐시합니다.
class CachedInferenceAction(BaseInferenceAction[InferenceParams]):
_model_cache: dict[int, Any] = {}
def load_model_cached(self, model_id: int) -> Any:
if model_id not in self._model_cache:
model_info = self.load_model(model_id)
self._model_cache[model_id] = torch.load(model_info['path'] + '/model.pt')
return self._model_cache[model_id]
배치 처리 최적화
처리량 향상을 위해 입력을 배치 단위로 처리합니다.
def execute(self) -> dict[str, Any]:
model = self.load_model_cached(self.params.model_id)
# 배치 단위 처리
batch_size = self.params.batch_size
results = []
for i in range(0, len(self.params.inputs), batch_size):
batch = self.params.inputs[i:i + batch_size]
batch_results = self._process_batch(model, batch)
results.extend(batch_results)
# 진행률 업데이트
progress = min((i + batch_size) / len(self.params.inputs), 1.0)
self.set_progress(int(progress * 100), 100, self.progress.INFERENCE)
return {'results': results}
에러 처리
모델 로딩과 추론 에러를 우아하게 처리합니다.
def execute(self) -> dict[str, Any]:
try:
model_info = self.load_model(self.params.model_id)
except ValueError as e:
self.log('model_load_error', {'error': str(e)})
return {'error': f'Failed to load model: {e}'}
try:
results = self.infer(model_info, self.params.inputs)
except Exception as e:
self.log('inference_error', {'error': str(e)})
return {'error': f'Inference failed: {e}', 'partial_results': []}
return {'results': results}
관련 문서
- Defining Actions - 핵심 액션 기반 클래스
- Step-Based Workflows - 단계 기반으로 복잡한 워크플로우 구성
- Train Actions - 모델 학습 워크플로우