Client SDK

The Mitosis project contains an SDK library (named netmito) that you can use to create your own client applications programmatically. The SDK provides a comprehensive Rust API for interacting with Mitosis coordinators. For the Python SDK, please refer to the Mitosis Python SDK repository.

Installation

Add the following to your Cargo.toml:

[dependencies]
netmito = "0.6.0"
tokio = { version = "1.0", features = ["full"] }
uuid = { version = "1.18", features = ["v4"] }

Basic Usage

Client Setup

use netmito::client::MitoClient;
use netmito::config::{ClientConfig, client::LoginArgs};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client configuration
    let mut config = ClientConfig::default();
    config.coordinator_addr = "http://localhost:5000".parse()?;

    // Create client instance
    let mut client = MitoClient::setup(config).await?;

    // Login with credentials
    let login_args = LoginArgs {
        username: Some("username".to_string()),
        password: Some("password".to_string()),
        retain: true,
    };
    client.user_login(login_args).await?;

    Ok(())
}

Task Management

Submitting Tasks

use netmito::config::client::TaskSubmitArgs;

// Submit a simple task
let args = TaskSubmitArgs {
    command: vec!["echo".to_string(), "Hello World".to_string()],
    group: Some("my-group".to_string()),
    tags: vec!["test".to_string()],
    labels: vec!["example".to_string()],
    env: vec!["MY_VAR=value".to_string()],
    terminal: true, // Capture stdout/stderr
    ..Default::default()
};

let task_id = client.task_submit(args).await?;
println!("Submitted task: {}", task_id);

Querying Tasks

use netmito::config::client::TaskQueryArgs;

// Query all tasks
let args = TaskQueryArgs::default();
let tasks = client.task_query(args).await?;

// Query tasks with filters
let args = TaskQueryArgs {
    labels: Some(vec!["example".to_string()]),
    status: Some("completed".to_string()),
    limit: Some(10),
    ..Default::default()
};
let filtered_tasks = client.task_query(args).await?;

Getting Task Details

use uuid::Uuid;

let task_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
let task_info = client.task_get(task_id).await?;

println!("Task status: {:?}", task_info.exec_state);
println!("Created at: {}", task_info.created_at);

Downloading Artifacts

use netmito::config::client::TaskArtifactDownloadArgs;

let args = TaskArtifactDownloadArgs {
    task_id,
    artifact_type: "result".to_string(), // "result", "exec-log", or "std-log"
    output_path: Some("./task-results.tar.gz".to_string()),
    ..Default::default()
};

client.task_artifact_download(args).await?;

User and Group Management

Creating Users (Admin only)

use netmito::config::client::AdminCreateUserArgs;

let args = AdminCreateUserArgs {
    username: Some("new_user".to_string()),
    password: Some("secure_password".to_string()),
    admin: false,
};

client.admin_create_user(args).await?;

Group Operations

use netmito::config::client::{GroupCreateArgs, GroupUpdateUserArgs};

// Create a group
let args = GroupCreateArgs {
    group_name: "research-team".to_string(),
};
client.group_create(args).await?;

// Add user to group
let args = GroupUpdateUserArgs {
    group_name: "research-team".to_string(),
    username: "researcher".to_string(),
    role: "write".to_string(), // "read", "write", or "admin"
};
client.group_update_user(args).await?;

Worker Management

Querying Workers

use netmito::config::client::WorkerQueryArgs;

let args = WorkerQueryArgs {
    group: Some("my-group".to_string()),
    tags: Some(vec!["gpu".to_string()]),
    ..Default::default()
};

let workers = client.worker_query(args).await?;
for worker in workers {
    println!("Worker {} is {}", worker.id, worker.status);
}

Managing Worker Tags

use netmito::config::client::WorkerUpdateTagsArgs;

// Example value only: replace it with the UUID of the worker you want to update.
// `Uuid::parse_str` returns an error for any string that is not a well-formed UUID,
// so a free-text placeholder here would make the `?` bail out immediately.
let worker_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?;
let args = WorkerUpdateTagsArgs {
    worker_id,
    tags: vec!["gpu".to_string(), "cuda".to_string()],
};

client.worker_update_tags(args).await?;

Group Attachments

Uploading Files

use netmito::config::client::GroupAttachmentUploadArgs;

let args = GroupAttachmentUploadArgs {
    group_name: Some("my-group".to_string()),
    local_path: "./dataset.tar.gz".to_string(),
    attachment_key: Some("datasets/experiment-1.tar.gz".to_string()),
    ..Default::default()
};

client.group_attachment_upload(args).await?;

Downloading Files

use netmito::config::client::GroupAttachmentDownloadArgs;

let args = GroupAttachmentDownloadArgs {
    group_name: Some("my-group".to_string()),
    attachment_key: "datasets/experiment-1.tar.gz".to_string(),
    output_path: Some("./downloaded-dataset.tar.gz".to_string()),
    ..Default::default()
};

client.group_attachment_download(args).await?;

Advanced Usage

Configuration Options

use netmito::config::ClientConfig;
use url::Url;
use figment::value::magic::RelativePathBuf;

let config = ClientConfig {
    coordinator_addr: Url::parse("https://coordinator.example.com")?,
    credential_path: Some(RelativePathBuf::from("/path/to/credentials")),
    user: Some("api-user".to_string()),
    password: Some("api-password".to_string()),
    retain: true, // Keep existing login state
};

Error Handling

use netmito::error::{Error, ApiError, AuthError};

match client.task_get(task_id).await {
    Ok(task) => println!("Task found: {:?}", task),
    Err(Error::ApiError(ApiError::NotFound(_))) => println!("Task not found"),
    Err(Error::ApiError(ApiError::AuthError(AuthError::PermissionDenied))) => println!("Access denied"),
    Err(e) => println!("Other error: {}", e),
}

Async Patterns

use futures::future::join_all;

// Build ten submission futures, each owning its own clone of the client
let submissions: Vec<_> = (0..10)
    .map(|secs| {
        let mut handle = client.clone();
        let submit_args = TaskSubmitArgs {
            command: vec!["sleep".to_string(), secs.to_string()],
            ..Default::default()
        };
        async move { handle.task_submit(submit_args).await }
    })
    .collect();

// Await all submissions together, then report each outcome
for outcome in join_all(submissions).await {
    match outcome {
        Ok(task_id) => println!("Submitted: {}", task_id),
        Err(e) => println!("Failed: {}", e),
    }
}

Real-time Task Monitoring

use netmito::client::redis::MitoRedisClient;
use netmito::entity::state::TaskExecState;
use tokio::time::{interval, Duration};

// Set up Redis client for real-time updates
// NOTE(review): `redis_client` is not used by the polling loop below, which
// queries the coordinator through `client` instead — confirm whether a
// Redis-based subscription example was intended here.
let mut redis_client = client.get_redis_client().await?;

// Poll task status periodically: one `task_get` call per 5-second tick
let mut interval = interval(Duration::from_secs(5));
loop {
    interval.tick().await;

    let task = client.task_get(task_id).await?;
    println!("Task status: {:?}", task.exec_state);

    // Stop polling once the task reports `Completed` or `Failed`
    if matches!(task.exec_state, TaskExecState::Completed | TaskExecState::Failed) {
        break;
    }
}

Integration Examples

CI/CD Pipeline Integration

pub struct MitosisPipeline {
    client: MitoClient,
}

impl MitosisPipeline {
    pub async fn run_test_suite(&mut self, commit_hash: &str) -> Result<bool, Box<dyn std::error::Error>> {
        // Submit test tasks
        let test_args = TaskSubmitArgs {
            command: vec!["cargo".to_string(), "test".to_string(), "--release".to_string()],
            group: Some("ci-runners".to_string()),
            tags: vec!["rust".to_string(), "testing".to_string()],
            labels: vec![format!("commit:{}", commit_hash)],
            terminal: true,
            ..Default::default()
        };

        let task_id = self.client.task_submit(test_args).await?;

        // Wait for completion
        let result = self.wait_for_task(task_id).await?;

        // Download test results
        let artifact_args = TaskArtifactDownloadArgs {
            task_id,
            artifact_type: "std-log".to_string(),
            output_path: Some(format!("test-results-{}.tar.gz", commit_hash)),
            ..Default::default()
        };

        self.client.task_artifact_download(artifact_args).await?;

        Ok(result.exec_state == TaskExecState::Completed)
    }
}

Research Computing Workflow

/// Wraps a `MitoClient` together with the group that sweep tasks are submitted to.
pub struct ResearchWorkflow {
    client: MitoClient,
    group: String,
}

impl ResearchWorkflow {
    /// Submits one `python3 experiment.py` task per entry of `parameters` and
    /// returns the task ids in submission order.
    ///
    /// Each string in an entry is prefixed with `PARAM_` and passed as an
    /// element of the task's `env` list.
    /// NOTE(review): this presumably expects entries of the form `NAME=value`
    /// so the result is a valid `KEY=value` pair — confirm against callers.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first submission error; tasks submitted
    /// before the failure are not reported back to the caller.
    pub async fn run_parameter_sweep(&mut self, parameters: Vec<Vec<String>>) -> Result<Vec<Uuid>, Box<dyn std::error::Error>> {
        // Exactly one id is produced per parameter set, so size the vector up front
        let mut task_ids = Vec::with_capacity(parameters.len());

        for (i, params) in parameters.into_iter().enumerate() {
            let args = TaskSubmitArgs {
                command: vec!["python3".to_string(), "experiment.py".to_string()],
                env: params.into_iter().map(|p| format!("PARAM_{}", p)).collect(),
                group: Some(self.group.clone()),
                labels: vec![format!("sweep:experiment-{}", i)],
                tags: vec!["gpu".to_string(), "python".to_string()],
                terminal: true,
                ..Default::default()
            };

            let task_id = self.client.task_submit(args).await?;
            task_ids.push(task_id);
        }

        Ok(task_ids)
    }

    /// Downloads the `result` artifact of every task in `task_ids` to
    /// `results/<task_id>.tar.gz`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `results` directory cannot be created, or the
    /// first download error encountered.
    pub async fn collect_results(&mut self, task_ids: Vec<Uuid>) -> Result<(), Box<dyn std::error::Error>> {
        // Nothing to download: leave the filesystem untouched
        if task_ids.is_empty() {
            return Ok(());
        }

        // Ensure the output directory exists before writing into it; it is not
        // evident from here that the download call creates missing parent
        // directories. A single cheap call, so the blocking std API is acceptable.
        std::fs::create_dir_all("results")?;

        for task_id in task_ids {
            let args = TaskArtifactDownloadArgs {
                task_id,
                artifact_type: "result".to_string(),
                output_path: Some(format!("results/{}.tar.gz", task_id)),
                ..Default::default()
            };

            self.client.task_artifact_download(args).await?;
        }

        Ok(())
    }
}

API Reference

For complete API documentation, including all available methods, configuration options, and error types, please refer to the API documentation.