본문으로 건너뛰기

Inference Actions

모델 추론 워크플로우를 구축하려면 inference action 을 사용하세요. 배치 추론에는 BaseInferenceAction, Ray Serve 를 통한 REST API 서빙에는 BaseDeploymentAction 을 사용합니다.

개요

Synapse SDK 는 추론 워크플로우를 위한 두 가지 기반 클래스를 제공합니다.

기반 클래스목적사용 사례
BaseInferenceAction배치 추론데이터셋 처리, 오프라인 예측
BaseDeploymentActionREST API 서빙Ray Serve 를 통한 실시간 추론 엔드포인트

두 클래스 모두 두 가지 실행 모드를 지원합니다.

  • Simple Mode: 단순한 워크플로우에서 execute() 를 직접 오버라이드
  • Step-Based Mode: 복잡한 파이프라인에서 setup_steps() 로 워크플로우 단계를 등록

BaseInferenceAction

추론 액션의 기반 클래스입니다. 모델 로딩과 추론 워크플로우를 위한 헬퍼 메서드를 제공합니다.

class BaseInferenceAction(BaseAction[P]):
category = PluginCategory.NEURAL_NET
progress = InferenceProgressCategories()

진행률 카테고리

다음 표준 카테고리로 추론 진행률을 추적합니다.

카테고리설명
MODEL_LOAD'model_load'모델 로딩 및 초기화
INFERENCE'inference'입력에 대한 추론 실행
POSTPROCESS'postprocess'결과 후처리
self.set_progress(1, 3, self.progress.MODEL_LOAD)
self.set_progress(2, 3, self.progress.INFERENCE)
self.set_progress(3, 3, self.progress.POSTPROCESS)

헬퍼 메서드

get_model

ID 로 모델 메타데이터를 조회합니다.

def get_model(self, model_id: int) -> dict[str, Any]
파라미터타입필수설명
model_idint모델 식별자

반환값: 파일 URL 을 포함한 모델 메타데이터 딕셔너리.

model = self.get_model(123)
print(model['name'], model['file'])

download_model

모델 아티팩트를 다운로드하고 추출합니다.

def download_model(
self,
model_id: int,
output_dir: str | Path | None = None,
) -> Path
파라미터타입필수기본값설명
model_idint-모델 식별자
output_dirstr | Path | None아니오None모델을 추출할 디렉토리. None 이면 임시 디렉토리 사용

반환값: 추출된 모델 디렉토리의 Path.

model_path = self.download_model(123)
# model_path 에 추출된 모델 아티팩트가 위치합니다

load_model

추론을 위한 모델을 로드합니다. 아티팩트를 다운로드하고 로컬 경로가 포함된 모델 정보를 반환합니다.

def load_model(self, model_id: int) -> dict[str, Any]
파라미터타입필수설명
model_idint모델 식별자

반환값: 로컬 아티팩트를 가리키는 'path' 키를 포함한 모델 메타데이터 딕셔너리.

model_info = self.load_model(123)
model_path = model_info['path']
# 여기서 프레임워크 모델을 로드합니다:
# model = torch.load(model_path / 'model.pt')

infer

입력에 대해 추론을 실행합니다. 이 메서드를 오버라이드하여 추론 로직을 구현하세요.

def infer(
self,
model: Any,
inputs: list[dict[str, Any]],
) -> list[dict[str, Any]]
파라미터타입필수설명
modelAny로드된 모델 (프레임워크별)
inputslist[dict[str, Any]]입력 딕셔너리의 리스트

반환값: 결과 딕셔너리의 리스트.

def infer(self, model, inputs):
results = []
for inp in inputs:
prediction = model.predict(inp['image'])
results.append({'prediction': prediction})
return results

실행 모드

Simple Mode

단순한 워크플로우에서는 execute() 를 직접 오버라이드합니다.

from synapse_sdk.plugins.actions.inference import BaseInferenceAction
from pydantic import BaseModel

class InferenceParams(BaseModel):
model_id: int
inputs: list[dict]

class MyInferenceAction(BaseInferenceAction[InferenceParams]):
action_name = 'inference'

def execute(self) -> dict[str, Any]:
# 모델 로드
model_info = self.load_model(self.params.model_id)
self.set_progress(1, 3, self.progress.MODEL_LOAD)

# 추론 실행
results = self.infer(model_info, self.params.inputs)
self.set_progress(2, 3, self.progress.INFERENCE)

# 후처리
processed = self._postprocess(results)
self.set_progress(3, 3, self.progress.POSTPROCESS)

return {'results': processed}

def infer(self, model, inputs):
import torch

model_obj = torch.load(model['path'] + '/model.pt')
results = []
for inp in inputs:
pred = model_obj(inp['tensor'])
results.append({'prediction': pred.tolist()})
return results

Step-Based Mode

복잡한 파이프라인에서는 setup_steps() 로 워크플로우 단계를 등록합니다.

from synapse_sdk.plugins.actions.inference import (
BaseInferenceAction,
InferenceContext,
)
from synapse_sdk.plugins.steps import BaseStep, StepResult, StepRegistry

class LoadModelStep(BaseStep[InferenceContext]):
@property
def name(self) -> str:
return 'load_model'

@property
def progress_weight(self) -> float:
return 0.3

def execute(self, context: InferenceContext) -> StepResult:
# 컨텍스트를 사용한 모델 로드
import torch

model_path = context.model_path
context.model = torch.load(f'{model_path}/model.pt')
return StepResult(success=True)

class InferenceStep(BaseStep[InferenceContext]):
@property
def name(self) -> str:
return 'inference'

@property
def progress_weight(self) -> float:
return 0.7

def execute(self, context: InferenceContext) -> StepResult:
for request in context.requests:
prediction = context.model(request['input'])
context.results.append({'prediction': prediction})
context.processed_count += 1
return StepResult(success=True)

class MyInferenceAction(BaseInferenceAction[InferenceParams]):
action_name = 'inference'

def setup_steps(self, registry: StepRegistry[InferenceContext]) -> None:
registry.register(LoadModelStep())
registry.register(InferenceStep())

InferenceContext

추론 액션의 단계 기반 워크플로우를 위한 컨텍스트입니다. BaseStepContext 를 확장하여 추론 전용 상태를 추가합니다.

@dataclass
class InferenceContext(BaseStepContext):
params: dict[str, Any] = field(default_factory=dict)
model_id: int | None = None
model: dict[str, Any] | None = None
model_path: str | None = None
requests: list[dict[str, Any]] = field(default_factory=list)
results: list[dict[str, Any]] = field(default_factory=list)
batch_size: int = 1
processed_count: int = 0
속성타입설명
paramsdict[str, Any]액션 파라미터
model_idint | None사용 중인 모델 ID
modeldict[str, Any] | None백엔드에서 로드한 모델 정보
model_pathstr | None다운로드된 모델의 로컬 경로
requestslist[dict[str, Any]]처리할 입력 요청
resultslist[dict[str, Any]]추론 결과
batch_sizeint처리 배치 크기
processed_countint처리된 아이템 수

예제: 배치 추론

PyTorch 를 사용한 배치 추론 액션의 완성된 예제입니다.

from pathlib import Path
from typing import Any

import torch
from pydantic import BaseModel

from synapse_sdk.plugins.actions.inference import BaseInferenceAction


class BatchInferenceParams(BaseModel):
model_id: int
inputs: list[dict[str, Any]]
batch_size: int = 32


class BatchInferenceAction(BaseInferenceAction[BatchInferenceParams]):
"""PyTorch 모델용 배치 추론 액션."""

action_name = 'batch_inference'

def execute(self) -> dict[str, Any]:
# Step 1: 모델 로드
model_info = self.load_model(self.params.model_id)
model_path = Path(model_info['path'])
model = torch.load(model_path / 'model.pt')
model.eval()
self.set_progress(1, 3, self.progress.MODEL_LOAD)

# Step 2: 배치 단위로 추론 실행
results = []
inputs = self.params.inputs
batch_size = self.params.batch_size

for i in range(0, len(inputs), batch_size):
batch = inputs[i:i + batch_size]
batch_tensors = torch.stack([
torch.tensor(inp['data']) for inp in batch
])

with torch.no_grad():
predictions = model(batch_tensors)

for pred in predictions:
results.append({'prediction': pred.tolist()})

self.set_progress(2, 3, self.progress.INFERENCE)

# Step 3: 후처리
self.set_progress(3, 3, self.progress.POSTPROCESS)

return {
'results': results,
'processed_count': len(results),
}

def infer(self, model, inputs):
# 선택사항: 커스텀 추론 로직을 위해 오버라이드
pass

Deployment Actions

REST API 를 Ray Serve 를 통해 서빙하려면 deployment action 을 사용합니다. 추론 엔드포인트를 배포하려면 BaseDeploymentAction 을 사용하세요.

BaseDeploymentAction

Ray Serve 배포 액션의 기반 클래스입니다. Ray 초기화, 배포 생성, 백엔드 등록을 처리합니다.

class BaseDeploymentAction(BaseAction[P]):
progress = DeploymentProgressCategories()
entrypoint: type | None = None # serve 배포 클래스를 지정

DeploymentProgressCategories

카테고리설명
INITIALIZE'initialize'Ray 클러스터 초기화
DEPLOY'deploy'Ray Serve 에 배포
REGISTER'register'백엔드에 등록

배포 메서드

ray_init

Ray 클러스터 연결을 초기화합니다.

def ray_init(self, **kwargs: Any) -> None

deploy

추론 엔드포인트를 Ray Serve 에 배포합니다.

def deploy(self) -> None

내부적으로 serve.run(...) 직전에 사전(pre-flight) 클러스터 용량 게이트(_check_serve_capacity)를 호출합니다. 아래의 Capacity Gate 섹션을 참고하세요.

register_serve_application

서브 애플리케이션을 백엔드에 등록합니다.

def register_serve_application(self) -> int | None

반환값: 생성된 경우 serve 애플리케이션 ID, 그렇지 않으면 None.

구성 메서드

다음 메서드를 오버라이드하여 배포를 커스터마이징합니다.

메서드기본값설명
get_serve_app_name()SYNAPSE_PLUGIN_RELEASE_CODE 환경 변수Serve 애플리케이션 이름
get_route_prefix()SYNAPSE_PLUGIN_RELEASE_CHECKSUM 환경 변수URL 라우트 prefix
get_ray_actor_options()params 에서 추출Ray actor 옵션 (num_cpus, num_gpus, memory)
get_runtime_env(){}Ray runtime 환경

BaseServeDeployment

Ray Serve 추론 배포의 기반 클래스입니다. BaseAction 을 상속하며 multiplexing 을 지원하는 모델 로딩을 제공합니다.

class BaseServeDeployment(BaseAction):
def __init__(self, backend_url: str) -> None:
self.backend_url = backend_url
self._model_cache: dict[str, Any] = {}

구현해야 할 추상 메서드:

메서드설명
async _get_model(model_info: dict) -> Any추출된 아티팩트에서 모델 로드
async infer(*args, **kwargs) -> Any입력에 대해 추론 실행

클래스 속성:

속성설명
action_name액션 디스커버리용 이름 (예: 'inference')
appFastAPI 앱 인스턴스 (데코레이터는 deploy() 에서 자동 적용)

infer_remote

배포된 serve 엔드포인트로 추론을 호출합니다. inference action 이 실행될 때 entrypoint 에서 사용됩니다.

@classmethod
def infer_remote(cls, params: dict[str, Any], ctx: Any) -> Any

Params 형식:

타입필수설명
modelint | str아니오Multiplexing 용 모델 ID
methodstr아니오HTTP 메서드 (기본값: 'post')
jsondictserve 엔드포인트로 전송할 요청 본문
# Inference params payload
params = {
"model": 34,
"method": "post",
"json": {
"image_path": "https://example.com/image.jpg",
"threshold": 0.5,
}
}

DeploymentContext

배포 액션의 단계 기반 워크플로우를 위한 컨텍스트입니다.

@dataclass
class DeploymentContext(BaseStepContext):
params: dict[str, Any] = field(default_factory=dict)
model_id: int | None = None
model: dict[str, Any] | None = None
model_path: str | None = None
serve_app_name: str | None = None
serve_app_id: int | None = None
route_prefix: str | None = None
ray_actor_options: dict[str, Any] = field(default_factory=dict)
deployed: bool = False
속성타입설명
serve_app_namestr | NoneRay Serve 애플리케이션 이름
serve_app_idint | None생성된 serve 애플리케이션 ID
route_prefixstr | None배포의 URL 라우트 prefix
ray_actor_optionsdict[str, Any]Ray actor 구성
deployedbool배포 성공 여부

create_serve_multiplexed_model_id

Serve multiplexing 용 JWT 인코딩 모델 ID 를 생성합니다.

def create_serve_multiplexed_model_id(
model_id: int | str,
token: str,
backend_url: str,
tenant: str | None = None,
) -> str
파라미터타입필수설명
model_idint | str인코딩할 모델 ID
tokenstr사용자 액세스 토큰
backend_urlstr백엔드 URL (JWT 시크릿으로 사용)
tenantstr | None아니오테넌트 식별자

반환값: JWT 인코딩된 모델 토큰 문자열.

from synapse_sdk.plugins.actions.inference import create_serve_multiplexed_model_id

model_token = create_serve_multiplexed_model_id(
model_id=123,
token='user_access_token',
backend_url='https://api.example.com',
tenant='my-tenant',
)
# 요청 헤더에 사용:
headers = {'serve_multiplexed_model_id': model_token}

예제: 모델 배포

Ray Serve 로 PyTorch 모델을 배포하는 완성된 예제입니다.

from typing import Any

from fastapi import FastAPI
from pydantic import BaseModel

from synapse_sdk.plugins.actions.inference import (
BaseDeploymentAction,
BaseServeDeployment,
)

app = FastAPI()


class PyTorchInference(BaseServeDeployment):
"""PyTorch 추론 배포.

@serve.deployment 와 @serve.ingress(app) 데코레이터는
BaseDeploymentAction.deploy() 가 자동으로 적용합니다.
"""

action_name = 'inference'
app = app

async def _get_model(self, model_info: dict[str, Any]) -> Any:
import torch

model_path = model_info['path'] / 'model.pt'
model = torch.load(model_path)
model.eval()
return model

@app.post('/')
async def infer(self, inputs: list[dict]) -> list[dict]:
model = await self.get_model()

import torch

results = []
for inp in inputs:
tensor = torch.tensor(inp['data'])
with torch.no_grad():
prediction = model(tensor)
results.append({'prediction': prediction.tolist()})

return results


class DeploymentParams(BaseModel):
model: int
num_gpus: int = 1


class MyDeploymentAction(BaseDeploymentAction[DeploymentParams]):
"""PyTorch 모델을 Ray Serve 에 배포."""

action_name = 'deployment'
entrypoint = PyTorchInference

def execute(self) -> dict[str, Any]:
# Ray 초기화
self.ray_init()
self.set_progress(1, 3, self.progress.INITIALIZE)

# Ray Serve 에 배포
self.deploy()
self.set_progress(2, 3, self.progress.DEPLOY)

# 백엔드에 등록
app_id = self.register_serve_application()
self.set_progress(3, 3, self.progress.REGISTER)

return {'serve_application': app_id}

Capacity Gate (SYN-7005)

BaseDeploymentAction.deploy()serve.run(...) 을 호출하기 직전에 사전(pre-flight) 클러스터 용량 게이트를 실행합니다. 이 게이트는 agent 의 POST /resources/feasibility/ 엔드포인트를 ResourceClientMixin.check_feasibility 를 통해 호출하며, 클러스터가 배포를 수용할 수 없는 경우 fail-closed(차단) 동작합니다.

분기

조건동작예외 / 로그
ctx.agent_client is None우아한 스킵 — serve.run 진행WARNING: serve capacity gate skipped: agent_client not provided
Agent 응답 allowed=Trueserve.run 으로 진행
Agent 응답 allowed=False배포 차단RuntimeError('Serve deploy denied: insufficient cluster capacity ([reasons])')
Agent 도달 불가 (ClientError / ClientTimeoutError / TimeoutError / ConnectionError / OSError)배포 차단 (fail-closed)RuntimeError('Serve deploy capacity check failed: agent unreachable ({error_type})')
형식 오류 응답 (dict 아님, allowed 없음)배포 차단 (fail-closed)RuntimeError('Serve deploy capacity check failed: agent unreachable (MalformedResponse)')

원본 예외는 진단을 위해 __cause__ 로 보존되지만, agent URL 이나 토큰이 사용자 메시지에 노출되지 않도록 래퍼 메시지에서는 cause 문자열을 의도적으로 생략합니다.

요청 페이로드

게이트는 ray_actor_optionsserve_options 에서 페이로드를 파생합니다.

출처기본값비고
kind하드코딩'serve'agent 엔드포인트가 요구
num_cpusray_actor_options['num_cpus']1.0float 로 강제 변환
num_gpusray_actor_options['num_gpus']0.0float 로 강제 변환
memory_bytesray_actor_options['memory']None존재할 때 int 로 강제 변환
replicasserve_options['num_replicas']1int 로 강제 변환
metadata.plugin_codeget_serve_app_name()예: '[email protected]'
metadata.agentSYNAPSE_AGENT_ID 환경 변수''

진단

"Serve deploy denied" 오류를 트러블슈팅할 때는 오류 메시지의 reasons 를 확인하고 get_gatekeeping_policy 로 agent 의 gatekeeping 정책과 교차 검증하세요. "agent unreachable" 오류는 네트워크 문제와 agent-side 문제를 구분할 수 있도록 원인 예외 타입(ClientError / TimeoutError 등)이 포함됩니다.

try:
self.deploy()
except RuntimeError as exc:
if 'insufficient cluster capacity' in str(exc):
# 클러스터 포화 — 사용자에게 표시하거나 재시도
self.log('capacity_denied', {'error': str(exc)})
elif 'agent unreachable' in str(exc):
# 일시적 네트워크 / agent 이슈 — 원본 예외는 __cause__ 에 있음
self.log('agent_unreachable', {'cause': type(exc.__cause__).__name__})
raise
지역화

SDK 는 단일 영문 메시지를 발생시킵니다. 백엔드는 to_task 시리얼라이저 경로를 통해 deploy 가 디스패치될 때 이를 지역화된 ValidationError (ko / en) 로 변환합니다.

범위 외 (SYN-7005)

단계 기반 배포 경로(setup_steps 레지스트리)와 async 클라이언트(AsyncAgentClient) 용량 게이트는 후속 작업으로 미뤄집니다. 본 릴리스에서는 단순 execute()deploy() 경로만 게이팅됩니다.

추론 실행

배포 후 inference 액션을 통해 serve 엔드포인트를 호출합니다.

synapse plugin run inference --mode job --params '{"model": 34, "method": "post", "json": {"inputs": [{"data": [1, 2, 3]}]}}'

엔트리포인트는 BaseServeDeployment 서브클래스를 감지하고 infer_remote() 를 호출하며, 배포된 엔드포인트의 라우트 prefix 를 확인하여 요청을 전달합니다.


모범 사례

모델 캐싱

반복 다운로드를 피하기 위해 로드된 모델을 캐시합니다.

class CachedInferenceAction(BaseInferenceAction[InferenceParams]):
_model_cache: dict[int, Any] = {}

def load_model_cached(self, model_id: int) -> Any:
if model_id not in self._model_cache:
model_info = self.load_model(model_id)
self._model_cache[model_id] = torch.load(model_info['path'] + '/model.pt')
return self._model_cache[model_id]

배치 처리 최적화

처리량 향상을 위해 입력을 배치 단위로 처리합니다.

def execute(self) -> dict[str, Any]:
model = self.load_model_cached(self.params.model_id)

# 배치 단위 처리
batch_size = self.params.batch_size
results = []

for i in range(0, len(self.params.inputs), batch_size):
batch = self.params.inputs[i:i + batch_size]
batch_results = self._process_batch(model, batch)
results.extend(batch_results)

# 진행률 업데이트
progress = min((i + batch_size) / len(self.params.inputs), 1.0)
self.set_progress(int(progress * 100), 100, self.progress.INFERENCE)

return {'results': results}

에러 처리

모델 로딩과 추론 에러를 우아하게 처리합니다.

def execute(self) -> dict[str, Any]:
try:
model_info = self.load_model(self.params.model_id)
except ValueError as e:
self.log('model_load_error', {'error': str(e)})
return {'error': f'Failed to load model: {e}'}

try:
results = self.infer(model_info, self.params.inputs)
except Exception as e:
self.log('inference_error', {'error': str(e)})
return {'error': f'Inference failed: {e}', 'partial_results': []}

return {'results': results}

관련 문서