AgentClient
agent 전용 동작, job 관리, 분산 실행을 위한 클라이언트입니다.
개요
AgentClient 는 agent 동작에 접근할 수 있게 해주며, 플러그인 실행, Ray job 관리, 실시간 로그 스트리밍을 포함합니다. 동기(AgentClient)와 비동기(AsyncAgentClient) 두 버전이 모두 제공됩니다.
설치
pip install synapse-sdk
WebSocket 스트리밍 지원:
pip install synapse-sdk websocket-client # Sync 클라이언트
pip install synapse-sdk websockets # Async 클라이언트
AgentClient (Sync)
생성자
AgentClient(
base_url: str,
agent_token: str,
*,
user_token: str = None,
tenant: str = None,
timeout: dict = None
)
파라미터
| 파라미터 | 타입 | 필수 | 기본값 | 설명 |
|---|---|---|---|---|
base_url | str | 예 | - | Agent 서버 URL (예: "https://agent.example.com") |
agent_token | str | 예 | - | Agent 인증 토큰 |
user_token | str | 아니오 | None | 사용자 범위 작업을 위한 사용자 인증 토큰 |
tenant | str | 아니오 | None | 멀티 테넌트 배포를 위한 테넌트 식별자 |
timeout | dict | 아니오 | None | 연결 및 읽기 timeout 구성 |
사용 예
from synapse_sdk.clients.agent import AgentClient
client = AgentClient(
base_url="https://agent.example.com",
agent_token="your-agent-token"
)
# 헬스 체크
status = client.health_check()
print(f"Agent status: {status}")
# Ray jobs 목록
jobs = client.list_jobs()
# job 로그 스트리밍
for line in client.tail_job_logs('raysubmit_abc123'):
print(line)
AsyncAgentClient (Async)
생성자
AsyncAgentClient(
base_url: str,
agent_token: str,
*,
user_token: str = None,
tenant: str = None,
timeout: dict = None
)
Context Manager 사용
from synapse_sdk.clients.agent import AsyncAgentClient
async with AsyncAgentClient(
base_url="https://agent.example.com",
agent_token="your-agent-token"
) as client:
# 헬스 체크
status = await client.health_check()
# Ray jobs 목록
jobs = await client.list_jobs()
# 비동기 job 로그 스트리밍
async for line in client.tail_job_logs('raysubmit_abc123'):
print(line)
Context Manager 미사용
client = AsyncAgentClient(base_url, agent_token)
try:
jobs = await client.list_jobs()
finally:
await client.close()
로그 스트리밍
두 클라이언트 모두 WebSocket 또는 HTTP 프로토콜을 통한 실시간 로그 스트리밍을 지원합니다.
통합 메서드
# Sync
for line in client.tail_job_logs('job-id', protocol='auto'):
print(line)
# Async
async for line in client.tail_job_logs('job-id', protocol='auto'):
print(line)
프로토콜 옵션
'auto'(기본): WebSocket 을 먼저 시도하고 연결 실패 시 HTTP 로 폴백'websocket': WebSocket 전용 (최저 지연 시간)'http': HTTP chunked 스트리밍 전용 (호환성 우수)
스트림 한도
스트리밍 작업의 리소스 한도를 구성합니다.
from synapse_sdk.utils.network import StreamLimits
client.stream_limits = StreamLimits(
max_messages=10_000, # 최대 WebSocket 메시지
max_lines=50_000, # 최대 HTTP 줄
max_bytes=50*1024*1024, # 50MB 총량
max_message_size=10_240 # 메시지당 10KB
)
자세한 스트리밍 메서드 문서는 RayClient 를 참고하세요.
Ray 작업
AgentClient 는 mixin 을 통해 모든 Ray 클러스터 관리 메서드를 포함합니다.
Job 작업
# 모든 job 목록
jobs = client.list_jobs()
# job 상세 조회
job = client.get_job('raysubmit_abc123')
# job 로그 조회 (스트리밍 아님)
logs = client.get_job_logs('raysubmit_abc123')
# 실행 중 job 중단
result = client.stop_job('raysubmit_abc123')
노드 작업
# 클러스터 노드 목록
nodes = client.list_nodes()
# 노드 상세 조회
node = client.get_node('node-abc123')
Task 작업
# 모든 task 목록
tasks = client.list_tasks()
# task 상세 조회
task = client.get_task('task-xyz789')
Ray Serve 작업
# serve 애플리케이션 목록
apps = client.list_serve_applications()
# 애플리케이션 상세 조회
app = client.get_serve_application('my-app')
# 애플리케이션 삭제
client.delete_serve_application('my-app')
Resource Gatekeeping (SYN-7005)
AgentClient 는 클러스터 리소스 feasibility(가용성) 검사를 위해 ResourceClientMixin 을 mixin 합니다. 이 메서드들은 SDK 의 serve deploy capacity gate (Inference Actions — Capacity Gate 참조)에서 사용됩니다.
check_feasibility
agent 클러스터에 대한 dry-run gatekeeping 평가. {allowed: bool, reasons: list[str], ...} 를 반환합니다.
def check_feasibility(
kind: Literal['job', 'serve', 'container', 'task'],
num_cpus: float,
num_gpus: float = 0.0,
memory_bytes: int | None = None,
replicas: int = 1,
metadata: dict[str, Any] | None = None,
) -> dict
| 파라미터 | 타입 | 필수 | 기본값 | 설명 |
|---|---|---|---|---|
kind | Literal['job', 'serve', 'container', 'task'] | 예 | — | 평가할 리소스 종류 |
num_cpus | float | 예 | — | 요청 CPU 수 |
num_gpus | float | 아니오 | 0.0 | 요청 GPU 수 |
memory_bytes | int | None | 아니오 | None | 요청 메모리(바이트) |
replicas | int | 아니오 | 1 | replica 수 |
metadata | dict | None | 아니오 | None | 호출자 메타데이터 (예: {'plugin_code': ..., 'agent': ...}) |
Endpoint: POST /resources/feasibility/
result = client.check_feasibility(
kind='serve',
num_cpus=2.0,
num_gpus=1.0,
memory_bytes=4 * 1024**3, # 4 GiB
replicas=2,
metadata={'plugin_code': '[email protected]', 'agent': '42'},
)
if not result['allowed']:
print('denied:', result.get('reasons', []))
get_cluster_resources
def get_cluster_resources(use_cache: bool = True) -> dict
agent 의 현재 클러스터 리소스 스냅샷을 반환합니다. use_cache=False 는 라이브 재집계를 강제합니다.
get_gatekeeping_policy / update_gatekeeping_policy
def get_gatekeeping_policy() -> dict
def update_gatekeeping_policy(patch: dict[str, Any]) -> dict
agent 의 in-memory gatekeeping 정책을 읽거나 원자적으로 patch 합니다. 정책 변경은 클러스터 전역에 적용되므로 신중히 사용하세요.
에러 처리
from synapse_sdk.clients.exceptions import ClientError
try:
for line in client.tail_job_logs('invalid-job'):
print(line)
except ClientError as e:
if e.status_code == 400:
print("잘못된 job ID 또는 파라미터")
elif e.status_code == 404:
print("Job 을 찾을 수 없음")
elif e.status_code == 503:
print("Agent 연결 실패")
else:
print(f"Error: {e}")
공통 에러 코드
| 코드 | 의미 |
|---|---|
| 400 | 잘못된 파라미터 (job ID, timeout, protocol) |
| 404 | 리소스를 찾을 수 없음 |
| 408 | 연결 또는 읽기 timeout |
| 429 | 스트림 한도 초과 |
| 500 | 내부 에러 또는 라이브러리 사용 불가 |
| 503 | Agent 연결 실패 |
관련 문서
- RayClient — 상세 Ray 스트리밍 메서드
- BackendClient — 백엔드 작업
- BaseClient — 기반 클라이언트 구현
- Network Utilities — StreamLimits 및 검증