oumi.launcher.clients

Submodules

oumi.launcher.clients.local_client module

class oumi.launcher.clients.local_client.LocalClient

Bases: object

A client for running jobs locally in a subprocess.

cancel(job_id) → JobStatus | None

Cancels the specified job.

Parameters:

job_id – The ID of the job to cancel.

Returns:

The job status if found, None otherwise.

get_job(job_id: str) → JobStatus | None

Gets the specified job’s status.

Parameters:

job_id – The ID of the job to get.

Returns:

The job status if found, None otherwise.

list_jobs() → list[JobStatus]

Returns a list of job statuses.

submit_job(job: JobConfig) → JobStatus

Runs the specified job on this cluster.
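
To make the subprocess model concrete, here is a minimal, self-contained sketch of a local job runner in the same spirit as LocalClient. It is not the Oumi implementation: the class name TinyLocalRunner, its methods, and the plain status strings are hypothetical, and the real client works with JobConfig and JobStatus objects rather than raw argument lists. It does mirror the documented contract that lookups for an unknown ID return None.

```python
import subprocess
import sys
from typing import Optional


class TinyLocalRunner:
    """Hypothetical sketch: each job is a subprocess keyed by a string ID."""

    def __init__(self) -> None:
        self._procs: dict[str, subprocess.Popen] = {}
        self._canceled: set[str] = set()

    def submit(self, argv: list[str]) -> str:
        # Start the job without blocking and hand back its ID.
        job_id = str(len(self._procs))
        self._procs[job_id] = subprocess.Popen(argv)
        return job_id

    def status(self, job_id: str) -> Optional[str]:
        proc = self._procs.get(job_id)
        if proc is None:
            return None  # unknown ID, like get_job() returning None
        if job_id in self._canceled:
            return "CANCELED"
        code = proc.poll()
        if code is None:
            return "RUNNING"
        return "COMPLETED" if code == 0 else "FAILED"

    def wait(self, job_id: str) -> Optional[str]:
        proc = self._procs.get(job_id)
        if proc is not None:
            proc.wait()
        return self.status(job_id)

    def cancel(self, job_id: str) -> Optional[str]:
        proc = self._procs.get(job_id)
        if proc is None:
            return None  # unknown ID, like cancel() returning None
        if proc.poll() is None:
            proc.terminate()
            proc.wait()
            self._canceled.add(job_id)
        return self.status(job_id)


if __name__ == "__main__":
    runner = TinyLocalRunner()
    job = runner.submit([sys.executable, "-c", "print('hello from the job')"])
    print(runner.wait(job))
```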

oumi.launcher.clients.modal_client module

Client wrapping the Modal SDK for the Oumi launcher.

Modal (modal.com) is a serverless GPU platform. There is no long-lived cluster concept — every job is a modal.Sandbox that persists beyond the calling Python process. ModalClient translates a JobConfig into a sandbox launch and exposes status/cancel/log primitives via the sandbox’s opaque object_id.

Image, GPU, secrets, and a workspace-scoped HuggingFace cache volume are derived from the JobConfig at launch time. setup and run are concatenated into a single shell script and executed together inside the sandbox so secrets injected via modal.Secret are visible (image-build time has no secrets attached). Sandboxes are tagged with the caller’s logical cluster name so ModalCluster.down() can find and terminate them across worker restarts.
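
The concatenation step described above can be sketched as a small helper. The name build_sandbox_script and the leading set -e (stop at the first failing setup command) are assumptions, not taken from the Oumi source; the point is only that setup and run end up in one script that executes at sandbox runtime, when variables injected from modal.Secret are already in the environment.

```python
from typing import Optional


def build_sandbox_script(setup: Optional[str], run: str) -> str:
    """Hypothetical helper: fold JobConfig-style setup and run into one script.

    Running both in the same shell, at sandbox runtime rather than at image-build
    time, means environment variables injected from secrets are visible to setup
    as well as to run.
    """
    lines = ["set -e"]  # assumption: abort if any setup step fails
    if setup and setup.strip():
        lines.append(setup.strip())
    lines.append(run.strip())
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    script = build_sandbox_script("pip install -r requirements.txt", "python train.py")
    # The script would then be handed to a shell inside the sandbox, e.g. bash -c.
    print(script)
```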

class oumi.launcher.clients.modal_client.ModalClient

Bases: object

A wrapped client for communicating with Modal.

Tracks the cluster_name → sandbox_ids mapping in-process so ModalCluster.down() can find the sandboxes spawned under a given logical cluster name. The mapping is lost across worker restarts, so cleanup then falls back to per-sandbox cancel via job_id, which the worker also persists in the operation record.
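
A minimal sketch of such an in-process tracker, assuming a plain dictionary of lists is enough; ClusterSandboxTracker and its method names are hypothetical. Because the state lives only in memory, a fresh instance (as after a worker restart) starts empty, which is exactly the limitation described above.

```python
from collections import defaultdict


class ClusterSandboxTracker:
    """Hypothetical in-memory map from a logical cluster name to sandbox IDs."""

    def __init__(self) -> None:
        self._ids: defaultdict[str, list[str]] = defaultdict(list)

    def record(self, cluster_name: str, sandbox_id: str) -> None:
        # Called once per launch; retries under one name accumulate here.
        if sandbox_id not in self._ids[cluster_name]:
            self._ids[cluster_name].append(sandbox_id)

    def sandboxes_for_cluster(self, cluster_name: str) -> list[str]:
        # Return a copy so callers cannot mutate the tracker's state,
        # and use .get() so unknown names do not create empty entries.
        return list(self._ids.get(cluster_name, []))
```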

cancel(call_id: str) → None

Terminates the sandbox if it is still running.

find_sandboxes_for_cluster(cluster_name: str) → list[str]

Returns the sandbox IDs tagged with cluster_name on Modal.

Performs a stateless lookup via Sandbox.list(tags=...), so it works across worker restarts (unlike sandboxes_for_cluster(), which only sees launches from the current process). Falls back to the in-process tracker if the Modal API call fails or returns nothing.
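
The lookup order can be sketched as a small function, assuming the remote query and the in-process tracker are passed in as callables; find_sandbox_ids and both parameter names are hypothetical. In the real client the remote side would wrap Sandbox.list(tags=...), whose exact tag keys are not documented here.

```python
from typing import Callable

Lookup = Callable[[str], list[str]]


def find_sandbox_ids(cluster_name: str, remote_lookup: Lookup, local_lookup: Lookup) -> list[str]:
    """Prefer the stateless remote lookup; fall back to in-process state."""
    try:
        ids = list(remote_lookup(cluster_name))
    except Exception:
        # A failed API call should not block cleanup; use what this process knows.
        ids = []
    # An empty remote result also triggers the fallback.
    return ids or local_lookup(cluster_name)
```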

get_call(call_id: str) → Any

Resolves a Sandbox by its opaque ID, raising if missing.

get_logs_stream(call_id: str) → ModalLogStream

Returns a readline()-capable stream of stdout+stderr.

Uses StreamReader.read() rather than iterating the stream directly, because iteration only works while the sandbox is still live and returns nothing after termination. Workers call this after a FAILED transition, so the batch-read path is the right default.

get_status(call_id: str) → JobStatus

Returns the current JobStatus for call_id.

launch(job: JobConfig, cluster_name: str | None = None, **kwargs: Any) → JobStatus

Creates a detached modal.Sandbox for the provided job.

Modal has no native cluster concept. The cluster_name argument becomes a logical label returned on JobStatus.cluster so the caller can group multiple sandboxes (e.g. retries) under one name. JobStatus.id is the opaque Sandbox.object_id and is the canonical handle for status / cancel / log lookups.

We use Sandbox (not Function.spawn) because sandboxes persist beyond the Python process that creates them, which is the lifecycle our launcher pattern needs.

sandboxes_for_cluster(cluster_name: str) → list[str]

Returns the sandbox IDs spawned under cluster_name in this process.

class oumi.launcher.clients.modal_client.ModalLogStream(iterator: Iterator[str])

Bases: TextIOBase

Wraps a Modal log iterator into a readline()-capable stream.

readline() → str

Reads the next chunk from the wrapped iterator.
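
The wrapper pattern can be sketched with the standard io.TextIOBase base class. This is an illustration, not the Oumi source: the class name IteratorLogStream is hypothetical, and returning an empty string once the iterator is exhausted is an assumption chosen to match the usual file-object end-of-stream convention. Feeding it text.splitlines(keepends=True) corresponds to the batch-read path described for get_logs_stream().

```python
import io
from typing import Iterator


class IteratorLogStream(io.TextIOBase):
    """Hypothetical sketch: expose an iterator of text chunks as a stream."""

    def __init__(self, iterator: Iterator[str]) -> None:
        super().__init__()
        self._iterator = iterator

    def readable(self) -> bool:
        return True

    def readline(self, size: int = -1) -> str:
        # Each call hands back the next chunk; "" signals end of stream, so
        # `for line in stream` (inherited from io.IOBase) stops cleanly.
        # `size` is accepted for signature compatibility and ignored.
        return next(self._iterator, "")


if __name__ == "__main__":
    logs = "epoch 1 loss=0.9\nepoch 2 loss=0.7\n"
    stream = IteratorLogStream(iter(logs.splitlines(keepends=True)))
    for line in stream:
        print(line, end="")
```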

oumi.launcher.clients.polaris_client module

class oumi.launcher.clients.polaris_client.PolarisClient(user: str)

Bases: object

A client for communicating with Polaris at ALCF.

class SupportedQueues(value)

Bases: Enum

Enum representing the supported queues on Polaris.

For more details, see: https://docs.alcf.anl.gov/polaris/running-jobs/#queues

DEBUG = 'debug'
DEBUG_SCALING = 'debug-scaling'
DEMAND = 'demand'
PREEMPTABLE = 'preemptable'
PROD = 'prod'
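
Callers often hold a queue name as a plain string (for example, from a config file) and need the enum member that cancel(), get_job(), list_jobs(), and submit_job() expect. The sketch below re-declares the documented members locally so it runs on its own; parse_queue is a hypothetical helper, and the whitespace and case normalization is an assumption that works because every documented value is lowercase.

```python
from enum import Enum


class SupportedQueues(Enum):
    # Local copy of the documented members, for illustration only.
    DEBUG = "debug"
    DEBUG_SCALING = "debug-scaling"
    DEMAND = "demand"
    PREEMPTABLE = "preemptable"
    PROD = "prod"


def parse_queue(name: str) -> SupportedQueues:
    """Map a queue name such as 'debug-scaling' to its enum member."""
    try:
        # Lookup by value, e.g. SupportedQueues("prod") is SupportedQueues.PROD.
        return SupportedQueues(name.strip().lower())
    except ValueError:
        valid = ", ".join(q.value for q in SupportedQueues)
        raise ValueError(f"Unsupported Polaris queue {name!r}; expected one of: {valid}") from None
```
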

cancel(job_id, queue: SupportedQueues) → JobStatus | None

Cancels the specified job.

Parameters:
  • job_id – The ID of the job to cancel.

  • queue – The name of the queue to search.

Returns:

The job status if found, None otherwise.

static get_active_users() → list[str]

Gets the list of users with an open SSH tunnel to Polaris.

Returns:

A list of users.

get_job(job_id: str, queue: SupportedQueues) → JobStatus | None

Gets the specified job’s status.

Parameters:
  • job_id – The ID of the job to get.

  • queue – The name of the queue to search.

Returns:

The job status if found, None otherwise.

list_jobs(queue: SupportedQueues) → list[JobStatus]

Lists the job statuses for the given queue.

Parameters:

queue – The name of the queue to search.

Returns:

A list of JobStatus.

put(file_contents: str, destination: str) → None

Puts the specified file contents to the remote path.

Parameters:
  • file_contents – The contents of the file to write.

  • destination – The remote path to write the file to.

put_recursive(source: str, destination: str) → None

Puts the specified file/directory to the remote path using rsync.

Parameters:
  • source – The local file/directory to write.

  • destination – The remote path to write the file/directory to.
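
The docstring only says that rsync is used. A sketch of how such a call is typically assembled follows; the helper name build_rsync_argv, the -az flags, and the user@host:path destination form are assumptions, not confirmed Oumi behavior. rsync's trailing-slash rule matters here: source/ copies the directory's contents, while source (no slash) copies the directory itself into the destination.

```python
def build_rsync_argv(source: str, destination: str, remote: str) -> list[str]:
    """Hypothetical helper returning an argv list for subprocess.run().

    remote is a user@host string. -a (archive) recurses into directories and
    preserves permissions and timestamps; -z compresses data in transit.
    """
    return ["rsync", "-az", source, f"{remote}:{destination}"]
```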

run_commands(commands: list[str]) → PolarisResponse

Runs the provided commands in a single SSH command.

Parameters:

commands – The commands to run.
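
A minimal sketch of running several commands in one SSH call and collecting the three fields of the PolarisResponse type documented in this module. The && joiner (later commands are skipped after a failure), the helper name run_over_ssh, the CommandResponse stand-in, and the injectable runner parameter are assumptions made for illustration and testing.

```python
import subprocess
from dataclasses import dataclass
from typing import Callable


@dataclass
class CommandResponse:
    """Stand-in with the same fields as PolarisResponse."""

    stdout: str
    stderr: str
    exit_code: int


def run_over_ssh(
    destination: str,
    commands: list[str],
    runner: Callable[..., subprocess.CompletedProcess] = subprocess.run,
) -> CommandResponse:
    # One SSH round trip: the remote shell receives a single command line.
    # Assumption: "&&" so later commands are skipped after a failure.
    argv = ["ssh", destination, " && ".join(commands)]
    completed = runner(argv, capture_output=True, text=True)
    return CommandResponse(completed.stdout, completed.stderr, completed.returncode)
```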

submit_job(job_path: str, working_dir: str, node_count: int, queue: SupportedQueues, name: str | None) → str

Submits the specified job script to Polaris.

Parameters:
  • job_path – The path to the job script to submit.

  • working_dir – The working directory to submit the job from.

  • node_count – The number of nodes to use for the job.

  • queue – The name of the queue to submit the job to.

  • name – The name of the job (optional).

Returns:

The ID of the submitted job.

class oumi.launcher.clients.polaris_client.PolarisResponse(stdout: str, stderr: str, exit_code: int)

Bases: object

A response from Polaris.

exit_code: int
stderr: str
stdout: str

oumi.launcher.clients.polaris_client.retry_auth(user_function)

Decorator to ensure auth is fresh before calling a function.
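
The check-then-refresh pattern that the docstring describes can be sketched as a method decorator. The decorator name ensure_fresh_auth and the hooks auth_is_fresh() and refresh_auth() are hypothetical; how the real clients detect a stale SSH session is not documented here. FakeRemoteClient exists only to exercise the decorator.

```python
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., Any])


def ensure_fresh_auth(method: F) -> F:
    """Hypothetical decorator: refresh credentials before running `method`."""

    @functools.wraps(method)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self.auth_is_fresh():
            self.refresh_auth()
        return method(self, *args, **kwargs)

    return cast(F, wrapper)


class FakeRemoteClient:
    """Toy client used to exercise the decorator."""

    def __init__(self) -> None:
        self.refreshes = 0
        self._fresh = False

    def auth_is_fresh(self) -> bool:
        return self._fresh

    def refresh_auth(self) -> None:
        self.refreshes += 1
        self._fresh = True

    @ensure_fresh_auth
    def run(self, command: str) -> str:
        return f"ran {command}"
```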

oumi.launcher.clients.sky_client module

class oumi.launcher.clients.sky_client.SkyClient

Bases: object

A wrapped client for communicating with SkyPilot.

class SupportedClouds(value)

Bases: Enum

Enum representing the supported clouds.

AWS = 'aws'
AZURE = 'azure'
GCP = 'gcp'
K8S = 'k8s'
LAMBDA = 'lambda'
NEBIUS = 'nebius'
RUNPOD = 'runpod'
SLURM = 'sky-slurm'

cancel(cluster_name: str, job_id: str) → None

Cancels the specified job on the target cluster.

Parameters:
  • cluster_name – The name of the cluster to cancel the job on.

  • job_id – The ID of the job to cancel.

down(cluster_name: str) → None

Tears down the target cluster.

Parameters:

cluster_name – The name of the cluster to tear down.

exec(job: JobConfig, cluster_name: str) → str

Executes the specified job on the target cluster.

Parameters:
  • job – The job to execute.

  • cluster_name – The name of the cluster to execute the job on.

Returns:

The ID of the job that was created.

get_cluster_hourly_price(cluster_name: str) → float | None

Gets the hourly price for a cluster from its resource handle.

Parameters:

cluster_name – The name of the cluster.

Returns:

The hourly price in USD, or None if unavailable.

get_logs_stream(cluster_name: str, job_id: str | None = None) → SkyLogStream

Gets a stream that tails the logs of the target job.

Parameters:
  • cluster_name – The name of the cluster the job was run in.

  • job_id – The ID of the job to tail the logs of.

Returns:

A SkyLogStream object containing the captured logs.

launch(job: JobConfig, cluster_name: str | None = None, **kwargs) → JobStatus

Creates a cluster and starts the provided Job.

Parameters:
  • job – The job to execute on the cluster.

  • cluster_name – The name of the cluster to create.

  • kwargs – Additional arguments to pass to the SkyPilot client.

Returns:

A JobStatus with only id and cluster populated.

queue(cluster_name: str) → list[dict] | list[sky.schemas.api.responses.ClusterJobRecord]

Gets the job queue of a cluster.

Parameters:

cluster_name – The name of the cluster to get the queue for.

Returns:

A list of job records (dictionaries or ClusterJobRecord objects), each containing the metadata of a job.

status()

Gets a list of cluster statuses.

Returns:

A list of dictionaries, each containing the status of a cluster.

stop(cluster_name: str) → None

Stops the target cluster.

Parameters:

cluster_name – The name of the cluster to stop.

class oumi.launcher.clients.sky_client.SkyLogStream(iterator: Iterator[str | None])

Bases: TextIOBase

Wraps a log iterator into a readline()-capable stream.

readline() → str

Reads a line from the log stream.

oumi.launcher.clients.slurm_client module

class oumi.launcher.clients.slurm_client.SlurmClient(user: str, slurm_host: str, cluster_name: str)

Bases: object

A client for communicating with a Slurm cluster.

cancel(job_id) → JobStatus | None

Cancels the specified job.

Parameters:

job_id – The ID of the job to cancel.

Returns:

The job status if found, None otherwise.

static get_active_users(slurm_host: str) → list[str]

Gets the list of users with an open SSH tunnel to a Slurm cluster.

Returns:

A list of users.

get_job(job_id: str) → JobStatus | None

Gets the specified job’s status.

Queries squeue for active jobs, then falls back to scontrol show job for jobs that have left the queue but are still retained by slurmctld (per MinJobAge).

Parameters:

job_id – The ID of the job to get.

Returns:

The job status if found, None otherwise.
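
The two-step lookup can be sketched as follows, assuming a runner that executes a shell command on the login node and returns stdout, stderr, and an exit code (the same shape as SlurmResponse). The squeue and scontrol options used are standard Slurm flags, but the helper name lookup_job_state, the CommandResult stand-in, and the parsing are illustrative assumptions rather than the Oumi implementation.

```python
import re
import shlex
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CommandResult:
    """Hypothetical stand-in with the same fields as SlurmResponse."""

    stdout: str
    stderr: str
    exit_code: int


def lookup_job_state(job_id: str, run: Callable[[str], CommandResult]) -> Optional[str]:
    quoted = shlex.quote(job_id)
    # Active jobs: squeue prints only the state column (%T) with no header.
    active = run(f"squeue --noheader --jobs={quoted} --format=%T")
    if active.exit_code == 0 and active.stdout.strip():
        return active.stdout.strip()
    # Jobs that left the queue stay visible to scontrol until MinJobAge expires.
    finished = run(f"scontrol show job {quoted}")
    if finished.exit_code == 0:
        match = re.search(r"JobState=(\S+)", finished.stdout)
        if match:
            return match.group(1)
    return None  # not found anywhere
```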

get_latest_job() → JobStatus | None

Gets the most recent job on this cluster.

get_logs_stream(cluster_name: str, job_id: str | None = None) → SlurmLogStream

Gets a stream that tails the logs of the target job.

Parameters:
  • cluster_name – The name of the cluster the job was run in.

  • job_id – The ID of the job to tail the logs of.

list_jobs() → list[JobStatus]

Lists all jobs for the current user.

Returns:

A list of JobStatus.

put(file_contents: str, destination: str) → None

Puts the specified file contents to the remote path.

Parameters:
  • file_contents – The contents of the file to write.

  • destination – The remote path to write the file to.

put_recursive(source: str, destination: str) → None

Puts the specified file/directory to the remote path using rsync.

Parameters:
  • source – The local file/directory to write.

  • destination – The remote path to write the file/directory to.

run_commands(commands: list[str]) → SlurmResponse

Runs the provided commands in a single SSH command.

Parameters:

commands – The commands to run.

submit_job(job_path: str, working_dir: str, node_count: int, name: str | None, *, export: str | list[str] | None = None, account: str | None = None, ntasks: int | None = None, threads_per_core: int | None = None, distribution: str | None = None, partition: str | None = None, qos: str | None = None, stdout_file: str | None = '$HOME/oumi_slurm_logs/%j.out', stderr_file: str | None = None, **kwargs) → str

Submits the specified job script to Slurm.

Parameters:
  • job_path – The path to the job script to submit.

  • working_dir – The working directory to submit the job from.

  • node_count – The number of nodes to use for the job.

  • name – The name of the job (optional).

  • export – Environment variables to export. Special values: “NONE” exports nothing; “ALL” exports all environment variables.

  • account – Charge job to specified account/project.

  • ntasks – Total number of tasks to run.

  • threads_per_core – Number of threads per core to allocate, e.g., 1 to allow only one thread per core, or 2 to make use of hyper-threading.

  • distribution – Distribution method for processes to nodes (type = block|cyclic|arbitrary).

  • partition – Partition (aka queue) requested.

  • qos – QoS (aka the queue on Perlmutter) requested.

  • stdout_file – The file to write the stdout to.

  • stderr_file – The file to write the stderr to.

  • kwargs – Additional flags to pass to sbatch. Underscores in the keyword name are replaced with hyphens in the flag name. For example, passing foo_bar=baz as a kwarg adds “--foo-bar=baz” to the sbatch command.

Returns:

The ID of the submitted job.
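
A small sketch of turning keyword arguments into sbatch flags. Only the foo_bar=baz to --foo-bar=baz mapping comes from the docstring; the helper name build_sbatch_command, the subset of parameters it handles, and the flag order are assumptions. --nodes, --job-name, --partition, and --mail-type are standard sbatch options.

```python
import shlex
from typing import Optional


def build_sbatch_command(
    job_path: str,
    node_count: int,
    name: Optional[str] = None,
    *,
    partition: Optional[str] = None,
    **kwargs: object,
) -> str:
    """Hypothetical helper: assemble an sbatch command line."""
    flags = [f"--nodes={node_count}"]
    if name:
        flags.append(f"--job-name={name}")
    if partition:
        flags.append(f"--partition={partition}")
    # Extra keyword arguments become long flags: underscores turn into
    # hyphens, so foo_bar="baz" yields --foo-bar=baz.
    for key, value in kwargs.items():
        flags.append(f"--{key.replace('_', '-')}={value}")
    # Quote every token so values with spaces survive the remote shell.
    return " ".join(shlex.quote(token) for token in ["sbatch", *flags, job_path])
```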

class oumi.launcher.clients.slurm_client.SlurmLogStream(cluster_name: str, job_id: str, client: SlurmClient)

Bases: TextIOBase

A stream that provides access to job logs.

This class inherits from io.TextIOBase to provide a file-like interface for reading job logs.

Expected Order of Events:

  1. Initialization: Starts a tail process to monitor the job’s stdout file and launches a background thread to check job status.

  2. Active Reading: While the job is running, reads lines directly from the tail process stdout in real-time.

  3. Job Completion: The background thread detects when the job is done and terminates the tail process.

  4. Cleanup: When the stream is closed, resources are cleaned up.

State Transitions:

  • _proc: None → subprocess.Popen → None (after termination)

__enter__()

Context manager entry.

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

close() → None

Close the stream and clean up resources.

readline() → str

Read a line from the stream.

Returns:

The line read from the stream, or an empty string if the stream is closed.
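
The lifecycle described above (tail process, background status watcher, termination on completion, cleanup on close) can be sketched locally as follows. This is a simplified illustration, not the Oumi class: TailLogStream is a hypothetical name, it tails a local file instead of going through SSH, it takes a hypothetical is_done callable in place of a real job-status query, and it assumes a POSIX tail binary is available.

```python
import subprocess
import threading
from typing import Callable, Optional


class TailLogStream:
    """Hypothetical, local-only sketch of the SlurmLogStream lifecycle."""

    def __init__(self, path: str, is_done: Callable[[], bool], poll_seconds: float = 0.1) -> None:
        # 1. Initialization: start `tail -f` and a background status watcher.
        self._proc: Optional[subprocess.Popen] = subprocess.Popen(
            ["tail", "-n", "+1", "-f", path], stdout=subprocess.PIPE, text=True
        )
        self._is_done = is_done
        self._poll_seconds = poll_seconds
        self._stop = threading.Event()
        self._watcher = threading.Thread(target=self._watch, daemon=True)
        self._watcher.start()

    def _watch(self) -> None:
        # 3. Job completion: once is_done() is true, terminate tail so that
        # readline() reaches end of file instead of blocking forever.
        while not self._is_done():
            if self._stop.wait(self._poll_seconds):
                return  # the stream was closed first
        proc = self._proc
        if proc is not None:
            proc.terminate()

    def readline(self) -> str:
        # 2. Active reading: lines come straight from tail's stdout.
        proc = self._proc
        if proc is None or proc.stdout is None:
            return ""  # closed stream
        return proc.stdout.readline()

    def close(self) -> None:
        # 4. Cleanup: stop the watcher, reap tail, and set _proc back to None.
        self._stop.set()
        proc, self._proc = self._proc, None
        if proc is not None:
            proc.terminate()
            proc.wait()
            if proc.stdout is not None:
                proc.stdout.close()

    def __enter__(self) -> "TailLogStream":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()
```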

class oumi.launcher.clients.slurm_client.SlurmResponse(stdout: str, stderr: str, exit_code: int)

Bases: object

A response from Slurm.

exit_code: int
stderr: str
stdout: str

oumi.launcher.clients.slurm_client.retry_auth(user_function: Callable) → Callable

Decorator to ensure auth is fresh before calling a function.