DataCollectionClientMixin
Provides data collection and file management operations for the Synapse backend.
Overview
The DataCollectionClientMixin handles all operations related to data collections, file uploads, data units, and batch processing. This mixin is automatically included in the BackendClient and provides methods for managing large-scale data operations.
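Because the mixin is mixed into BackendClient, its methods are called directly on a client instance; a minimal sketch (the URL and token below are placeholders):
from synapse_sdk.clients.backend import BackendClient
client = BackendClient(
    base_url="https://api.synapse.sh",
    api_token="your-token"
)
# Mixin methods are available directly on the client
collections = client.list_data_collection()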
Data Collection Operations
list_data_collection()
Retrieve a list of all available data collections.
collections = client.list_data_collection()
for collection in collections:
    print(f"Collection: {collection['name']} (ID: {collection['id']})")
Returns:
list: List of data collection objects
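If you need a specific collection, the returned objects can be filtered on the fields shown above; 'My Collection' is a placeholder name:
collections = client.list_data_collection()
# Find a collection by name (target is None if there is no match)
target = next((c for c in collections if c['name'] == 'My Collection'), None)
if target is not None:
    print(f"Found collection ID: {target['id']}")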
get_data_collection(data_collection_id)
Get detailed information about a specific data collection.
collection = client.get_data_collection(123)
print(f"Collection: {collection['name']}")
print(f"Description: {collection['description']}")
# Access file specifications
file_specs = collection['file_specifications']
for spec in file_specs:
    print(f"File type: {spec['name']}, Required: {spec['is_required']}")
Parameters:
data_collection_id (int): Data collection ID
Returns:
dict: Detailed collection information including file specifications
Collection structure:
id: Collection ID
name: Collection name
description: Collection description
file_specifications: List of required file types and formats
project: Associated project ID
created_at: Creation timestamp
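As a quick illustration, the file_specifications field can be inspected to see which file types a collection expects before uploading; a minimal sketch using only the fields listed above:
collection = client.get_data_collection(123)
specs = collection['file_specifications']
# Split specifications into required and optional file types
required = [spec['name'] for spec in specs if spec['is_required']]
optional = [spec['name'] for spec in specs if not spec['is_required']]
print(f"Required file types: {required}")
print(f"Optional file types: {optional}")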
File Operations
create_data_file(file_path, use_chunked_upload=False)
Create and upload a data file to the backend.
from pathlib import Path
# Regular upload for smaller files
data_file = client.create_data_file(Path('/path/to/image.jpg'))
print(f"Uploaded file ID: {data_file['id']}")
# Chunked upload for large files (>50MB recommended)
large_file = client.create_data_file(
    Path('/path/to/large_dataset.zip'),
    use_chunked_upload=True
)
print(f"Large file uploaded: {large_file['id']}")
Parameters:
file_path (Path): Path object pointing to the file to upload
use_chunked_upload (bool): Enable chunked upload for large files
Returns:
dict or str: File upload response with file ID and metadata
When to use chunked upload:
- Files larger than 50MB
- Unreliable network connections
- When you need upload progress tracking
- For better error recovery
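A common pattern is to choose the upload mode from the file size; the helper below is an illustrative sketch (not part of the SDK) that mirrors the 50MB guideline above:
from pathlib import Path

CHUNKED_UPLOAD_THRESHOLD = 50 * 1024 * 1024  # 50MB

def upload_any_size(file_path: Path):
    """Pick regular or chunked upload based on file size."""
    use_chunked = file_path.stat().st_size > CHUNKED_UPLOAD_THRESHOLD
    return client.create_data_file(file_path, use_chunked_upload=use_chunked)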
upload_data_file(organized_file, collection_id, use_chunked_upload=False)
Upload organized file data to a specific collection.
# Organize file data
organized_file = {
    'files': {
        'image': Path('/path/to/image.jpg'),
        'annotation': Path('/path/to/annotation.json'),
        'metadata': Path('/path/to/metadata.xml')
    },
    'meta': {
        'origin_file_stem': 'sample_001',
        'origin_file_extension': '.jpg',
        'created_at': '2023-10-01T12:00:00Z',
        'batch_id': 'batch_001'
    }
}
# Upload to collection
result = client.upload_data_file(
    organized_file=organized_file,
    collection_id=123,
    use_chunked_upload=False
)
Parameters:
organized_file (dict): Structured file data with files and metadata
collection_id (int): Target data collection ID
use_chunked_upload (bool): Enable chunked upload
Organized file structure:
files (dict): Dictionary mapping file types to file paths
meta (dict): Metadata associated with the file group
Returns:
dict: Upload result with file references and IDs
create_data_units(uploaded_files)
Create data units from previously uploaded files.
# Files that have been uploaded
uploaded_files = [
    {
        'id': 1,
        'file': {'image': 'file_id_123', 'annotation': 'file_id_124'},
        'meta': {'batch': 'batch_001'}
    },
    {
        'id': 2,
        'file': {'image': 'file_id_125', 'annotation': 'file_id_126'},
        'meta': {'batch': 'batch_001'}
    }
]
# Create data units
data_units = client.create_data_units(uploaded_files)
print(f"Created {len(data_units)} data units")
Parameters:
uploaded_files (list): List of uploaded file structures
Returns:
list: Created data units with IDs and metadata
Batch Processing
The mixin supports efficient batch processing for large-scale operations:
from pathlib import Path
# Example: Batch upload multiple files
file_paths = [
    Path('/data/batch1/file1.jpg'),
    Path('/data/batch1/file2.jpg'),
    Path('/data/batch1/file3.jpg'),
    # ... more files
]
# Process files in batches
batch_size = 10
for i in range(0, len(file_paths), batch_size):
    batch = file_paths[i:i+batch_size]
    # Upload batch
    uploaded_files = []
    for file_path in batch:
        result = client.create_data_file(file_path)
        uploaded_files.append({
            'id': len(uploaded_files) + 1,
            'file': {'image': result['id']},
            'meta': {'batch': f'batch_{i//batch_size}'}
        })
    # Create data units for batch
    data_units = client.create_data_units(uploaded_files)
    print(f"Processed batch {i//batch_size}: {len(data_units)} data units")
Progress Tracking
For large uploads, you can track progress:
import os
from tqdm import tqdm
def upload_with_progress(file_paths, collection_id):
    """Upload files with progress tracking."""
    uploaded_files = []
    with tqdm(total=len(file_paths), desc="Uploading files") as pbar:
        for file_path in file_paths:
            try:
                # Check file size to decide upload method
                file_size = os.path.getsize(file_path)
                use_chunked = file_size > 50 * 1024 * 1024  # 50MB
                # Upload file
                result = client.create_data_file(
                    file_path,
                    use_chunked_upload=use_chunked
                )
                # Organize for collection
                organized_file = {
                    'files': {'primary': file_path},
                    'meta': {
                        'origin_file_stem': file_path.stem,
                        'origin_file_extension': file_path.suffix,
                        'file_size': file_size
                    }
                }
                upload_result = client.upload_data_file(
                    organized_file,
                    collection_id,
                    use_chunked_upload=use_chunked
                )
                uploaded_files.append(upload_result)
                pbar.update(1)
            except Exception as e:
                print(f"Failed to upload {file_path}: {e}")
                pbar.update(1)
                continue
    return uploaded_files
# Usage
file_paths = [Path(f'/data/file_{i}.jpg') for i in range(100)]
results = upload_with_progress(file_paths, collection_id=123)
Data Validation
File Specification Validation
def validate_files_against_collection(file_paths, collection_id):
    """Validate files against collection specifications."""
    collection = client.get_data_collection(collection_id)
    file_specs = collection['file_specifications']
    # Create specification lookup
    required_types = {spec['name'] for spec in file_specs if spec['is_required']}
    optional_types = {spec['name'] for spec in file_specs if not spec['is_required']}
    # Validate file organization
    organized_files = []
    for file_path in file_paths:
        # Extract file type from path or metadata
        file_type = extract_file_type(file_path)  # Custom function, defined below
        if file_type in required_types or file_type in optional_types:
            organized_files.append({
                'path': file_path,
                'type': file_type,
                'valid': True
            })
        else:
            print(f"Warning: Unknown file type '{file_type}' for {file_path}")
            organized_files.append({
                'path': file_path,
                'type': file_type,
                'valid': False
            })
    return organized_files

def extract_file_type(file_path):
    """Extract file type from path - implement based on your naming convention."""
    # Example implementation
    if 'image' in str(file_path):
        return 'image'
    elif 'annotation' in str(file_path):
        return 'annotation'
    elif 'metadata' in str(file_path):
        return 'metadata'
    else:
        return 'unknown'
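Beyond per-file checks, it can also help to confirm that a file group covers every required type before calling upload_data_file. The helper below is a minimal sketch; files_by_type is assumed to be a dict mapping file types to paths, like the 'files' part of the organized file structure:
def missing_required_types(collection_id, files_by_type):
    """Return required file types absent from a file-type -> path mapping."""
    collection = client.get_data_collection(collection_id)
    required = {spec['name'] for spec in collection['file_specifications'] if spec['is_required']}
    return sorted(required - set(files_by_type))

# Example
missing = missing_required_types(123, {'image': Path('/path/to/image.jpg')})
if missing:
    print(f"Missing required file types: {missing}")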
Error Handling and Retry Logic
import time
from synapse_sdk.clients.exceptions import ClientError
def robust_upload(file_path, max_retries=3):
    """Upload with retry logic for reliability."""
    for attempt in range(max_retries):
        try:
            # Start with a regular upload; fall back to chunked on 413
            result = client.create_data_file(file_path)
            return result
        except ClientError as e:
            if e.status_code == 413:  # File too large
                print(f"File {file_path} too large, trying chunked upload")
                try:
                    return client.create_data_file(file_path, use_chunked_upload=True)
                except Exception as retry_e:
                    print(f"Chunked upload failed: {retry_e}")
                    if attempt == max_retries - 1:
                        raise
            elif e.status_code == 429:  # Rate limited
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"Upload failed (attempt {attempt + 1}): {e}")
                if attempt == max_retries - 1:
                    raise
        except Exception as e:
            print(f"Unexpected error (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(1)  # Brief pause before retry
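Usage mirrors a direct create_data_file call; check the result, since the helper falls through and returns None if the retries are exhausted by rate limiting:
result = robust_upload(Path('/data/file_001.jpg'))
if result is not None:
    print(f"Uploaded file ID: {result['id']}")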
Complete Workflow Example
from pathlib import Path
from synapse_sdk.clients.backend import BackendClient
def complete_data_ingestion_workflow():
    """Complete workflow for data ingestion."""
    client = BackendClient(
        base_url="https://api.synapse.sh",
        api_token="your-token"
    )
    # 1. Choose or create data collection
    collections = client.list_data_collection()
    collection_id = collections[0]['id']  # Use first available
    # 2. Get collection specifications
    collection = client.get_data_collection(collection_id)
    print(f"Using collection: {collection['name']}")
    # 3. Prepare file paths
    data_dir = Path('/path/to/your/data')
    image_files = list(data_dir.glob('*.jpg'))
    # 4. Upload files and create data units
    uploaded_files = []
    for i, image_path in enumerate(image_files):
        # Upload individual file
        data_file = client.create_data_file(image_path)
        # Organize for collection
        organized_file = {
            'files': {'image': image_path},
            'meta': {
                'origin_file_stem': image_path.stem,
                'origin_file_extension': image_path.suffix,
                'sequence': i,
                'batch': 'batch_001'
            }
        }
        # Upload to collection
        upload_result = client.upload_data_file(
            organized_file,
            collection_id
        )
        uploaded_files.append(upload_result)
    # 5. Create data units in batches
    batch_size = 10
    all_data_units = []
    for i in range(0, len(uploaded_files), batch_size):
        batch = uploaded_files[i:i+batch_size]
        data_units = client.create_data_units(batch)
        all_data_units.extend(data_units)
        print(f"Created batch {i//batch_size}: {len(data_units)} data units")
    print(f"Total data units created: {len(all_data_units)}")
    return all_data_units

# Run the workflow
if __name__ == "__main__":
    data_units = complete_data_ingestion_workflow()
See Also
- BackendClient - Main backend client
- CoreClientMixin - Core file operations
- AnnotationClientMixin - Task and annotation management