RayClient
Client for Apache Ray cluster management and monitoring functionality.
Overview
The RayClientMixin provides comprehensive Ray cluster operations, including job management, real-time log streaming, node monitoring, and Ray Serve application control. It's designed as a mixin class that extends BaseClient with Ray-specific functionality.
Key Features
- Job Lifecycle Management: Create, monitor, manage, and stop Ray jobs
- Real-time Log Streaming: WebSocket and HTTP-based log tailing
- Node & Task Monitoring: Monitor cluster resources and task execution
- Ray Serve Integration: Deploy and manage Ray Serve applications
- Robust Error Handling: Input validation and sanitized error messages
- Resource Management: Automatic cleanup and connection tracking
Constructor
# RayClientMixin is typically used as a mixin
class RayClient(RayClientMixin):
    def __init__(self, base_url: str, timeout: dict = None):
        super().__init__(base_url, timeout)
Parameters
- base_url (str): Ray cluster dashboard URL (e.g., "http://ray-head:8265")
- timeout (dict, optional): Connection and read timeout configuration
Usage
from synapse_sdk.clients.agent.ray import RayClientMixin
from synapse_sdk.clients.base import BaseClient
class RayClient(RayClientMixin, BaseClient):
    pass
client = RayClient(base_url="http://ray-head:8265")
# List all jobs
jobs = client.list_jobs()
# Get specific job details
job = client.get_job('job-12345')
# Stop a running job if needed
if job['status'] == 'RUNNING':
    result = client.stop_job('job-12345')
    print(f"Job stop initiated: {result['status']}")
# Stream logs in real-time
for log_line in client.tail_job_logs('job-12345'):
    print(log_line.strip())
Job Operations
get_job(pk)
Retrieve details for a specific job.
job = client.get_job('job-12345')
print(f"Job status: {job['status']}")
list_jobs()
List all jobs in the Ray cluster.
jobs = client.list_jobs()
for job in jobs['results']:
    print(f"Job {job['id']}: {job['status']}")
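When a cluster runs many jobs, it can help to group the list_jobs() results by status before acting on them. The following is a minimal sketch that assumes the response shape used above (a 'results' list whose entries carry 'id' and 'status'). The helper name group_job_ids_by_status and the sample payload are illustrative and not part of the SDK.

```python
from collections import defaultdict


def group_job_ids_by_status(jobs_response):
    """Map each status to the list of job IDs that currently report it."""
    grouped = defaultdict(list)
    for job in jobs_response["results"]:
        grouped[job["status"]].append(job["id"])
    return dict(grouped)


# Sample payload shaped like the list_jobs() response; the values are made up.
sample_jobs = {
    "results": [
        {"id": "job-1", "status": "RUNNING"},
        {"id": "job-2", "status": "SUCCEEDED"},
        {"id": "job-3", "status": "RUNNING"},
    ]
}

by_status = group_job_ids_by_status(sample_jobs)
print(by_status.get("RUNNING", []))
```

With a real client, the same helper could be called as group_job_ids_by_status(client.list_jobs()).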
list_job_logs(pk)
Get static log entries for a job.
logs = client.list_job_logs('job-12345')
stop_job(pk)
Stop a running job gracefully using Ray's stop_job() API.
# Stop a running job
result = client.stop_job('job-12345')
print(f"Stop status: {result['status']}")
# Handle stop errors
from synapse_sdk.clients.exceptions import ClientError
try:
    client.stop_job('job-12345')
except ClientError as e:
    print(f"Stop failed: {e}")
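A stop request only initiates shutdown, so callers may want to wait until the job actually reaches a terminal state. One way to do that is to poll get_job(), sketched below. The helper name wait_for_terminal, the FakeClient stand-in, and the set of terminal statuses (borrowed from Ray's job status names) are assumptions for illustration; check the statuses your cluster actually returns.

```python
import time

# Assumption: terminal status names follow Ray's job statuses.
TERMINAL_STATUSES = {"STOPPED", "SUCCEEDED", "FAILED"}


def wait_for_terminal(client, job_id, poll_interval=1.0, timeout=60.0, sleep=time.sleep):
    """Poll client.get_job(job_id) until the job reports a terminal status.

    Returns the final job dict, or raises TimeoutError once the deadline passes.
    """
    deadline = time.monotonic() + timeout
    while True:
        job = client.get_job(job_id)
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {job['status']} after {timeout}s")
        sleep(poll_interval)


class FakeClient:
    """Hypothetical stand-in for RayClient so the sketch runs without a cluster."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def get_job(self, pk):
        return {"id": pk, "status": next(self._statuses)}


fake = FakeClient(["RUNNING", "RUNNING", "STOPPED"])
final = wait_for_terminal(fake, "job-12345", poll_interval=0, sleep=lambda _: None)
print(final["status"])
```

The sleep parameter is injected only so the sketch can run instantly; with a real client the default time.sleep is appropriate.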
Real-time Log Streaming
tail_job_logs(pk, stream_timeout=10, protocol='stream')
Stream job logs using either WebSocket or HTTP protocol.
# HTTP streaming (default, more compatible)
for log_line in client.tail_job_logs('job-12345', protocol='stream'):
    print(log_line.strip())
# WebSocket streaming (lower latency)
for log_line in client.tail_job_logs('job-12345', protocol='websocket'):
    print(log_line.strip())
# With custom timeout
for log_line in client.tail_job_logs('job-12345', stream_timeout=30):
    if 'ERROR' in log_line:
        break
websocket_tail_job_logs(pk, stream_timeout=10)
Stream logs via WebSocket for lowest latency.
try:
    for log_line in client.websocket_tail_job_logs('job-12345'):
        print(log_line.strip())
        if 'COMPLETED' in log_line:
            break
except ClientError as e:
    print(f"WebSocket streaming failed: {e}")
stream_tail_job_logs(pk, stream_timeout=10)
Stream logs via HTTP chunked transfer encoding.
for log_line in client.stream_tail_job_logs('job-12345', stream_timeout=60):
    if 'FAILED' in log_line:
        print(f"Job failed: {log_line}")
        break
Node Operations
get_node(pk)
Get details for a specific cluster node.
node = client.get_node('node-abc123')
print(f"Node status: {node['alive']}")
list_nodes()
List all nodes in the Ray cluster.
nodes = client.list_nodes()
for node in nodes['results']:
    print(f"Node {node['node_id']}: {node['state']}")
Task Operations
get_task(pk)
Retrieve details for a specific task.
task = client.get_task('task-xyz789')
list_tasks()
List all tasks in the cluster.
tasks = client.list_tasks()
Ray Serve Operations
get_serve_application(pk)
Get details for a Ray Serve application.
app = client.get_serve_application('app-123')
print(f"Application status: {app['status']}")
list_serve_applications()
List all Ray Serve applications.
apps = client.list_serve_applications()
delete_serve_application(pk)
Delete a Ray Serve application.
client.delete_serve_application('app-123')
Error Handling
All methods raise ClientError on failure. The exception's status attribute holds an HTTP-style status code that identifies the cause:
from synapse_sdk.clients.exceptions import ClientError
try:
    for log_line in client.tail_job_logs('invalid-job'):
        print(log_line)
except ClientError as e:
    if e.status == 400:
        print("Invalid job ID or parameters")
    elif e.status == 404:
        print("Job not found")
    elif e.status == 503:
        print("Connection to Ray cluster failed")
    else:
        print(f"Unexpected error: {e}")
Common Error Codes
- 400: Invalid parameters (job ID, timeout, protocol) or job already in terminal state
- 404: Resource not found (job, node, task, application)
- 408: Connection or read timeout
- 429: Stream limits exceeded
- 500: WebSocket library unavailable or internal error
- 503: Ray cluster connection failed
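The codes above can be kept in a small lookup table so that handlers do not repeat if/elif chains. This is a minimal sketch: the descriptions are copied from the list above, while the names ERROR_HINTS, describe_status, and is_retryable, and the choice of which codes count as retryable, are assumptions rather than SDK behavior.

```python
# Descriptions copied from the error code list above.
ERROR_HINTS = {
    400: "Invalid parameters or job already in terminal state",
    404: "Resource not found",
    408: "Connection or read timeout",
    429: "Stream limits exceeded",
    500: "WebSocket library unavailable or internal error",
    503: "Ray cluster connection failed",
}

# Assumption: only timeouts and connection failures are worth retrying.
RETRYABLE_STATUSES = frozenset({408, 503})


def describe_status(status):
    """Return a human-readable hint for a ClientError status code."""
    return ERROR_HINTS.get(status, f"Unexpected error (status {status})")


def is_retryable(status):
    """Return True if a retry might succeed for this status code."""
    return status in RETRYABLE_STATUSES


print(describe_status(404))
print(is_retryable(503))
```

Inside an except ClientError as e block, describe_status(e.status) would then replace the per-code branches.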
Resource Management
The RayClient includes automatic resource management:
- Thread Pool: 5 worker threads for concurrent operations
- Connection Tracking: WeakSet for active connections
- Stream Limits: Prevents memory exhaustion
- Automatic Cleanup: Resources cleaned up on destruction
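To illustrate how weak-reference tracking and a fixed-size thread pool fit together, here is a small standalone sketch. It is not the SDK's implementation: Connection and ConnectionTracker are hypothetical names, and only the WeakSet and the pool size of 5 come from the list above.

```python
import gc
import weakref
from concurrent.futures import ThreadPoolExecutor


class Connection:
    """Hypothetical connection object; plain instances are weak-referenceable."""

    def __init__(self, name):
        self.name = name


class ConnectionTracker:
    """Tracks live connections without keeping them alive."""

    def __init__(self, max_workers=5):
        self._active = weakref.WeakSet()
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def track(self, conn):
        self._active.add(conn)
        return conn

    def active_count(self):
        return len(self._active)

    def submit(self, fn, *args):
        return self._pool.submit(fn, *args)

    def close(self):
        # Wait for queued work, then release the worker threads.
        self._pool.shutdown(wait=True)


tracker = ConnectionTracker()
conn = tracker.track(Connection("job-12345"))
count_while_alive = tracker.active_count()

del conn      # drop the only strong reference
gc.collect()  # make collection explicit on non-refcounting interpreters
count_after_release = tracker.active_count()

result = tracker.submit(sum, [1, 2, 3]).result()
tracker.close()
print(count_while_alive, count_after_release, result)
```

Because the set holds only weak references, a connection that goes out of scope disappears from the tracker without an explicit unregister call.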
Stream Limits
Default limits for log streaming:
- Max messages: 10,000
- Max lines: 50,000
- Max bytes: 50MB
- Max message size: 10KB
- Queue size: 1,000
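As a rough illustration of how such limits can bound memory use, the sketch below wraps any iterable of log lines and stops once a line or byte budget is exceeded. It is an assumption-laden example, not the SDK's code: StreamLimitExceeded and limit_stream are hypothetical names, and 50MB is interpreted here as 50 * 1024 * 1024 bytes.

```python
class StreamLimitExceeded(Exception):
    """Hypothetical error; the SDK reports exceeded limits as status 429."""


def limit_stream(lines, max_lines=50_000, max_bytes=50 * 1024 * 1024):
    """Yield lines until either the line count or the byte budget is exceeded."""
    total_bytes = 0
    for count, line in enumerate(lines, start=1):
        if count > max_lines:
            raise StreamLimitExceeded(f"more than {max_lines} lines")
        total_bytes += len(line.encode("utf-8"))
        if total_bytes > max_bytes:
            raise StreamLimitExceeded(f"more than {max_bytes} bytes")
        yield line


accepted = list(limit_stream(["a\n", "b\n"], max_lines=2))
print(accepted)
```

Since the wrapper is itself a generator, lines are checked as they arrive and nothing is buffered beyond the running byte count.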
Best Practices
1. Protocol Selection
# Use WebSocket for lowest latency when available
try:
    logs = client.tail_job_logs(job_id, protocol='websocket')
except ClientError:
    # Fallback to HTTP streaming
    logs = client.tail_job_logs(job_id, protocol='stream')
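If tail_job_logs() returns a lazy generator (an assumption; this page does not say when validation and connection happen), a connection error may surface only on the first iteration rather than at the call itself. The sketch below handles that case by falling back while iterating. The names tail_with_fallback and fake_open are hypothetical, and the fake stream stands in for a real client.

```python
def tail_with_fallback(open_stream, protocols=("websocket", "stream"), errors=(Exception,)):
    """Yield lines from the first protocol that produces output.

    open_stream(protocol) must return an iterable of log lines. A fallback
    happens only if the failure occurs before any line was yielded, so a
    mid-stream error never restarts the stream and duplicates output.
    """
    last_error = None
    for protocol in protocols:
        yielded = False
        try:
            for line in open_stream(protocol):
                yielded = True
                yield line
            return
        except errors as exc:
            if yielded:
                raise
            last_error = exc
    if last_error is None:
        raise ValueError("no protocols given")
    raise last_error


def fake_open(protocol):
    """Hypothetical stream: WebSocket fails lazily, HTTP streaming works."""
    if protocol == "websocket":
        raise RuntimeError("websocket library unavailable")
    yield "line 1\n"
    yield "line 2\n"


lines = list(tail_with_fallback(fake_open))
print(lines)
```

With a real client, open_stream could be lambda p: client.tail_job_logs(job_id, protocol=p), and errors could be narrowed to (ClientError,).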
2. Timeout Management
# Use appropriate timeouts for long-running jobs
for log_line in client.tail_job_logs(job_id, stream_timeout=300):
    process_log_line(log_line)
3. Error Recovery
import time
from synapse_sdk.clients.exceptions import ClientError
def robust_log_streaming(client, job_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            for log_line in client.tail_job_logs(job_id):
                yield log_line
            break  # Stream finished without error
        except ClientError as e:
            # Retry only on connection failures, and only while attempts remain
            if e.status == 503 and attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise
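To make the backoff arithmetic concrete: with max_retries=3 the retry loop sleeps 2 ** 0 = 1 second after the first failure and 2 ** 1 = 2 seconds after the second, and the third failure is re-raised without sleeping. The helper below computes that schedule. The name backoff_delays and the optional cap parameter are illustrative additions, not part of the SDK.

```python
def backoff_delays(max_retries=3, base=2, cap=None):
    """Return the delays (in seconds) slept between attempts.

    Each delay is base ** attempt, optionally capped. The final attempt is
    followed by a raise rather than a sleep, so it contributes no delay.
    """
    delays = []
    for attempt in range(max_retries - 1):
        delay = base ** attempt
        if cap is not None:
            delay = min(delay, cap)
        delays.append(delay)
    return delays


schedule = backoff_delays(3)
print(schedule, sum(schedule))
```

A cap keeps the wait bounded when max_retries is large; for example, backoff_delays(6, cap=10) never sleeps longer than 10 seconds.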
4. Resource Cleanup
# Context manager for proper cleanup
class RayClientContext:
    def __init__(self, base_url):
        self.client = RayClient(base_url)
    def __enter__(self):
        return self.client
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Cleanup handled automatically by RayClient.__del__()
        pass
with RayClientContext("http://ray-head:8265") as client:
    for log_line in client.tail_job_logs('job-12345'):
        print(log_line.strip())
Thread Safety
RayClient is designed for concurrent use with proper thread safety mechanisms:
- Thread pool for background operations
- WeakSet for connection tracking
- Proper resource cleanup mechanisms
See Also
- AgentClient - For agent-specific operations
- BaseClient - Base client implementation
- Network Utilities - Streaming and validation utilities