RayClientMixin
Mixin for Ray cluster management and real-time log streaming.
Overview
The RayClientMixin provides Ray cluster operations including job management, real-time log streaming (WebSocket/HTTP), node monitoring, and Ray Serve application control. It's included in both AgentClient (sync) and AsyncAgentClient (async).
Key Features
- Job Management: List, get, stop Ray jobs
- Real-time Log Streaming: WebSocket and HTTP-based log tailing with auto-fallback
- Node Monitoring: Monitor cluster nodes
- Task Monitoring: Track task execution
- Ray Serve: Deploy and manage serve applications
- Resource Protection: StreamLimits to prevent memory exhaustion
Log Streaming
tail_job_logs()
Unified streaming method with automatic protocol selection.
def tail_job_logs(
job_id: str,
timeout: float = 30.0,
*,
protocol: Literal['websocket', 'http', 'auto'] = 'auto'
) -> Generator[str, None, None]
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
job_id | str | Yes | - | Ray job ID (e.g., 'raysubmit_abc123') |
timeout | float | No | 30.0 | Connection timeout in seconds |
protocol | Literal['auto', 'websocket', 'http'] | No | 'auto' | Protocol selection (see below) |
Protocol Options:
'auto': Try WebSocket, fall back to HTTP on failure'websocket': WebSocket only (lowest latency)'http': HTTP chunked streaming only (more compatible)
Yields: Log lines as strings
Example:
# Auto protocol selection (recommended)
for line in client.tail_job_logs('raysubmit_abc123'):
print(line)
# Explicit WebSocket
for line in client.tail_job_logs('raysubmit_abc123', protocol='websocket'):
print(line)
# Explicit HTTP streaming
for line in client.tail_job_logs('raysubmit_abc123', protocol='http'):
print(line)
# With custom timeout
for line in client.tail_job_logs('raysubmit_abc123', timeout=60):
if 'ERROR' in line:
break
websocket_tail_job_logs()
Direct WebSocket streaming for lowest latency.
def websocket_tail_job_logs(
job_id: str,
timeout: float = 30.0
) -> Generator[str, None, None]
Requires: websocket-client package (sync) or websockets package (async)
for line in client.websocket_tail_job_logs('raysubmit_abc123'):
print(line)
stream_tail_job_logs()
HTTP chunked transfer streaming as fallback.
def stream_tail_job_logs(
job_id: str,
timeout: float = 30.0
) -> Generator[str, None, None]
for line in client.stream_tail_job_logs('raysubmit_abc123'):
print(line)
Async Streaming
For AsyncAgentClient, all streaming methods return AsyncGenerator:
from synapse_sdk.clients.agent import AsyncAgentClient
async with AsyncAgentClient(base_url, agent_token) as client:
# Auto protocol
async for line in client.tail_job_logs('raysubmit_abc123'):
print(line)
# WebSocket
async for line in client.websocket_tail_job_logs('raysubmit_abc123'):
print(line)
# HTTP
async for line in client.stream_tail_job_logs('raysubmit_abc123'):
print(line)
Stream Limits
Configure resource limits to prevent memory exhaustion:
from synapse_sdk.utils.network import StreamLimits
# Set custom limits
client.stream_limits = StreamLimits(
max_messages=10_000, # Max WebSocket messages
max_lines=50_000, # Max HTTP lines
max_bytes=50*1024*1024, # 50MB total
max_message_size=10_240 # 10KB per message
)
When limits are exceeded, ClientError with status code 429 is raised.
Job Operations
list_jobs()
List all Ray jobs in the cluster.
jobs = client.list_jobs()
for job in jobs:
print(f"Job {job['job_id']}: {job['status']}")
get_job()
Get details for a specific job.
job = client.get_job('raysubmit_abc123')
print(f"Status: {job['status']}")
print(f"Start time: {job['start_time']}")
get_job_logs()
Get all logs for a job (non-streaming).
logs = client.get_job_logs('raysubmit_abc123')
print(logs)
stop_job()
Stop a running job.
result = client.stop_job('raysubmit_abc123')
print(f"Stopped: {result}")
Node Operations
list_nodes()
List all nodes in the Ray cluster.
nodes = client.list_nodes()
for node in nodes:
print(f"Node {node['node_id']}: {node['state']}")
get_node()
Get details for a specific node.
node = client.get_node('node-abc123')
print(f"Alive: {node['alive']}")
Task Operations
list_tasks()
List all tasks in the cluster.
tasks = client.list_tasks()
get_task()
Get details for a specific task.
task = client.get_task('task-xyz789')
Ray Serve Operations
list_serve_applications()
List all Ray Serve applications.
apps = client.list_serve_applications()
get_serve_application()
Get details for a serve application.
app = client.get_serve_application('my-app')
print(f"Status: {app['status']}")
delete_serve_application()
Delete a serve application.
client.delete_serve_application('my-app')
Error Handling
from synapse_sdk.clients.exceptions import ClientError
try:
for line in client.tail_job_logs('invalid-job'):
print(line)
except ClientError as e:
if e.status_code == 400:
print("Invalid job ID format")
elif e.status_code == 429:
print("Stream limits exceeded")
elif e.status_code == 500:
print("WebSocket library not installed")
elif e.status_code == 503:
print("Connection failed")
Error Codes
| Code | Meaning |
|---|---|
| 400 | Invalid job ID, timeout, or protocol |
| 404 | Resource not found |
| 408 | Connection timeout |
| 429 | Stream limits exceeded |
| 500 | Library unavailable or internal error |
| 503 | Connection failed or closed |
Best Practices
Protocol Selection
# Let auto handle fallback (recommended for production)
for line in client.tail_job_logs(job_id, protocol='auto'):
process(line)
# Use WebSocket for interactive monitoring
for line in client.tail_job_logs(job_id, protocol='websocket'):
display_realtime(line)
# Use HTTP for compatibility with proxies/firewalls
for line in client.tail_job_logs(job_id, protocol='http'):
log(line)
Error Recovery
import time
def robust_streaming(client, job_id, max_retries=3):
for attempt in range(max_retries):
try:
for line in client.tail_job_logs(job_id):
yield line
break
except ClientError as e:
if e.status_code == 503 and attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise
Stream Limit Configuration
# High-volume production logs
client.stream_limits = StreamLimits(
max_messages=50_000,
max_lines=100_000,
max_bytes=200 * 1024 * 1024 # 200MB
)
# Limited development environment
client.stream_limits = StreamLimits(
max_messages=1_000,
max_lines=5_000,
max_bytes=10 * 1024 * 1024 # 10MB
)
Related
- AgentClient — Main client with Ray mixin
- Network Utilities — StreamLimits and validation
- BaseClient — Base client implementation