본문으로 건너뛰기

Ray Job 아키텍처

이 문서는 Synapse SDK에서 Ray Job의 제출(submit), 실행(execution), 상태 관리(status management), 백엔드 동기화(backend synchronization) 전반에 걸친 아키텍처를 설명합니다.


목차

  1. 전체 아키텍처 개요
  2. Executor 유형
  3. Job 제출 흐름
  4. 상태 모델과 전이
  5. 백엔드 상태 동기화
  6. Entrypoint 실행
  7. Progress 및 Metrics 리포팅
  8. 로그 스트리밍
  9. 에러 처리 및 복구
  10. Runtime Environment 구성
  11. 결과 전달 메커니즘
  12. 주요 파일 참조

전체 아키텍처 개요

SDK는 Ray 클러스터 위에서 플러그인 액션을 실행하고, 그 상태를 Synapse 백엔드와 동기화합니다. 핵심 구성요소는 다음과 같습니다:

핵심 설계 원칙:

  • 이중 리포팅(Dual Reporting): Status Sync Thread(SDK 측)와 Entrypoint(Worker 측) 모두 백엔드에 상태를 보고하여 신뢰성을 확보합니다.
  • Graceful Degradation: 백엔드 통신 실패는 로그 경고만 남기고 Job 실행 자체를 중단하지 않습니다.
  • 비동기 제출: submit() 호출 후 즉시 반환하며, 상태 추적은 백그라운드 스레드가 담당합니다.

Executor 유형

SDK는 네 가지 Ray Executor를 제공합니다:

Executor용도상태 동기화Job 라이프사이클
RayJobExecutor단순 ray.remote 태스크없음 (로컬)ObjectRef 기반
RayActorExecutorPersistent Actor없음 (로컬)Actor lifetime
RayJobsApiExecutorRay Jobs API 기반 실행백엔드 동기화완전한 라이프사이클
RayPipelineExecutor멀티 액션 파이프라인체크포인트 기반파이프라인 단계별

이 문서는 RayJobsApiExecutor 에 집중합니다. 이 Executor만 백엔드와 상태를 동기화하며, 프로덕션 환경에서 사용되는 주요 Executor입니다.


Job 제출 흐름

submit() 메서드 상세

RayJobsApiExecutor.submit() (synapse_sdk/plugins/executors/ray/jobs_api.py:219-308)는 다음 단계를 거칩니다:

파라미터 전달 방식

액션 파라미터는 크기에 따라 두 가지 방식으로 전달됩니다:

  • 32KB 미만: SYNAPSE_ACTION_PARAMS 환경변수에 JSON 문자열로 직접 전달
  • 32KB 이상: working_dir.synapse_params.json 파일로 기록 후 SYNAPSE_ACTION_PARAMS_FILE 환경변수로 경로 전달

submit_job() 호출 파라미터

submit_kwargs = {
'entrypoint': 'python -m synapse_sdk.plugins.entrypoint',
'runtime_env': runtime_env, # dict
'submission_id': job_id, # Synapse job_id를 Ray submission_id로 사용
'entrypoint_num_cpus': num_cpus, # optional
'entrypoint_num_gpus': num_gpus, # optional
'entrypoint_memory': memory, # optional (bytes)
}

submission_id에 Synapse의 job_id를 그대로 사용하므로, Ray 측 Job ID와 백엔드 Job ID가 동일합니다.


상태 모델과 전이

JobStatus Enum

synapse_sdk/clients/backend/models.py에 정의된 상태 값:

class JobStatus(StrEnum):
PENDING = 'PENDING' # 제출됨, 실행 대기 중
RUNNING = 'RUNNING' # 실행 중
STOPPED = 'STOPPED' # 사용자/시스템에 의해 중단
SUCCEEDED = 'SUCCEEDED' # 성공적으로 완료
FAILED = 'FAILED' # 실행 중 오류 발생

상태 전이 다이어그램

누가 어떤 전이를 담당하는가?

전이담당 컴포넌트코드 위치
(생성) → PENDING백엔드 (Job 생성 시)백엔드 서버 측
PENDINGRUNNINGStatus Sync Thread + Entrypoint (이중)jobs_api.py:403-408, entrypoint.py:256-260
PENDINGFAILEDStatus Sync Thread (Ray terminal 감지)jobs_api.py:410-420
PENDINGSTOPPEDStatus Sync Thread (Ray terminal 감지)jobs_api.py:421-425
RUNNINGSUCCEEDEDEntrypoint (dispatch 성공)entrypoint.py:267
RUNNINGFAILEDEntrypoint (dispatch 예외)entrypoint.py:284
Submit 실패 → FAILEDExecutor (_report_submit_failure)jobs_api.py:310-338

이중 RUNNING 보고: Sync Thread와 Entrypoint 모두 RUNNING으로 전환을 시도합니다. 이는 의도된 동작으로, Entrypoint가 시작되기 전에 Sync Thread가 먼저 Ray 상태를 감지하여 빠르게 백엔드에 반영합니다. Entrypoint의 RUNNING 업데이트는 이미 RUNNING인 경우 멱등하게 처리됩니다.


백엔드 상태 동기화

Status Sync Thread

_start_status_sync() (jobs_api.py:340-357)는 데몬 스레드를 생성하여 Ray Job 상태를 폴링합니다.

폴링 설정

설정설명
poll_interval2.0초Ray 상태 체크 간격
timeout300초 (5분)PENDING 상태 최대 대기 시간

핵심 로직 (_sync_status_until_pending_exits)

# jobs_api.py:359-428
def _sync_status_until_pending_exits(self, job_id, poll_interval, timeout=300.0):
backend_client = create_backend_client()
if not backend_client:
return # 인증 정보 없으면 skip

ray_client = self._get_client()
start_time = time.monotonic()

while True:
elapsed = time.monotonic() - start_time
if elapsed >= timeout:
break # 타임아웃

ray_status = ray_client.get_job_status(job_id)
status_str = str(ray_status)

if status_str in ('RUNNING', 'JobStatus.RUNNING'):
backend_client.update_job(job_id, {'status': JobStatus.RUNNING})
break # Entrypoint가 이후 담당

if ray_status.is_terminal():
if status_str in ('FAILED', 'JobStatus.FAILED'):
error_msg = self._get_job_error_message(ray_client, job_id)
update_data = {'status': JobStatus.FAILED}
if error_msg:
update_data['result'] = {'error': error_msg}
backend_client.update_job(job_id, update_data)
elif status_str in ('STOPPED', 'JobStatus.STOPPED'):
backend_client.update_job(job_id, {'status': JobStatus.STOPPED})
break

time.sleep(poll_interval)

백엔드 API 호출

update_job() (synapse_sdk/clients/backend/integration.py)은 다음 엔드포인트를 호출합니다:

PATCH /jobs/{job_id}/
Content-Type: application/json

{
"status": "RUNNING", // JobStatus enum 값
"progress_record": { ... }, // 진행률 데이터 (optional)
"metrics_record": { ... }, // 메트릭 데이터 (optional)
"console_logs": [ ... ], // 콘솔 로그 (optional)
"result": { ... } // 최종 결과 또는 에러 (optional)
}

UpdateJobRequest 모델의 모든 필드는 optional이므로, 필요한 필드만 포함하여 전송합니다.


Entrypoint 실행

synapse_sdk/plugins/entrypoint.py는 Ray Worker 위에서 실행되는 진입점입니다.

실행 흐름

환경변수

Entrypoint는 다음 환경변수를 읽습니다:

환경변수필수설명
SYNAPSE_ACTION_ENTRYPOINTO액션 클래스 경로 (예: mymodule.MyAction)
SYNAPSE_ACTION_PARAMS액션 파라미터 JSON 문자열
SYNAPSE_ACTION_PARAMS_FILE파라미터 파일 경로 (PARAMS와 택 1)
SYNAPSE_JOB_IDXJob ID (없으면 RAY_JOB_ID 사용)
SYNAPSE_ACCESS_TOKENX백엔드 인증 토큰
SYNAPSE_HOSTX백엔드 API 호스트
SYNAPSE_LANGUAGEXi18n 언어 코드 (ISO 639-1)

Logger 선택 로직

# entrypoint.py:97-118
if client and job_id:
logger = JobLogger(client=client, job_id=job_id) # 백엔드 + 콘솔
else:
logger = ConsoleLogger() # 콘솔만
  • JobLogger: BackendClientjob_id가 모두 있을 때 사용. 콘솔 출력 + 백엔드 API로 진행률/메트릭 전송.
  • ConsoleLogger: 인증 정보가 없거나 job_id가 없을 때 fallback. 콘솔에만 출력.

_finalize_job() 상세

Job 종료 시 백엔드에 최종 상태를 전송합니다 (entrypoint.py:152-202):

update_data = {
'status': JobStatus.SUCCEEDED, # 또는 FAILED
'result': { # 성공 시 결과, 실패 시 에러
'key': 'value' # 또는 {'error': 'message'}
},
'progress_record': { # logger에서 수집 (logger가 None이면 생략)
'record': { ... },
'current_progress': { ... }
}
}
client.update_job(job_id, update_data)

이 함수는 예외를 발생시키지 않습니다 (silent failure). 백엔드 통신 실패 시 경고 로그만 출력합니다.


Progress 및 Metrics 리포팅

Logger 계층 구조

Progress Record 구조

JobLogger가 백엔드에 전송하는 progress_record 포맷:

{
"record": {
"steps": {
"training": {
"proportion": 70,
"order": 0,
"percent": 45.5,
"time_remaining": 120.0
},
"evaluation": {
"proportion": 30,
"order": 1,
"percent": 0.0,
"time_remaining": null
}
}
},
"current_progress": {
"overall": 31.85,
"step": "training",
"percent": 45.5,
"time_remaining": 120.0,
"order": 0
}
}

핵심 특성:

  • Monotonic Progress: 진행률은 절대 감소하지 않습니다. 이전 값보다 작은 값이 들어오면 무시됩니다.
  • Step Proportions: 각 스텝에 가중치를 부여하여 전체 진행률(overall)을 가중 합산으로 계산합니다.
  • Dynamic Step Allocation: 미리 정의되지 않은 스텝이 등록되면 남은 가중치에서 균등 분배합니다.

Metrics Record 구조

{
"record": {
"steps": {
"training": {
"loss": 0.05,
"accuracy": 0.98,
"learning_rate": 0.001
}
}
}
}

액션 코드에서의 사용

class MyAction(BaseAction):
def execute(self):
# 스텝 비율 설정
self.ctx.logger.set_step_proportions(
{'preprocessing': 20, 'training': 70, 'evaluation': 10}
)

# 스텝 설정 및 진행률 업데이트
self.ctx.set_progress(0, 100, step='preprocessing')
# ... 작업 수행 ...
self.ctx.set_progress(100, 100, step='preprocessing')

# 메트릭 기록
self.ctx.set_metrics({'loss': 0.05, 'accuracy': 0.98}, step='training')

# 사용자 메시지 로깅
self.ctx.log_message('학습이 완료되었습니다.', level='INFO')

로그 스트리밍

Ray Dashboard 로그 스트리밍

RayJobsApiExecutor는 두 가지 로그 스트리밍 방식을 제공합니다:

동기 방식 (stream_logs, jobs_api.py:553-597):

for line in executor.stream_logs(job_id, timeout=3600):
print(line, end='')

비동기 방식 (stream_logs_async, jobs_api.py:599-619):

async for line in executor.stream_logs_async(job_id):
print(line, end='')

Ray의 tail_job_logs()를 사용하므로 런타임 환경 셋업 로그(pip install 등)도 포함됩니다.

백엔드 SSE 로그 스트리밍

CLI에서 사용하는 실시간 로그 스트리밍 (synapse_sdk/cli/plugin/job.py):

SSE 이벤트 타입:

타입설명
connectedSSE 연결 수립
log로그 메시지 (message 필드 포함)
complete로그 스트리밍 종료

에러 처리 및 복구

에러 발생 지점별 처리

에러 메시지 추출

Ray Job이 Entrypoint에 도달하기 전에 실패하면, Sync Thread가 에러 로그를 추출합니다 (jobs_api.py:430-448):

@staticmethod
def _get_job_error_message(ray_client, job_id):
logs = ray_client.get_job_logs(job_id)
if logs:
return logs[-500:] if len(logs) > 500 else logs # 마지막 500자
return None

복구 패턴 요약

실패 유형복구 전략Job에 미치는 영향
Ray 클러스터 다운Sync thread 즉시 종료상태 불명 (PENDING 유지 가능)
백엔드 API 실패Warning 로그, 계속 진행Job 실행은 정상, 상태 미동기화
파라미터 로딩 실패FAILED 상태 보고 후 종료FAILED
액션 실행 예외FAILED + error 메시지 보고FAILED
Submit 실패FAILED 상태 보고 + 예외 발생FAILED
PENDING 타임아웃 (300s)Warning 로그, thread 종료상태 불명

Runtime Environment 구성

_build_jobs_api_runtime_env() (jobs_api.py:115-217)가 구성하는 Runtime Environment:

패키지 매니저 옵션

매니저기본 옵션설명
pippip_install_options['--upgrade']pip 표준 옵션
uvuv_pip_install_options['--no-cache']uv 빌드 캐시 비활성화

인증 정보 주입

load_credentials()로 현재 환경의 인증 정보를 가져와 Worker의 환경변수에 주입합니다:

creds = load_credentials()
if creds.host:
runtime_env['env_vars']['SYNAPSE_HOST'] = creds.host
if creds.token:
runtime_env['env_vars']['SYNAPSE_ACCESS_TOKEN'] = creds.token

이를 통해 Worker의 Entrypoint에서 BackendClient를 생성하여 백엔드와 직접 통신할 수 있습니다.


결과 전달 메커니즘

stdout 마커 기반 결과 전달

Entrypoint는 액션 결과를 stdout에 마커로 감싸서 출력합니다 (entrypoint.py:140-149):

(일반 로그 출력...)
__SYNAPSE_RESULT_START__
{"key": "value", "metrics": {"accuracy": 0.98}}
__SYNAPSE_RESULT_END__

결과 파싱

get_result() (jobs_api.py:621-658)는 Job 완료 후 로그에서 마커를 찾아 결과를 파싱합니다:

def get_result(self, job_id, timeout=None):
while True:
status = client.get_job_status(job_id)

if status.is_terminal():
if str(status) in ('SUCCEEDED', 'JobStatus.SUCCEEDED'):
logs = client.get_job_logs(job_id)
return self._parse_result_from_logs(logs, job_id)
else:
logs = client.get_job_logs(job_id)
raise ExecutionError(f'Job {job_id} failed. Logs:\n{logs}')

if timeout and (time.time() - start_time) >= timeout:
raise ExecutionError(f'Timed out after {timeout}s')

time.sleep(1) # 1초 간격 폴링

결과 직렬화 규칙

결과 타입직렬화 방식
dict그대로 JSON 직렬화
BaseModel (Pydantic)model_dump(mode='json')
기타{'result': value} 로 래핑

주요 파일 참조

파일역할
synapse_sdk/plugins/executors/ray/jobs_api.py핵심: RayJobsApiExecutor, 상태 동기화, 로그 스트리밍
synapse_sdk/plugins/entrypoint.py핵심: Worker 진입점, 액션 실행, 최종 상태 보고
synapse_sdk/plugins/executors/ray/base.pyBase Executor, 런타임 환경 구성 공통 로직
synapse_sdk/plugins/executors/ray/job.pyRay task executor (단순 ray.remote)
synapse_sdk/plugins/executors/ray/task.pyRay actor executor
synapse_sdk/plugins/executors/ray/pipeline.py멀티 액션 파이프라인
synapse_sdk/plugins/action.pyBaseAction, dispatch() 메서드
synapse_sdk/plugins/context/__init__.pyRuntimeContext 정의
synapse_sdk/plugins/context/env.pyPluginEnvironment
synapse_sdk/clients/backend/models.pyJobStatus, UpdateJobRequest 모델
synapse_sdk/clients/backend/integration.pyBackend API 클라이언트 (update_job, tail_job_console_logs)
synapse_sdk/cli/plugin/job.pyCLI Job 명령 (조회, 로그 스트리밍)
synapse_sdk/loggers.pyLogger 계층 (ConsoleLogger, JobLogger 등)
tests/plugins/executors/ray/test_jobs_api_status_sync.py상태 동기화 테스트