Pre-Annotation Actions
Pre-annotation action 은 사람 어노테이션 이전에 task 데이터를 준비합니다. 파일 기반 입력 또는 추론 출력을 사용하여 task 데이터를 추가하거나 변환하려면 ToTaskAction 을 사용하세요.
개요
ToTaskAction 은 다음을 처리합니다.
- 필터를 통한 task 디스커버리
- 데이터 수집 및 file specification 검증
- 파일 기반 어노테이션 적재(ingestion)
- pre-processor 플러그인을 통한 추론 기반 어노테이션
- 진행률 및 메트릭 추적
ToTaskAction
서브클래싱
데이터 포맷을 task 데이터로 매핑하도록 변환 훅을 오버라이드합니다.
from synapse_sdk.plugins.actions.to_task import ToTaskAction
class ToTask(ToTaskAction):
action_name = 'to_task'
def convert_data_from_file(self, primary_file_url, primary_file_name, data_file_url, data_file_name, task_data=None):
# TODO: data_file_url 을 파싱하고 task 데이터 페이로드 반환
return {}
def convert_data_from_inference(self, inference_data, task_data=None):
# TODO: 추론 출력을 task 데이터 페이로드로 변환
return inference_data
변환 메서드
다음 메서드를 오버라이드하여 데이터를 task 데이터 포맷으로 변환합니다.
| 메서드 | 설명 | 파라미터 | 반환값 |
|---|---|---|---|
convert_data_from_file() | 파일 기반 어노테이션 데이터 파싱 | primary_file_url, primary_file_name, data_file_url, data_file_name, task_data (선택) | dict[str, Any] - task 데이터 페이로드 |
convert_data_from_inference() | 추론 출력을 task 데이터로 변환 | inference_data, task_data (선택) | dict[str, Any] - task 데이터 페이로드 |
두 메서드 모두 추가 컨텍스트로 사용할 전체 task 페이로드를 담은 선택적 task_data 파라미터를 받습니다.
Config.yaml
플러그인 config.yaml 에서 액션 entrypoint 를 정의합니다.
actions:
to_task:
entrypoint: plugin.to_task.ToTask
method: job
파라미터
기반 params 모델은 파일과 추론 워크플로우를 모두 지원합니다.
name: 액션 job 이름 (필수)description: 액션 job 의 선택적 설명project: 프로젝트 IDagent: Agent IDtask_filters: task 쿼리 필터 (선택, 기본값:{})method: 어노테이션 방법 —file또는inference(기본값:file)target_specification_name: 파일 spec 이름 (file 메서드 시 필수)pre_processor: pre-processor release ID (inference 메서드 시 필수)model: 모델 ID (inference 메서드 시 필수)pre_processor_params: 추론용 추가 파라미터 (선택, 기본값:{})
결과
액션은 다음 필드를 가진 ToTaskResult 를 반환합니다.
status: job 완료 상태 (SUCCEEDED,FAILED등)message: 결과를 설명하는 요약 메시지total_tasks: 처리된 전체 task 수success_count: 성공적으로 어노테이션된 task 수failed_count: 어노테이션이 실패한 task 수failures: 실패 기록 목록. 각 항목은 다음을 포함:task_id: 실패한 task 의 IDerror: 실패 원인을 설명하는 에러 메시지
Pre-Processor Lifecycle (INFERENCE method)
method=inference 일 때, PrepareStep 은 어노테이션이 시작되기 전에 pre-processor 의 Ray Serve 애플리케이션이 실행 중인지 확인합니다. lifecycle 은 세 단계로 구성됩니다.
- Detection (탐지) —
client.list_serve_applications(plugin_code=code, job__agent=agent)로 기존RUNNING앱을 폴링합니다. - Dispatch (디스패치) — 실행 중인 앱이 없으면
client.run_plugin(code, {action: 'deployment', params: {...}})을 호출합니다. - Wait (대기) — 상태가
RUNNING에 도달하거나, 종단(terminal) 실패가 발생하거나,timeout_seconds(기본 180s)가 경과할 때까지 10초(기본)마다 폴링합니다.
Required Resources Payload (SYN-7005 / FR-8)
디스패치 params 페이로드는 pre-processor 플러그인 릴리스의 actions.inference.required_resources 에 선언된 리소스 요구사항을 전달합니다. 이를 통해 디스패치 형태가 agent-side capacity gate (Inference Actions — Capacity Gate 참조)와 일치하게 되어, 클러스터 포화 상황이 Ray Serve 가 pending replica 를 수락한 이후가 아니라 디스패치 시점에 표면화됩니다.
| 페이로드 키 | required_resources 출처 | 필수 | 비고 |
|---|---|---|---|
num_cpus | required_cpu_count (우선) 또는 num_cpus | 예 | 기본 1 |
num_gpus | required_gpu_count (우선) 또는 num_gpus | 예 | 기본 0.1 |
num_replicas | num_replicas | 아니오 | 선언되지 않으면 페이로드에서 생략 |
memory | memory (우선) 또는 memory_bytes (alias) | 아니오 | 와이어에서 memory 로 전송. 선언되지 않으면 생략 |
선택 키는 조건부로 추가됩니다 — 선언되지 않으면 null 로 보내지 않고 생략하므로, extra='forbid' 인 백엔드 request 모델도 페이로드를 계속 수락합니다.
actions:
inference:
required_resources:
required_cpu_count: 2
required_gpu_count: 1
num_replicas: 2
memory: 4294967296 # 4 GiB. `memory_bytes` 도 alias 로 허용됨.
Error Handling (SYN-7005 E-Series)
PrepareStep (INFERENCE 메서드)은 디스패치와 대기 실패 시 구조화된 RuntimeError 인스턴스를 발생시킵니다. 에러 메시지는 error_type (또는 terminal status)만 노출하며, 원본 cause 는 진단을 위해 __cause__ 에 보존되지만 토큰 / URL / 스택 트레이스는 사용자 노출 문자열에 절대 끼워 넣지 않습니다.
Fail-Fast 매트릭스
| 단계 | 트리거 | 예외 |
|---|---|---|
Dispatch (_ensure_pre_processor_running) | client.run_plugin(...) 이 ClientError (HTTP 4xx/5xx) 발생 | RuntimeError('Pre-processor deployment dispatch failed ({error_type})') |
Wait (_wait_for_pre_processor) | Serve 앱 상태가 DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE 에 도달 | RuntimeError('Pre-processor deployment failed: serve application reached terminal state ({status})') |
Wait (_wait_for_pre_processor) | RUNNING 도달 없이 timeout 경과 | RuntimeError('Pre-processor did not become ready within timeout') |
Serve Application 상태 분류 (FR-6)
_classify_serve_app 은 백엔드의 7-값 ServeApplication.Status enum 을 3개의 행동 가능한 버킷으로 접어서 종단 실패(terminal failure) 시 폴링이 즉시 short-circuit 되도록 합니다 (이전에는 실패가 예정된 deploy 가 180초 timeout 예산을 모두 소진했습니다).
| 버킷 | 상태 | 폴링 동작 |
|---|---|---|
running | RUNNING | 즉시 반환 (성공) |
failed | DEPLOY_FAILED, UNHEALTHY, UNAVAILABLE | 즉시 RuntimeError 발생 (fail-fast) |
pending | NOT_STARTED, DEPLOYING, DELETING, 빈 리스트, 형식 오류 페이로드 | 폴링 계속 |
어느 replica 라도 RUNNING 을 보고하면, 다른 replica 들이 실패한 상태여도 분류기는 ('running', None) 을 반환합니다. 이는 multi-replica 배포 도중 eventual-consistency 롤아웃이 잘못 분류되는 것을 방지합니다.
Pre-Processor 대기 플로우
에러 처리 예제
try:
# ToTaskAction.execute() 를 통해 간접 트리거됨
result = action.execute()
except RuntimeError as exc:
msg = str(exc)
if 'dispatch failed' in msg:
# agent 의 HTTP 4xx/5xx — 원본 ClientError 는 exc.__cause__ 에서 확인
cause = exc.__cause__
log.error('dispatch failed', error_type=type(cause).__name__, status=getattr(cause, 'status_code', None))
elif 'reached terminal state' in msg:
# DEPLOY_FAILED / UNHEALTHY / UNAVAILABLE — agent 의 serve 앱 로그 확인
log.error('pre-processor terminal failure', message=msg)
elif 'did not become ready within timeout' in msg:
# 180초 이후에도 deploy 가 pending — Ray 클러스터 용량 확인
log.error('pre-processor wait timeout')
raise