본문으로 건너뛰기

AgentClient

agent 전용 동작, job 관리, 분산 실행을 위한 클라이언트입니다.

개요

AgentClient 는 agent 동작에 접근할 수 있게 해주며, 플러그인 실행, Ray job 관리, 실시간 로그 스트리밍을 포함합니다. 동기(AgentClient)와 비동기(AsyncAgentClient) 두 버전이 모두 제공됩니다.

설치

pip install synapse-sdk

WebSocket 스트리밍 지원:

pip install synapse-sdk websocket-client  # Sync 클라이언트
pip install synapse-sdk websockets # Async 클라이언트

AgentClient (Sync)

생성자

AgentClient(
base_url: str,
agent_token: str,
*,
user_token: str = None,
tenant: str = None,
timeout: dict = None
)

파라미터

파라미터타입필수기본값설명
base_urlstr-Agent 서버 URL (예: "https://agent.example.com")
agent_tokenstr-Agent 인증 토큰
user_tokenstr아니오None사용자 범위 작업을 위한 사용자 인증 토큰
tenantstr아니오None멀티 테넌트 배포를 위한 테넌트 식별자
timeoutdict아니오None연결 및 읽기 timeout 구성

사용 예

from synapse_sdk.clients.agent import AgentClient

client = AgentClient(
base_url="https://agent.example.com",
agent_token="your-agent-token"
)

# 헬스 체크
status = client.health_check()
print(f"Agent status: {status}")

# Ray jobs 목록
jobs = client.list_jobs()

# job 로그 스트리밍
for line in client.tail_job_logs('raysubmit_abc123'):
print(line)

AsyncAgentClient (Async)

생성자

AsyncAgentClient(
base_url: str,
agent_token: str,
*,
user_token: str = None,
tenant: str = None,
timeout: dict = None
)

Context Manager 사용

from synapse_sdk.clients.agent import AsyncAgentClient

async with AsyncAgentClient(
base_url="https://agent.example.com",
agent_token="your-agent-token"
) as client:
# 헬스 체크
status = await client.health_check()

# Ray jobs 목록
jobs = await client.list_jobs()

# 비동기 job 로그 스트리밍
async for line in client.tail_job_logs('raysubmit_abc123'):
print(line)

Context Manager 미사용

client = AsyncAgentClient(base_url, agent_token)
try:
jobs = await client.list_jobs()
finally:
await client.close()

로그 스트리밍

두 클라이언트 모두 WebSocket 또는 HTTP 프로토콜을 통한 실시간 로그 스트리밍을 지원합니다.

통합 메서드

# Sync
for line in client.tail_job_logs('job-id', protocol='auto'):
print(line)

# Async
async for line in client.tail_job_logs('job-id', protocol='auto'):
print(line)

프로토콜 옵션

  • 'auto' (기본): WebSocket 을 먼저 시도하고 연결 실패 시 HTTP 로 폴백
  • 'websocket': WebSocket 전용 (최저 지연 시간)
  • 'http': HTTP chunked 스트리밍 전용 (호환성 우수)

스트림 한도

스트리밍 작업의 리소스 한도를 구성합니다.

from synapse_sdk.utils.network import StreamLimits

client.stream_limits = StreamLimits(
max_messages=10_000, # 최대 WebSocket 메시지
max_lines=50_000, # 최대 HTTP 줄
max_bytes=50*1024*1024, # 50MB 총량
max_message_size=10_240 # 메시지당 10KB
)

자세한 스트리밍 메서드 문서는 RayClient 를 참고하세요.


Ray 작업

AgentClient 는 mixin 을 통해 모든 Ray 클러스터 관리 메서드를 포함합니다.

Job 작업

# 모든 job 목록
jobs = client.list_jobs()

# job 상세 조회
job = client.get_job('raysubmit_abc123')

# job 로그 조회 (스트리밍 아님)
logs = client.get_job_logs('raysubmit_abc123')

# 실행 중 job 중단
result = client.stop_job('raysubmit_abc123')

노드 작업

# 클러스터 노드 목록
nodes = client.list_nodes()

# 노드 상세 조회
node = client.get_node('node-abc123')

Task 작업

# 모든 task 목록
tasks = client.list_tasks()

# task 상세 조회
task = client.get_task('task-xyz789')

Ray Serve 작업

# serve 애플리케이션 목록
apps = client.list_serve_applications()

# 애플리케이션 상세 조회
app = client.get_serve_application('my-app')

# 애플리케이션 삭제
client.delete_serve_application('my-app')

Resource Gatekeeping (SYN-7005)

AgentClient 는 클러스터 리소스 feasibility(가용성) 검사를 위해 ResourceClientMixin 을 mixin 합니다. 이 메서드들은 SDK 의 serve deploy capacity gate (Inference Actions — Capacity Gate 참조)에서 사용됩니다.

check_feasibility

agent 클러스터에 대한 dry-run gatekeeping 평가. {allowed: bool, reasons: list[str], ...} 를 반환합니다.

def check_feasibility(
kind: Literal['job', 'serve', 'container', 'task'],
num_cpus: float,
num_gpus: float = 0.0,
memory_bytes: int | None = None,
replicas: int = 1,
metadata: dict[str, Any] | None = None,
) -> dict
파라미터타입필수기본값설명
kindLiteral['job', 'serve', 'container', 'task']평가할 리소스 종류
num_cpusfloat요청 CPU 수
num_gpusfloat아니오0.0요청 GPU 수
memory_bytesint | None아니오None요청 메모리(바이트)
replicasint아니오1replica 수
metadatadict | None아니오None호출자 메타데이터 (예: {'plugin_code': ..., 'agent': ...})

Endpoint: POST /resources/feasibility/

result = client.check_feasibility(
kind='serve',
num_cpus=2.0,
num_gpus=1.0,
memory_bytes=4 * 1024**3, # 4 GiB
replicas=2,
metadata={'plugin_code': '[email protected]', 'agent': '42'},
)

if not result['allowed']:
print('denied:', result.get('reasons', []))

get_cluster_resources

def get_cluster_resources(use_cache: bool = True) -> dict

agent 의 현재 클러스터 리소스 스냅샷을 반환합니다. use_cache=False 는 라이브 재집계를 강제합니다.

get_gatekeeping_policy / update_gatekeeping_policy

def get_gatekeeping_policy() -> dict
def update_gatekeeping_policy(patch: dict[str, Any]) -> dict

agent 의 in-memory gatekeeping 정책을 읽거나 원자적으로 patch 합니다. 정책 변경은 클러스터 전역에 적용되므로 신중히 사용하세요.


에러 처리

from synapse_sdk.clients.exceptions import ClientError

try:
for line in client.tail_job_logs('invalid-job'):
print(line)
except ClientError as e:
if e.status_code == 400:
print("잘못된 job ID 또는 파라미터")
elif e.status_code == 404:
print("Job 을 찾을 수 없음")
elif e.status_code == 503:
print("Agent 연결 실패")
else:
print(f"Error: {e}")

공통 에러 코드

코드의미
400잘못된 파라미터 (job ID, timeout, protocol)
404리소스를 찾을 수 없음
408연결 또는 읽기 timeout
429스트림 한도 초과
500내부 에러 또는 라이브러리 사용 불가
503Agent 연결 실패

관련 문서