본문으로 건너뛰기

Pre-Annotation Actions

Pre-annotation action 은 사람 어노테이션 이전에 task 데이터를 준비합니다. 파일 기반 입력 또는 추론 출력을 사용하여 task 데이터를 추가하거나 변환하려면 ToTaskAction 을 사용하세요.

개요

ToTaskAction 은 다음을 처리합니다.

  • 필터를 통한 task 디스커버리
  • 데이터 수집 및 file specification 검증
  • 파일 기반 어노테이션 적재(ingestion)
  • pre-processor 플러그인을 통한 추론 기반 어노테이션
  • 진행률 및 메트릭 추적

ToTaskAction

서브클래싱

데이터 포맷을 task 데이터로 매핑하도록 변환 훅을 오버라이드합니다.

from synapse_sdk.plugins.actions.to_task import ToTaskAction


class ToTask(ToTaskAction):
action_name = 'to_task'

def convert_data_from_file(self, primary_file_url, primary_file_name, data_file_url, data_file_name, task_data=None):
# TODO: data_file_url 을 파싱하고 task 데이터 페이로드 반환
return {}

def convert_data_from_inference(self, inference_data, task_data=None):
# TODO: 추론 출력을 task 데이터 페이로드로 변환
return inference_data

변환 메서드

다음 메서드를 오버라이드하여 데이터를 task 데이터 포맷으로 변환합니다.

메서드설명파라미터반환값
convert_data_from_file()파일 기반 어노테이션 데이터 파싱primary_file_url, primary_file_name, data_file_url, data_file_name, task_data (선택)dict[str, Any] - task 데이터 페이로드
convert_data_from_inference()추론 출력을 task 데이터로 변환inference_data, task_data (선택)dict[str, Any] - task 데이터 페이로드

두 메서드 모두 추가 컨텍스트로 사용할 전체 task 페이로드를 담은 선택적 task_data 파라미터를 받습니다.

Config.yaml

플러그인 config.yaml 에서 액션 entrypoint 를 정의합니다.

actions:
to_task:
entrypoint: plugin.to_task.ToTask
method: job

파라미터

기반 params 모델은 파일과 추론 워크플로우를 모두 지원합니다.

  • name: 액션 job 이름 (필수)
  • description: 액션 job 의 선택적 설명
  • project: 프로젝트 ID
  • agent: Agent ID
  • task_filters: task 쿼리 필터 (선택, 기본값: {})
  • method: 어노테이션 방법 — file 또는 inference (기본값: file)
  • target_specification_name: 파일 spec 이름 (file 메서드 시 필수)
  • pre_processor: pre-processor release ID (inference 메서드 시 필수)
  • model: 모델 ID (inference 메서드 시 필수)
  • pre_processor_params: 추론용 추가 파라미터 (선택, 기본값: {})

결과

액션은 다음 필드를 가진 ToTaskResult 를 반환합니다.

  • status: job 완료 상태 (SUCCEEDED, FAILED 등)
  • message: 결과를 설명하는 요약 메시지
  • total_tasks: 처리된 전체 task 수
  • success_count: 성공적으로 어노테이션된 task 수
  • failed_count: 어노테이션이 실패한 task 수
  • failures: 실패 기록 목록. 각 항목은 다음을 포함:
    • task_id: 실패한 task 의 ID
    • error: 실패 원인을 설명하는 에러 메시지

Pre-Processor Lifecycle (INFERENCE method)

method=inference 일 때, PrepareStep 은 어노테이션이 시작되기 전에 pre-processor 의 Ray Serve 애플리케이션이 실행 중인지 확인합니다. lifecycle 은 세 단계로 구성됩니다.

  1. Detection (탐지)client.list_serve_applications(plugin_code=code, job__agent=agent) 로 기존 RUNNING 앱을 폴링합니다.
  2. Dispatch (디스패치) — 실행 중인 앱이 없으면 client.run_plugin(code, {action: 'deployment', params: {...}}) 을 호출합니다.
  3. Wait (대기) — 상태가 RUNNING 에 도달하거나, 종단(terminal) 실패가 발생하거나, timeout_seconds (기본 180s)가 경과할 때까지 10초(기본)마다 폴링합니다.

Required Resources Payload (SYN-7005 / FR-8)

디스패치 params 페이로드는 pre-processor 플러그인 릴리스의 actions.inference.required_resources 에 선언된 리소스 요구사항을 전달합니다. 이를 통해 디스패치 형태가 agent-side capacity gate (Inference Actions — Capacity Gate 참조)와 일치하게 되어, 클러스터 포화 상황이 Ray Serve 가 pending replica 를 수락한 이후가 아니라 디스패치 시점에 표면화됩니다.

페이로드 키required_resources 출처필수비고
num_cpusrequired_cpu_count (우선) 또는 num_cpus기본 1
num_gpusrequired_gpu_count (우선) 또는 num_gpus기본 0.1
num_replicasnum_replicas아니오선언되지 않으면 페이로드에서 생략
memorymemory (우선) 또는 memory_bytes (alias)아니오와이어에서 memory 로 전송. 선언되지 않으면 생략

선택 키는 조건부로 추가됩니다 — 선언되지 않으면 null 로 보내지 않고 생략하므로, extra='forbid' 인 백엔드 request 모델도 페이로드를 계속 수락합니다.

actions:
inference:
required_resources:
required_cpu_count: 2
required_gpu_count: 1
num_replicas: 2
memory: 4294967296 # 4 GiB. `memory_bytes` 도 alias 로 허용됨.

Error Handling (SYN-7005 E-Series)

PrepareStep (INFERENCE 메서드)은 디스패치와 대기 실패 시 구조화된 RuntimeError 인스턴스를 발생시킵니다. 에러 메시지는 error_type (또는 terminal status)만 노출하며, 원본 cause 는 진단을 위해 __cause__ 에 보존되지만 토큰 / URL / 스택 트레이스는 사용자 노출 문자열에 절대 끼워 넣지 않습니다.

Fail-Fast 매트릭스

단계트리거예외
Dispatch (_ensure_pre_processor_running)client.run_plugin(...)ClientError (HTTP 4xx/5xx) 발생RuntimeError('Pre-processor deployment dispatch failed ({error_type})')
Wait (_wait_for_pre_processor)Serve 앱 상태가 DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE 에 도달RuntimeError('Pre-processor deployment failed: serve application reached terminal state ({status})')
Wait (_wait_for_pre_processor)RUNNING 도달 없이 timeout 경과RuntimeError('Pre-processor did not become ready within timeout')

Serve Application 상태 분류 (FR-6)

_classify_serve_app 은 백엔드의 7-값 ServeApplication.Status enum 을 3개의 행동 가능한 버킷으로 접어서 종단 실패(terminal failure) 시 폴링이 즉시 short-circuit 되도록 합니다 (이전에는 실패가 예정된 deploy 가 180초 timeout 예산을 모두 소진했습니다).

버킷상태폴링 동작
runningRUNNING즉시 반환 (성공)
failedDEPLOY_FAILED, UNHEALTHY, UNAVAILABLE즉시 RuntimeError 발생 (fail-fast)
pendingNOT_STARTED, DEPLOYING, DELETING, 빈 리스트, 형식 오류 페이로드폴링 계속
RUNNING 우선순위

어느 replica 라도 RUNNING 을 보고하면, 다른 replica 들이 실패한 상태여도 분류기는 ('running', None) 을 반환합니다. 이는 multi-replica 배포 도중 eventual-consistency 롤아웃이 잘못 분류되는 것을 방지합니다.

Pre-Processor 대기 플로우

에러 처리 예제

try:
# ToTaskAction.execute() 를 통해 간접 트리거됨
result = action.execute()
except RuntimeError as exc:
msg = str(exc)
if 'dispatch failed' in msg:
# agent 의 HTTP 4xx/5xx — 원본 ClientError 는 exc.__cause__ 에서 확인
cause = exc.__cause__
log.error('dispatch failed', error_type=type(cause).__name__, status=getattr(cause, 'status_code', None))
elif 'reached terminal state' in msg:
# DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE — agent 의 serve 앱 로그 확인
log.error('pre-processor terminal failure', message=msg)
elif 'did not become ready within timeout' in msg:
# 180초 이후에도 deploy 가 pending — Ray 클러스터 용량 확인
log.error('pre-processor wait timeout')
raise