MLClientMixin
Provides machine learning model management and ground truth operations for the Synapse backend.
Overview
The MLClientMixin handles all operations related to machine learning models, ground truth datasets, and model evaluation workflows. This mixin is automatically included in the BackendClient and provides methods for ML pipeline integration.
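All examples on this page assume an authenticated client instance. A minimal setup, mirroring the constructor used in the workflow example later on this page (the import path is an assumption; adjust it to your installed SDK layout):

from synapse_sdk.clients.backend import BackendClient  # Assumed module path

client = BackendClient(
    base_url="https://api.synapse.sh",
    api_token="your-token"
)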
Model Management
list_models(params=None)
List available machine learning models with filtering options.
# List all models
models = client.list_models()
for model in models[0]:
    print(f"Model: {model['name']} (ID: {model['id']})")
# List models for a specific project
project_models = client.list_models(params={'project': 123})
# List models by type
classification_models = client.list_models(params={'model_type': 'classification'})
# List active models only
active_models = client.list_models(params={'is_active': True})
Parameters:
- params (dict, optional): Filtering parameters
Common filtering params:
- project: Filter by project ID
- model_type: Filter by model type (classification, detection, segmentation)
- is_active: Filter by model status
- created_after: Filter by creation date
- search: Text search in model names and descriptions
Returns:
tuple: (models_list, total_count)
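Because the return value is a tuple, unpacking it keeps pagination-aware code readable; total_count reflects all matching models, which may exceed the number of items returned in a single page:

# Unpack the (models_list, total_count) tuple
models, total = client.list_models(params={'project': 123})
print(f"Retrieved {len(models)} of {total} matching models")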
get_model(pk, params=None, url_conversion=None)
Get detailed information about a specific model.
# Get basic model info
model = client.get_model(456)
print(f"Model: {model['name']}")
print(f"Type: {model['model_type']}")
print(f"Accuracy: {model['metrics']['accuracy']}")
# Get model with expanded metrics
model = client.get_model(456, params={'expand': 'metrics'})
# Get model with custom URL conversion for files
model = client.get_model(
    456,
    url_conversion={'file': lambda url: f"https://cdn.example.com{url}"}
)
Parameters:
- pk (int): Model ID
- params (dict, optional): Query parameters
- url_conversion (dict, optional): Custom URL conversion for file fields
Common params:
- expand: Include additional data (metrics, evaluations, versions)
- include_file: Whether to include model file information
Returns:
dict: Complete model information including metadata and metrics
Model structure:
- id: Model ID
- name: Model name
- description: Model description
- model_type: Type of model
- file: Model file reference
- metrics: Performance metrics
- project: Associated project ID
- is_active: Whether the model is currently active
- created_at: Creation timestamp
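Optional fields such as metrics are best read defensively; this mirrors the model.get('metrics', {}) pattern used in the evaluation examples later on this page:

model = client.get_model(456)
# Avoid KeyError when a model has no recorded metrics
accuracy = model.get('metrics', {}).get('accuracy')
label = accuracy if accuracy is not None else 'not recorded'
print(f"{model['name']}: accuracy {label}")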
create_model(data)
Create a new machine learning model with file upload.
# Create model with file upload
model_data = {
    'name': 'Object Detection Model v2',
    'description': 'Improved object detection with better accuracy',
    'model_type': 'detection',
    'project': 123,
    'metrics': {
        'accuracy': 0.92,
        'precision': 0.89,
        'recall': 0.94,
        'f1_score': 0.91
    },
    'configuration': {
        'input_size': [640, 640],
        'num_classes': 10,
        'framework': 'pytorch'
    },
    'file': '/path/to/model.pkl'  # Will be uploaded via chunked upload
}
new_model = client.create_model(model_data)
print(f"Created model with ID: {new_model['id']}")
Parameters:
- data (dict): Model configuration and metadata
Model data structure:
- name (str, required): Model name
- description (str): Model description
- model_type (str, required): Model type
- project (int, required): Project ID
- file (str, required): Path to model file
- metrics (dict): Performance metrics
- configuration (dict): Model configuration
- is_active (bool): Whether the model should be active
Returns:
dict: Created model with generated ID
Note: The model file is automatically uploaded using chunked upload for optimal performance.
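Since create_model reads the model file from the given path, a quick pre-flight check before calling it can surface path mistakes early; this check is purely illustrative and not part of the SDK:

import os

model_path = '/path/to/model.pkl'
if not os.path.isfile(model_path):
    raise FileNotFoundError(f"Model file not found: {model_path}")
print(f"Uploading {os.path.getsize(model_path) / 1024 / 1024:.1f} MB model...")
new_model = client.create_model({**model_data, 'file': model_path})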
Ground Truth Operations
list_ground_truth_events(params=None, url_conversion=None, list_all=False)
List ground truth events with comprehensive filtering options.
# List ground truth events for a dataset version
events = client.list_ground_truth_events(params={
    'ground_truth_dataset_versions': [123]
})
# List all events (handles pagination automatically)
all_events = client.list_ground_truth_events(list_all=True)
# List events with date filtering
from datetime import datetime, timedelta
recent_date = (datetime.now() - timedelta(days=30)).isoformat()
recent_events = client.list_ground_truth_events(params={
    'created_after': recent_date,
    'ground_truth_dataset_versions': [123]
})
# List events with custom URL conversion
events = client.list_ground_truth_events(
    params={'ground_truth_dataset_versions': [123]},
    url_conversion={'files': lambda url: f"https://cdn.example.com{url}"}
)
Parameters:
- params (dict, optional): Filtering parameters
- url_conversion (dict, optional): Custom URL conversion for file fields
- list_all (bool): If True, automatically handles pagination
Common filtering params:
- ground_truth_dataset_versions: List of dataset version IDs
- project: Filter by project ID
- created_after: Filter by creation date
- data_type: Filter by data type
- search: Text search in event data
Returns:
- tuple: (events_list, total_count) if list_all=False
- list: All events if list_all=True
Ground truth event structure:
- id: Event ID
- data: Annotation/ground truth data
- data_unit: Associated data unit information
- ground_truth_dataset_version: Dataset version ID
- created_at: Creation timestamp
- metadata: Additional event metadata
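When several events reference the same data unit, grouping by data_unit ID is a useful first step in post-processing; a sketch using the structure above:

from collections import defaultdict

events = client.list_ground_truth_events(
    params={'ground_truth_dataset_versions': [123]},
    list_all=True
)
by_unit = defaultdict(list)
for event in events:
    by_unit[event['data_unit']['id']].append(event)
print(f"{len(events)} events across {len(by_unit)} data units")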
get_ground_truth_version(pk)
Get detailed information about a ground truth dataset version.
version = client.get_ground_truth_version(123)
print(f"Dataset version: {version['version']}")
print(f"Dataset: {version['ground_truth_dataset']['name']}")
print(f"Total events: {version['event_count']}")
print(f"Created: {version['created_at']}")
Parameters:
- pk (int): Ground truth dataset version ID
Returns:
dict: Complete dataset version information
Dataset version structure:
- id: Version ID
- version: Version number/name
- ground_truth_dataset: Parent dataset information
- event_count: Number of events in this version
- description: Version description
- is_active: Whether the version is currently active
- created_at: Creation timestamp
- statistics: Version statistics and metrics
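The reported event_count makes a handy sanity check against the events you actually fetch; a mismatch can point to a filtering mistake:

version = client.get_ground_truth_version(123)
events = client.list_ground_truth_events(
    params={'ground_truth_dataset_versions': [version['id']]},
    list_all=True
)
if len(events) != version['event_count']:
    print(f"Warning: fetched {len(events)} events, version reports {version['event_count']}")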
Model Evaluation Workflows
Model Performance Analysis
def analyze_model_performance(model_id, ground_truth_version_id):
    """Analyze model performance against ground truth data."""
    # Get model details
    model = client.get_model(model_id, params={'expand': 'metrics'})
    print(f"Analyzing model: {model['name']}")
    # Get ground truth data
    gt_events = client.list_ground_truth_events(
        params={'ground_truth_dataset_versions': [ground_truth_version_id]},
        list_all=True
    )
    print(f"Ground truth events: {len(gt_events)}")
    # Extract metrics
    model_metrics = model.get('metrics', {})
    print("Model Metrics:")
    for metric, value in model_metrics.items():
        print(f" {metric}: {value}")
    # Analyze ground truth distribution
    data_types = {}
    for event in gt_events:
        data_type = event['data'].get('type', 'unknown')
        data_types[data_type] = data_types.get(data_type, 0) + 1
    print("Ground Truth Distribution:")
    for data_type, count in data_types.items():
        print(f" {data_type}: {count}")
    return {
        'model': model,
        'ground_truth_stats': data_types,
        'total_gt_events': len(gt_events)
    }
# Usage
analysis = analyze_model_performance(456, 123)
Model Comparison
def compare_models(model_ids, metric='accuracy'):
    """Compare multiple models by a specific metric."""
    models = []
    for model_id in model_ids:
        model = client.get_model(model_id, params={'expand': 'metrics'})
        models.append(model)
    # Sort by metric
    sorted_models = sorted(
        models,
        key=lambda m: m.get('metrics', {}).get(metric, 0),
        reverse=True
    )
    print(f"Models ranked by {metric}:")
    for i, model in enumerate(sorted_models, 1):
        metric_value = model.get('metrics', {}).get(metric, 'N/A')
        print(f"{i}. {model['name']}: {metric_value}")
    return sorted_models
# Compare models by accuracy
model_ranking = compare_models([456, 457, 458], metric='accuracy')
Ground Truth Data Export
def export_ground_truth_data(dataset_version_id, output_format='coco'):
    """Export ground truth data in the specified format."""
    # Get dataset version info
    version = client.get_ground_truth_version(dataset_version_id)
    print(f"Exporting dataset: {version['ground_truth_dataset']['name']}")
    # Get all events
    events = client.list_ground_truth_events(
        params={'ground_truth_dataset_versions': [dataset_version_id]},
        list_all=True
    )
    if output_format == 'coco':
        # Convert to COCO format
        coco_data = {
            'info': {
                'description': version['description'],
                'version': version['version'],
                'year': 2023,
                'contributor': 'Synapse SDK'
            },
            'images': [],
            'annotations': [],
            'categories': []
        }
        # Process events
        for event in events:
            # Extract image info from data_unit
            data_unit = event['data_unit']
            files = data_unit.get('files', {})
            # Add image
            if 'image' in files:
                image_info = {
                    'id': data_unit['id'],
                    'file_name': files['image'].get('name', ''),
                    'width': files['image'].get('width', 0),
                    'height': files['image'].get('height', 0)
                }
                coco_data['images'].append(image_info)
            # Add annotations
            annotations = event['data'].get('annotations', [])
            for ann in annotations:
                annotation = {
                    'id': len(coco_data['annotations']),
                    'image_id': data_unit['id'],
                    'category_id': ann.get('category_id', 1),
                    'bbox': ann.get('bbox', []),
                    'area': ann.get('area', 0),
                    'iscrowd': ann.get('iscrowd', 0)
                }
                coco_data['annotations'].append(annotation)
        return coco_data
    else:
        # Return raw format
        return events
# Export as COCO format
coco_data = export_ground_truth_data(123, 'coco')
print(f"Exported {len(coco_data['images'])} images and {len(coco_data['annotations'])} annotations")
Model Training Integration
Training Data Preparation
import random

def prepare_training_data(project_id, split_ratio=0.8):
    """Prepare training and validation data from ground truth."""
    # Get all ground truth events for project
    events = client.list_ground_truth_events(
        params={'project': project_id},
        list_all=True
    )
    # Split data
    random.shuffle(events)
    split_point = int(len(events) * split_ratio)
    train_events = events[:split_point]
    val_events = events[split_point:]
    print(f"Training samples: {len(train_events)}")
    print(f"Validation samples: {len(val_events)}")
    return {
        'train': train_events,
        'validation': val_events,
        'total': len(events)
    }
# Prepare data
data_split = prepare_training_data(123, split_ratio=0.8)
Model Deployment
def deploy_model(model_path, model_config):
    """Deploy a trained model to the system."""
    # Create model entry
    model_data = {
        'name': model_config['name'],
        'description': model_config['description'],
        'model_type': model_config['type'],
        'project': model_config['project_id'],
        'file': model_path,
        'metrics': model_config.get('metrics', {}),
        'configuration': model_config.get('configuration', {}),
        'is_active': True
    }
    # Upload and create model
    model = client.create_model(model_data)
    print(f"Deployed model: {model['id']}")
    # Deactivate previous models if requested
    if model_config.get('replace_active', False):
        existing_models = client.list_models(params={
            'project': model_config['project_id'],
            'is_active': True
        })
        for existing_model in existing_models[0]:
            if existing_model['id'] != model['id']:
                # Note: deactivation requires an update method; see the sketch after this example
                print(f"Would deactivate model: {existing_model['id']}")
    return model
# Deploy model
deployment_config = {
    'name': 'Production Object Detector v3',
    'description': 'Latest object detection model for production',
    'type': 'detection',
    'project_id': 123,
    'metrics': {'accuracy': 0.94, 'mAP': 0.87},
    'configuration': {'input_size': [640, 640], 'confidence_threshold': 0.5},
    'replace_active': True
}
deployed_model = deploy_model('/path/to/trained_model.pkl', deployment_config)
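As flagged inside deploy_model, actually deactivating superseded models requires an update method that this mixin does not document. If your client exposes one, the loop body might look like the following sketch (update_model is hypothetical, shown only to illustrate the shape of the call):

for existing_model in existing_models[0]:
    if existing_model['id'] != model['id']:
        # Hypothetical method, not part of the documented MLClientMixin API
        client.update_model(existing_model['id'], {'is_active': False})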
Complete ML Workflow
def complete_ml_workflow(project_id):
    """Complete machine learning workflow from data to deployed model."""
    client = BackendClient(
        base_url="https://api.synapse.sh",
        api_token="your-token"
    )
    print("=== ML Workflow Started ===")
    # 1. Analyze available ground truth data
    print("1. Analyzing ground truth data...")
    events = client.list_ground_truth_events(
        params={'project': project_id},
        list_all=True
    )
    print(f"Found {len(events)} ground truth events")
    # 2. Get existing models for comparison
    print("2. Checking existing models...")
    existing_models = client.list_models(params={'project': project_id})
    print(f"Found {len(existing_models[0])} existing models")
    # 3. Create and deploy new model (simulated)
    print("3. Deploying new model...")
    model_data = {
        'name': f'Auto-Generated Model for Project {project_id}',
        'description': 'Model created through automated workflow',
        'model_type': 'classification',
        'project': project_id,
        'file': '/path/to/new_model.pkl',  # In a real scenario, this would be the actual trained model
        'metrics': {
            'accuracy': 0.93,
            'precision': 0.91,
            'recall': 0.95,
            'f1_score': 0.93
        },
        'configuration': {
            'framework': 'pytorch',
            'input_size': [224, 224],
            'num_classes': 10
        }
    }
    new_model = client.create_model(model_data)
    print(f"Deployed model: {new_model['id']}")
    # 4. Compare with existing models
    print("4. Comparing model performance...")
    all_models = existing_models[0] + [new_model]
    best_model = max(all_models, key=lambda m: m.get('metrics', {}).get('accuracy', 0))
    print(f"Best model: {best_model['name']} (Accuracy: {best_model['metrics']['accuracy']})")
    print("=== ML Workflow Completed ===")
    return new_model
# Run workflow
if __name__ == "__main__":
    result = complete_ml_workflow(123)
Error Handling
from synapse_sdk.clients.exceptions import ClientError
def robust_model_operations():
    """Example of robust model operations with error handling."""
    try:
        # Try to get model
        model = client.get_model(999)
    except ClientError as e:
        if e.status_code == 404:
            print("Model not found")
            return None
        else:
            print(f"Error getting model: {e}")
            raise
    try:
        # Try to create model
        model_data = {
            'name': 'Test Model',
            'model_type': 'classification',
            'project': 123,
            'file': '/path/to/model.pkl'
        }
        new_model = client.create_model(model_data)
    except ClientError as e:
        if e.status_code == 400:
            print(f"Invalid model data: {e.response}")
        elif e.status_code == 413:
            print("Model file too large")
        else:
            print(f"Error creating model: {e}")
        return None
    return new_model
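Server-side failures (5xx) during these operations are often transient and worth retrying with backoff; a minimal sketch, assuming ClientError exposes status_code as in the examples above:

import time

def get_model_with_retry(pk, retries=3, delay=1.0):
    for attempt in range(retries):
        try:
            return client.get_model(pk)
        except ClientError as e:
            # 4xx errors will not succeed on retry; re-raise immediately
            if e.status_code and e.status_code < 500:
                raise
            if attempt == retries - 1:
                raise
            time.sleep(delay * (2 ** attempt))  # Exponential backoff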
See Also
- BackendClient - Main backend client
- DataCollectionClientMixin - Data management operations
- IntegrationClientMixin - Plugin and job management