Ray Job 아키텍처
이 문서는 Synapse SDK에서 Ray Job의 제출(submit), 실행(execution), 상태 관리(status management), 백엔드 동기화(backend synchronization) 전반에 걸친 아키텍처를 설명합니다.
목차
- 전체 아키텍처 개요
- Executor 유형
- Job 제출 흐름
- 상태 모델과 전이
- 백엔드 상태 동기화
- Entrypoint 실행
- Progress 및 Metrics 리포팅
- 로그 스트리밍
- 에러 처리 및 복구
- Runtime Environment 구성
- 결과 전달 메커니즘
- 주요 파일 참조
전체 아키텍처 개요
SDK는 Ray 클러스터 위에서 플러그인 액션을 실행하고, 그 상태를 Synapse 백엔드와 동기화합니다. 핵심 구성요소는 다음과 같습니다:
핵심 설계 원칙:
- 이중 리포팅(Dual Reporting): Status Sync Thread(SDK 측)와 Entrypoint(Worker 측) 모두 백엔드에 상태를 보고하여 신뢰성을 확보합니다.
- Graceful Degradation: 백엔드 통신 실패는 로그 경고만 남기고 Job 실행 자체를 중단하지 않습니다.
- 비동기 제출:
submit()호출 후 즉시 반환하며, 상태 추적은 백그라운드 스레드가 담당합니다.
Executor 유형
SDK는 네 가지 Ray Executor를 제공합니다:
| Executor | 용도 | 상태 동기화 | Job 라이프사이클 |
|---|---|---|---|
RayJobExecutor | 단순 ray.remote 태스크 | 없음 (로컬) | ObjectRef 기반 |
RayActorExecutor | Persistent Actor | 없음 (로컬) | Actor lifetime |
RayJobsApiExecutor | Ray Jobs API 기반 실행 | 백엔드 동기화 | 완전한 라이프사이클 |
RayPipelineExecutor | 멀티 액션 파이프라인 | 체크포인트 기반 | 파이프라인 단계별 |
이 문서는
RayJobsApiExecutor에 집중합니다. 이 Executor만 백엔드와 상태를 동기화하며, 프로덕션 환경에서 사용되는 주요 Executor입니다.
Job 제출 흐름
submit() 메서드 상세
RayJobsApiExecutor.submit() (synapse_sdk/plugins/executors/ray/jobs_api.py:219-308)는 다음 단계를 거칩니다:
파라미터 전달 방식
액션 파라미터는 크기에 따라 두 가지 방식으로 전달됩니다:
- 32KB 미만:
SYNAPSE_ACTION_PARAMS환경변수에 JSON 문자열로 직접 전달 - 32KB 이상:
working_dir내.synapse_params.json파일로 기록 후SYNAPSE_ACTION_PARAMS_FILE환경변수로 경로 전달
submit_job() 호출 파라미터
submit_kwargs = {
'entrypoint': 'python -m synapse_sdk.plugins.entrypoint',
'runtime_env': runtime_env, # dict
'submission_id': job_id, # Synapse job_id를 Ray submission_id로 사용
'entrypoint_num_cpus': num_cpus, # optional
'entrypoint_num_gpus': num_gpus, # optional
'entrypoint_memory': memory, # optional (bytes)
}
submission_id에 Synapse의job_id를 그대로 사용하므로, Ray 측 Job ID와 백엔드 Job ID가 동일합니다.
상태 모델과 전이
JobStatus Enum
synapse_sdk/clients/backend/models.py에 정의된 상태 값:
class JobStatus(StrEnum):
PENDING = 'PENDING' # 제출됨, 실행 대기 중
RUNNING = 'RUNNING' # 실행 중
STOPPED = 'STOPPED' # 사용자/시스템에 의해 중단
SUCCEEDED = 'SUCCEEDED' # 성공적으로 완료
FAILED = 'FAILED' # 실행 중 오류 발생
상태 전이 다이어그램
누가 어떤 전이를 담당하는가?
| 전이 | 담당 컴포넌트 | 코드 위치 |
|---|---|---|
(생성) → PENDING | 백엔드 (Job 생성 시) | 백엔드 서버 측 |
PENDING → RUNNING | Status Sync Thread + Entrypoint (이중) | jobs_api.py:403-408, entrypoint.py:256-260 |
PENDING → FAILED | Status Sync Thread (Ray terminal 감지) | jobs_api.py:410-420 |
PENDING → STOPPED | Status Sync Thread (Ray terminal 감지) | jobs_api.py:421-425 |
RUNNING → SUCCEEDED | Entrypoint (dispatch 성공) | entrypoint.py:267 |
RUNNING → FAILED | Entrypoint (dispatch 예외) | entrypoint.py:284 |
Submit 실패 → FAILED | Executor (_report_submit_failure) | jobs_api.py:310-338 |
이중 RUNNING 보고: Sync Thread와 Entrypoint 모두
RUNNING으로 전환을 시도합니다. 이는 의도된 동작으로, Entrypoint가 시작되기 전에 Sync Thread가 먼저 Ray 상태를 감지하여 빠르게 백엔드에 반영합니다. Entrypoint의 RUNNING 업데이트는 이미 RUNNING인 경우 멱등하게 처리됩니다.
백엔드 상태 동기화
Status Sync Thread
_start_status_sync() (jobs_api.py:340-357)는 데몬 스레드를 생성하여 Ray Job 상태를 폴링합니다.
폴링 설정
| 설정 | 값 | 설명 |
|---|---|---|
poll_interval | 2.0초 | Ray 상태 체크 간격 |
timeout | 300초 (5분) | PENDING 상태 최대 대기 시간 |
핵심 로직 (_sync_status_until_pending_exits)
# jobs_api.py:359-428
def _sync_status_until_pending_exits(self, job_id, poll_interval, timeout=300.0):
backend_client = create_backend_client()
if not backend_client:
return # 인증 정보 없으면 skip
ray_client = self._get_client()
start_time = time.monotonic()
while True:
elapsed = time.monotonic() - start_time
if elapsed >= timeout:
break # 타임아웃
ray_status = ray_client.get_job_status(job_id)
status_str = str(ray_status)
if status_str in ('RUNNING', 'JobStatus.RUNNING'):
backend_client.update_job(job_id, {'status': JobStatus.RUNNING})
break # Entrypoint가 이후 담당
if ray_status.is_terminal():
if status_str in ('FAILED', 'JobStatus.FAILED'):
error_msg = self._get_job_error_message(ray_client, job_id)
update_data = {'status': JobStatus.FAILED}
if error_msg:
update_data['result'] = {'error': error_msg}
backend_client.update_job(job_id, update_data)
elif status_str in ('STOPPED', 'JobStatus.STOPPED'):
backend_client.update_job(job_id, {'status': JobStatus.STOPPED})
break
time.sleep(poll_interval)
백엔드 API 호출
update_job() (synapse_sdk/clients/backend/integration.py)은 다음 엔드포인트를 호출합니다:
PATCH /jobs/{job_id}/
Content-Type: application/json
{
"status": "RUNNING", // JobStatus enum 값
"progress_record": { ... }, // 진행률 데이터 (optional)
"metrics_record": { ... }, // 메트릭 데이터 (optional)
"console_logs": [ ... ], // 콘솔 로그 (optional)
"result": { ... } // 최종 결과 또는 에러 (optional)
}
UpdateJobRequest 모델의 모든 필드는 optional이므로, 필요한 필드만 포함하여 전송합니다.
Entrypoint 실행
synapse_sdk/plugins/entrypoint.py는 Ray Worker 위에서 실행되는 진입점입니다.
실행 흐름
환경변수
Entrypoint는 다음 환경변수를 읽습니다:
| 환경변수 | 필수 | 설명 |
|---|---|---|
SYNAPSE_ACTION_ENTRYPOINT | O | 액션 클래스 경로 (예: mymodule.MyAction) |
SYNAPSE_ACTION_PARAMS | △ | 액션 파라미터 JSON 문자열 |
SYNAPSE_ACTION_PARAMS_FILE | △ | 파라미터 파일 경로 (PARAMS와 택 1) |
SYNAPSE_JOB_ID | X | Job ID (없으면 RAY_JOB_ID 사용) |
SYNAPSE_ACCESS_TOKEN | X | 백엔드 인증 토큰 |
SYNAPSE_HOST | X | 백엔드 API 호스트 |
SYNAPSE_LANGUAGE | X | i18n 언어 코드 (ISO 639-1) |
Logger 선택 로직
# entrypoint.py:97-118
if client and job_id:
logger = JobLogger(client=client, job_id=job_id) # 백엔드 + 콘솔
else:
logger = ConsoleLogger() # 콘솔만
JobLogger:BackendClient와job_id가 모두 있을 때 사용. 콘솔 출력 + 백엔드 API로 진행률/메트릭 전송.ConsoleLogger: 인증 정보가 없거나 job_id가 없을 때 fallback. 콘솔에만 출력.
_finalize_job() 상세
Job 종료 시 백엔드에 최종 상태를 전송합니다 (entrypoint.py:152-202):
update_data = {
'status': JobStatus.SUCCEEDED, # 또는 FAILED
'result': { # 성공 시 결과, 실패 시 에러
'key': 'value' # 또는 {'error': 'message'}
},
'progress_record': { # logger에서 수집 (logger가 None이면 생략)
'record': { ... },
'current_progress': { ... }
}
}
client.update_job(job_id, update_data)
이 함수는 예외를 발생시키지 않습니다 (silent failure). 백엔드 통신 실패 시 경고 로그만 출력합니다.
Progress 및 Metrics 리포팅
Logger 계층 구조
Progress Record 구조
JobLogger가 백엔드에 전송하는 progress_record 포맷:
{
"record": {
"steps": {
"training": {
"proportion": 70,
"order": 0,
"percent": 45.5,
"time_remaining": 120.0
},
"evaluation": {
"proportion": 30,
"order": 1,
"percent": 0.0,
"time_remaining": null
}
}
},
"current_progress": {
"overall": 31.85,
"step": "training",
"percent": 45.5,
"time_remaining": 120.0,
"order": 0
}
}
핵심 특성:
- Monotonic Progress: 진행률은 절대 감소하지 않습니다. 이전 값보다 작은 값이 들어오면 무시됩니다.
- Step Proportions: 각 스텝에 가중치를 부여하여 전체 진행률(
overall)을 가중 합산으로 계산합니다. - Dynamic Step Allocation: 미리 정의되지 않은 스텝이 등록되면 남은 가중치에서 균등 분배합니다.
Metrics Record 구조
{
"record": {
"steps": {
"training": {
"loss": 0.05,
"accuracy": 0.98,
"learning_rate": 0.001
}
}
}
}
액션 코드에서의 사용
class MyAction(BaseAction):
def execute(self):
# 스텝 비율 설정
self.ctx.logger.set_step_proportions(
{'preprocessing': 20, 'training': 70, 'evaluation': 10}
)
# 스텝 설정 및 진행률 업데이트
self.ctx.set_progress(0, 100, step='preprocessing')
# ... 작업 수행 ...
self.ctx.set_progress(100, 100, step='preprocessing')
# 메트릭 기록
self.ctx.set_metrics({'loss': 0.05, 'accuracy': 0.98}, step='training')
# 사용자 메시지 로깅
self.ctx.log_message('학습이 완료되었습니다.', level='INFO')
로그 스트리밍
Ray Dashboard 로그 스트리밍
RayJobsApiExecutor는 두 가지 로그 스트리밍 방식을 제공합니다:
동기 방식 (stream_logs, jobs_api.py:553-597):
for line in executor.stream_logs(job_id, timeout=3600):
print(line, end='')
비동기 방식 (stream_logs_async, jobs_api.py:599-619):
async for line in executor.stream_logs_async(job_id):
print(line, end='')
Ray의
tail_job_logs()를 사용하므로 런타임 환경 셋업 로그(pip install 등)도 포함됩니다.
백엔드 SSE 로그 스트리밍
CLI에서 사용하는 실시간 로그 스트리밍 (synapse_sdk/cli/plugin/job.py):
SSE 이벤트 타입:
| 타입 | 설명 |
|---|---|
connected | SSE 연결 수립 |
log | 로그 메시지 (message 필드 포함) |
complete | 로그 스트리밍 종료 |
에러 처리 및 복구
에러 발생 지점별 처리
에러 메시지 추출
Ray Job이 Entrypoint에 도달하기 전에 실패하면, Sync Thread가 에러 로그를 추출합니다 (jobs_api.py:430-448):
@staticmethod
def _get_job_error_message(ray_client, job_id):
logs = ray_client.get_job_logs(job_id)
if logs:
return logs[-500:] if len(logs) > 500 else logs # 마지막 500자
return None
복구 패턴 요약
| 실패 유형 | 복구 전략 | Job에 미치는 영향 |
|---|---|---|
| Ray 클러스터 다운 | Sync thread 즉시 종료 | 상태 불명 (PENDING 유지 가능) |
| 백엔드 API 실패 | Warning 로그, 계속 진행 | Job 실행은 정상, 상태 미동기화 |
| 파라미터 로딩 실패 | FAILED 상태 보고 후 종료 | FAILED |
| 액션 실행 예외 | FAILED + error 메시지 보고 | FAILED |
| Submit 실패 | FAILED 상태 보고 + 예외 발생 | FAILED |
| PENDING 타임아웃 (300s) | Warning 로그, thread 종료 | 상태 불명 |
Runtime Environment 구성
_build_jobs_api_runtime_env() (jobs_api.py:115-217)가 구성하는 Runtime Environment:
패키지 매니저 옵션
| 매니저 | 키 | 기본 옵션 | 설명 |
|---|---|---|---|
| pip | pip_install_options | ['--upgrade'] | pip 표준 옵션 |
| uv | uv_pip_install_options | ['--no-cache'] | uv 빌드 캐시 비활성화 |
인증 정보 주입
load_credentials()로 현재 환경의 인증 정보를 가져와 Worker의 환경변수에 주입합니다:
creds = load_credentials()
if creds.host:
runtime_env['env_vars']['SYNAPSE_HOST'] = creds.host
if creds.token:
runtime_env['env_vars']['SYNAPSE_ACCESS_TOKEN'] = creds.token
이를 통해 Worker의 Entrypoint에서 BackendClient를 생성하여 백엔드와 직접 통신할 수 있습니다.
결과 전달 메커니즘
stdout 마커 기반 결과 전달
Entrypoint는 액션 결과를 stdout에 마커로 감싸서 출력합니다 (entrypoint.py:140-149):
(일반 로그 출력...)
__SYNAPSE_RESULT_START__
{"key": "value", "metrics": {"accuracy": 0.98}}
__SYNAPSE_RESULT_END__
결과 파싱
get_result() (jobs_api.py:621-658)는 Job 완료 후 로그에서 마커를 찾아 결과를 파싱합니다:
def get_result(self, job_id, timeout=None):
while True:
status = client.get_job_status(job_id)
if status.is_terminal():
if str(status) in ('SUCCEEDED', 'JobStatus.SUCCEEDED'):
logs = client.get_job_logs(job_id)
return self._parse_result_from_logs(logs, job_id)
else:
logs = client.get_job_logs(job_id)
raise ExecutionError(f'Job {job_id} failed. Logs:\n{logs}')
if timeout and (time.time() - start_time) >= timeout:
raise ExecutionError(f'Timed out after {timeout}s')
time.sleep(1) # 1초 간격 폴링
결과 직렬화 규칙
| 결과 타입 | 직렬화 방식 |
|---|---|
dict | 그대로 JSON 직렬화 |
BaseModel (Pydantic) | model_dump(mode='json') |
| 기타 | {'result': value} 로 래핑 |
주요 파일 참조
| 파일 | 역할 |
|---|---|
synapse_sdk/plugins/executors/ray/jobs_api.py | 핵심: RayJobsApiExecutor, 상태 동기화, 로그 스트리밍 |
synapse_sdk/plugins/entrypoint.py | 핵심: Worker 진입점, 액션 실행, 최종 상태 보고 |
synapse_sdk/plugins/executors/ray/base.py | Base Executor, 런타임 환경 구성 공통 로직 |
synapse_sdk/plugins/executors/ray/job.py | Ray task executor (단순 ray.remote) |
synapse_sdk/plugins/executors/ray/task.py | Ray actor executor |
synapse_sdk/plugins/executors/ray/pipeline.py | 멀티 액션 파이프라인 |
synapse_sdk/plugins/action.py | BaseAction, dispatch() 메서드 |
synapse_sdk/plugins/context/__init__.py | RuntimeContext 정의 |
synapse_sdk/plugins/context/env.py | PluginEnvironment |
synapse_sdk/clients/backend/models.py | JobStatus, UpdateJobRequest 모델 |
synapse_sdk/clients/backend/integration.py | Backend API 클라이언트 (update_job, tail_job_console_logs) |
synapse_sdk/cli/plugin/job.py | CLI Job 명령 (조회, 로그 스트리밍) |
synapse_sdk/loggers.py | Logger 계층 (ConsoleLogger, JobLogger 등) |
tests/plugins/executors/ray/test_jobs_api_status_sync.py | 상태 동기화 테스트 |