User API

This page describes the user-facing API of deisa-ray. It documents the observed behavior of this repository, not only the deisa-core protocols. The core model is intentionally small:

  • each participating simulation rank creates one Bridge and calls send whenever a local chunk is ready for a timestep the simulation chooses to share with analytics;

  • analytics code creates one Deisa object, registers callbacks, and calls execute_callbacks;

  • callbacks receive lists of DeisaArray objects, where each object wraps a Dask array and the timestep it came from.

Think of send as the point where the simulation exposes a timestep to DEISA. For a distributed array at a given timestep, all bridges that own chunks of that array must share their local chunk for the same timestep before DEISA can assemble the full Dask array seen by analytics.

Callbacks see those shares through a sliding window. A WindowSpec with window_size=N gives the callback up to the most recent N shared timesteps for that array, ordered oldest to newest. This is what lets analysis code combine data across time, for example comparing the newest field with the previous one or computing a derivative over several timesteps.
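The sliding-window semantics can be illustrated with a plain bounded queue. This is a hypothetical model, not runtime code: it only shows how window_size=3 retains the most recent shared timesteps, oldest to newest.

```python
from collections import deque

# Model of window_size=3: the runtime keeps up to the last 3 shared
# timesteps per array, ordered oldest to newest.
window = deque(maxlen=3)

for timestep in range(5):   # the simulation shares timesteps 0..4
    window.append(timestep)  # newest entry goes on the right

# After five shares, a callback would see only the three most recent
# timesteps, with window[-1] being the newest.
print(list(window))  # [2, 3, 4]
```

This is why window[-1] is always the newest entry, and why early timesteps can yield shorter windows than requested.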

The simulation and analytics may start in either order. The runtime uses Ray actors to rendezvous, collect chunks, build Dask arrays, and execute Dask graphs close to the data.

Bridges also use a communicator to coordinate with each other. By default, deisa-ray creates a Gloo communicator from system_metadata, but applications that already run under MPI can pass an MPI communicator instead. That communicator is used for low-overhead bridge-to-bridge coordination, including barriers and feedback broadcasts.

Main imports

from deisa.ray.bridge import Bridge
from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec, to_hdf5

Simulation-side API

Bridge

Bridge is the simulation-side entry point.

bridge = Bridge(
    bridge_id=rank,
    arrays_metadata=arrays_metadata,
    system_metadata=system_metadata,
)

Each rank that will ever send data must create a Bridge. The bridge_id must be unique among participating ranks, and there must be a bridge with bridge_id=0. Bridge 0 is special because it sends the final sentinel array used to stop analytics callback execution.

Arguments

bridge_id

Integer identifier for this simulation rank. In MPI programs this is usually the MPI rank.

arrays_metadata

Mapping from array name to metadata for the chunk owned by this bridge. Each value must contain:

chunk_shape

Shape of the local chunk as a tuple/list/1D NumPy array of positive integers.

nb_chunks_per_dim

Number of chunks in each dimension of the global array. Assuming equally sized chunks, the full array shape is the dimension-wise product chunk_shape * nb_chunks_per_dim.
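As a quick illustration (values taken from the end-to-end example at the bottom of this page), the global shape follows from the two fields by a dimension-wise product:

```python
chunk_shape = (64, 64)        # shape of each local chunk
nb_chunks_per_dim = (4, 4)    # chunk grid of the global array

# For equally sized chunks, the global shape is the dimension-wise
# product of chunk_shape and nb_chunks_per_dim.
global_shape = tuple(c * n for c, n in zip(chunk_shape, nb_chunks_per_dim))
print(global_shape)  # (256, 256)
```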

nb_chunks_of_node

Number of chunks for this array expected on the current scheduling actor. This is the local completeness count used before forwarding an array timestep.

dtype

NumPy dtype, or a value accepted by numpy.dtype.

chunk_position

Position of this bridge’s chunk in the global chunk grid. It must have the same dimensionality as chunk_shape and each index must be within nb_chunks_per_dim.

system_metadata

Required when comm is omitted. The default communicator uses system_metadata["world_size"], system_metadata["master_address"], and system_metadata["master_port"] to initialize a Gloo process group. If a custom communicator is supplied, this argument may be omitted.

comm

Optional communicator. A raw mpi4py.MPI.Comm is accepted and wrapped automatically. Any custom communicator must expose rank, world_size, barrier(), and broadcast_object(obj, src=0). Bridge.get uses broadcast_object so bridge 0 can query feedback once and share the result with all participating bridges. Passing an MPI communicator is the recommended option when the simulation already has one, because it reuses the simulation’s native fast communication layer instead of creating the default Gloo process group.
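A custom communicator only needs to satisfy that small interface. As an illustration, a trivial single-process stub (useful mostly for tests; the class name is hypothetical, only the required attributes and methods matter) could look like:

```python
class SingleProcessComm:
    """Minimal stand-in for the communicator interface Bridge expects:
    rank, world_size, barrier(), and broadcast_object(obj, src=0)."""

    rank = 0
    world_size = 1

    def barrier(self):
        # Nothing to synchronize with in a single process.
        pass

    def broadcast_object(self, obj, src=0):
        # With one participant, the broadcast result is the object itself.
        return obj

comm = SingleProcessComm()
print(comm.broadcast_object({"cooling_factor": 0.5}))
```

In a real multi-rank setup, barrier and broadcast_object would delegate to the underlying transport; an mpi4py.MPI.Comm passed directly is wrapped automatically, as described above.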

_node_id, scheduling_actor_cls, _init_retries, _comm_timeout

Implementation and testing hooks. Normal users should not need them. _comm_timeout controls the default Gloo rendezvous timeout in seconds.

Behavior

During construction, Bridge validates array metadata, initializes or normalizes its communicator, starts Ray with address="auto" if needed, creates or reuses a detached scheduling actor on the local Ray node, registers the chunk metadata with that actor, waits for actor readiness, and finally enters a communicator barrier so all bridges start from a consistent point.

If the default Gloo communicator is used and not all bridge processes connect before the timeout, construction raises the underlying torch distributed rendezvous error with extra context about the expected rank count and master address.

Bridge.send

bridge.send(array_name="temperature", chunk=chunk, timestep=t)

send makes one local chunk available to analytics.

Arguments

array_name

Name of the array. It must match a key in arrays_metadata and the analytics-side WindowSpec name.

chunk

A numpy.ndarray containing this bridge’s local chunk. If the simulation computes on GPU, copy the data to CPU before calling send.

timestep

Integer timestep for the chunk.

chunked

Reserved for future use. The current implementation always sends chunked data through the distributed-array path.

store_externally

Reserved for future external storage support. It is passed through the internal actor layer but external storage is not implemented yet.

test_mode

Reserved test hook and currently ignored.

Behavior

send stores the chunk in Ray’s object store with the scheduling actor as owner, forwards the object reference to the scheduling actor, and blocks until that actor has processed the chunk. If the scheduling layer detects a contract violation, send raises ContractError.

The timestep stream must be non-decreasing globally: all data for timestep i must be sent before any data for timestep j > i. The analytics loop checks this ordering and raises if it observes an older timestep after a newer one.

Bridge.close

bridge.close(timestep=final_timestep)

close tells analytics that the simulation has finished. All bridges must call it because it begins with a communicator barrier. After the barrier, bridge 0 sends an internal sentinel array named "__deisa_last_iteration_array". Analytics stop when that sentinel is received.

timestep is the timestep attached to the sentinel. A common pattern is to use the first timestep after the last real send. close returns the same integer.

Bridge.get

value = bridge.get("cooling_factor", timestep=t)

get retrieves a timestamped feedback value published by analytics with Deisa.set. It is a collective operation: when a communicator is used, every bridge must call get in the same order for a given key/timestep lookup. Bridge 0 queries the global head actor and broadcasts the result to the other bridges.

Arguments

name

Feedback key to read.

timestep

Optional timestep associated with the requested feedback value. When it is provided, get returns the value for exactly that timestep, or None if no value is currently retained for that key/timestep. When omitted, get returns the retained queue for name as a list of (timestep, value) pairs, or None if no feedback exists for the key.

Behavior

Feedback values are stored in fixed-size per-key queues on the head actor. Older entries may be dropped after analytics publishes more values than the queue can hold. A missing value returns None; there is no default argument in the current API.
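The retention behavior can be modeled with per-key bounded queues. The sketch below is a hypothetical stand-in for the head actor's storage (publish and lookup are illustrative names, not API methods); it shows both get forms and why old entries can disappear.

```python
from collections import deque

# Hypothetical model of per-key feedback storage: a fixed-size queue of
# (timestep, value) pairs for each key.
queue_size = 3
feedback = {}

def publish(key, value, timestep):
    feedback.setdefault(key, deque(maxlen=queue_size)).append((timestep, value))

def lookup(key, timestep=None):
    q = feedback.get(key)
    if q is None:
        return None
    if timestep is None:
        return list(q)  # the full retained queue, oldest to newest
    return next((v for t, v in q if t == timestep), None)

for t in range(5):
    publish("cooling_factor", t * 2, timestep=t)

print(lookup("cooling_factor", timestep=4))  # 8
print(lookup("cooling_factor", timestep=0))  # None: dropped from the queue
print(lookup("cooling_factor"))              # [(2, 4), (3, 6), (4, 8)]
```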

Feedback is asynchronous and intentionally opportunistic. Analytics callbacks run after DEISA has observed a later timestep, or the close sentinel, so feedback for the final simulated timestep may only become visible during shutdown. Simulation correctness should not depend on observing an analytics event at one exact timestep.

Analytics-side API

Deisa

Deisa is the analytics-side entry point.

deisa = Deisa(max_simulation_ahead=1, feedback_queue_size=1024)

Arguments

ray_start

Optional callable used to start Ray. If omitted, Deisa calls ray.init(address="auto", log_to_driver=False, logging_level=logging.ERROR) when it first connects.

max_simulation_ahead

Number of timesteps the simulation may be ahead of analytics before scheduling applies back-pressure. The default is 1.

feedback_queue_size

Maximum number of feedback entries retained per key on the head actor. The default is 1024. It must be greater than zero.

Behavior

The constructor is intentionally cheap: it records configuration but does not start Ray. Ray connection, Dask scheduler configuration, and head actor creation happen lazily when registering callbacks or executing them.

Deisa.register_callback

def summary(temperature: list[DeisaArray]):
    latest = temperature[-1]
    print(latest.t, latest.dask.mean().compute())

deisa.register_callback(
    summary,
    [WindowSpec("temperature", window_size=3)],
    when="AND",
)

Registers a callback to be called from execute_callbacks.

Arguments

simulation_callback

Callable that receives keyword arguments named after each WindowSpec. Each argument is a list[DeisaArray]. For ergonomic callbacks, choose array names that are valid Python parameter names, or write the callback to accept **kwargs.

arrays_spec

List of WindowSpec objects describing which arrays the callback needs and how many timesteps should be kept for each array.

exception_handler

Optional callable invoked when the user callback raises. If omitted, the default handler prints/logs the exception and callback execution continues. TimeoutError and AssertionError are re-raised by the execution loop.

when

"AND" or "OR". With "AND", the callback runs only when every requested array has new data for the current timestep. With "OR", the callback runs after all requested arrays have been seen at least once and at least one requested array has new data; arrays without new data reuse their latest window.

Returns

Returns the original callback so the method can be used by decorator helpers and by code that wants to keep the callable.

Deisa.callback

@deisa.callback(WindowSpec("temperature"), WindowSpec("pressure"), when="OR")
def compare(temperature: list[DeisaArray], pressure: list[DeisaArray]):
    ...

Decorator form of register_callback. It accepts WindowSpec objects as positional arguments and the same exception_handler and when keyword arguments.

Deisa.execute_callbacks

deisa.execute_callbacks()

Starts the analytics loop. At least one callback must be registered first, otherwise RuntimeError is raised.

Behavior

execute_callbacks marks analytics as ready, builds one internal queue per array, waits for arrays from the head actor, and calls every registered callback whose when condition is satisfied. The loop ends when bridge 0 sends the internal final sentinel through Bridge.close.

Callback arguments are always lists. With no explicit window size, the list has one element: the latest DeisaArray. With window_size=N, the list contains up to the last N arrays for that name, ordered oldest to newest. Early timesteps may have shorter lists, so callbacks that require a full window should check len(window) before computing.

Deisa.set

deisa.set("cooling_factor", value=0.5, timestep=latest.t)

Publishes a timestamped feedback value from analytics so simulation bridges can retrieve it collectively with Bridge.get.

Arguments

key

Hashable feedback key.

value

Python object to store.

timestep

Timestep associated with value. For a given key, feedback timesteps must be strictly increasing. Publishing the same timestep twice, or publishing an older timestep after a newer one, raises ValueError. Timesteps for a key must also be mutually comparable; otherwise the order check raises TypeError.

Behavior

set ensures the analytics side is connected, then stores (timestep, value) on the head actor’s fixed-size feedback queue for key. The call waits for the head actor to accept the value, but simulation-side observation is still asynchronous because bridges poll/read feedback independently of callback execution.

Callback data types

WindowSpec

WindowSpec("temperature")
WindowSpec("temperature", window_size=3)

WindowSpec describes one callback input.

name

Array name. It must match Bridge metadata and Bridge.send.

window_size

Number of recent timesteps to pass to the callback. None means only the latest array is passed. A positive integer creates a sliding window with up to that many entries.

The runtime allocates internal queues large enough for the largest requested window per array across all registered callbacks.
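That sizing rule can be sketched as a max over all registered specs. The code below is illustrative only (plain (name, window_size) pairs stand in for WindowSpec objects); it assumes, per the WindowSpec description above, that window_size=None means keeping just the latest array.

```python
# Two hypothetical callbacks and their requested windows.
callbacks = [
    [("temperature", 3), ("pressure", None)],  # (name, window_size) pairs
    [("temperature", 5)],
]

# Per array, the internal queue must hold the largest requested window.
queue_sizes = {}
for specs in callbacks:
    for name, window_size in specs:
        size = 1 if window_size is None else window_size  # None -> latest only
        queue_sizes[name] = max(queue_sizes.get(name, 0), size)

print(queue_sizes)  # {'temperature': 5, 'pressure': 1}
```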

DeisaArray

DeisaArray is the object passed inside callback windows.

dask

A dask.array.Array representing the full distributed array for one timestep. Use normal Dask operations, then call compute or persist when you want execution.

t

Integer timestep associated with the array.

to_hdf5(fname, dataset)

Convenience method that writes this array to an HDF5 virtual dataset.

to_zarr(...)

Convenience wrapper around dask.array.to_zarr. The array is persisted before writing.

to_hdf5

to_hdf5(
    "state.h5",
    {
        "temperature": temperature[-1],
        "pressure": pressure[-1],
    },
)

Writes one or more DeisaArray objects to one HDF5 file using HDF5 virtual datasets. Chunk payloads are written to hidden chunk files named from the final file, dataset name, and chunk position; the final HDF5 file links those chunks through VDS. Dataset names with unusual filesystem characters may produce awkward chunk filenames, so simple dataset names are recommended.

Execution guarantees and assumptions

Array names

The same array name must be used in arrays_metadata, Bridge.send, and WindowSpec. Analytics callbacks receive keyword arguments with those names.

Participating ranks

Any rank that will ever send data must instantiate a Bridge. The participating world size is fixed at startup.

Master bridge

bridge_id=0 must exist. It is responsible for emitting the final sentinel in Bridge.close.

Timestep ordering

Timesteps must be sent in non-decreasing order. The system assumes it can process all arrays for timestep i before processing timestep i+1.

Window ordering

Callback windows are ordered oldest to newest. The most recent item is window[-1].

Collective operations

Bridge construction with the default communicator and Bridge.close are collective over participating bridges. Bridge.get is also collective when a communicator is used because bridge 0 broadcasts the lookup result. All participating bridges must enter these operations in compatible order.

Feedback ordering

Deisa.set stores timestamped feedback in per-key queues. For each key, feedback timesteps must be strictly increasing. Bridge.get returns None for missing values, and may miss values that are too old for the retained queue.

Data locality

Bridge.send stores chunks in Ray’s object store under the local scheduling actor. Dask graphs are then scheduled by Ray, with optional experimental distributed scheduling controlled by project configuration.

Current limitations

  • Feedback is for small Python objects. Distributed-array feedback from analytics back to simulation is not part of the current user API.

  • External storage for Bridge.send(..., store_externally=True) is not implemented.

  • Bridge.send expects CPU numpy.ndarray inputs.

  • The abstract deisa-core API defines protocol methods such as get_array, delete, and close on the analytics object. They are not part of the concrete user API of this Ray implementation today.

Minimal end-to-end shape

Simulation:

bridge = Bridge(
    bridge_id=rank,
    arrays_metadata={
        "temperature": {
            "chunk_shape": (64, 64),
            "nb_chunks_per_dim": (4, 4),
            "nb_chunks_of_node": 4,
            "dtype": np.float64,
            "chunk_position": chunk_position,
        },
    },
    system_metadata={
        "world_size": world_size,
        "master_address": "127.0.0.1",
        "master_port": 29500,
    },
)

for timestep in range(10):
    bridge.send(
        array_name="temperature",
        chunk=make_temperature_chunk(timestep),
        timestep=timestep,
    )

bridge.close(timestep=10)

Analytics:

deisa = Deisa()

@deisa.callback(WindowSpec("temperature", window_size=3))
def analyze_temperature(temperature: list[DeisaArray]):
    if len(temperature) < 3:
        return

    newest = temperature[-1]
    mean_value = newest.dask.mean().compute()
    print("timestep", newest.t, "mean", mean_value)

deisa.execute_callbacks()