deisa.ray.bridge module

Bridge between MPI ranks and the Ray-based analytics system.

This module exposes the Bridge class used by simulation ranks to register their data chunks and exchange information with analytics running on top of Ray.

class deisa.ray.bridge.Bridge(bridge_id: int, arrays_metadata: Mapping[str, Mapping[str, Any]], system_metadata: Mapping[str, Any], comm: Comm | None = None, *, _node_id: str | None = None, scheduling_actor_cls: ActorClass = SchedulingActor, _init_retries: int = 3, _comm_timeout: int | None = None)

Bases: object

Bridge between MPI ranks and Ray cluster for distributed array processing.

Each MPI rank creates one Bridge instance to connect to the Ray cluster and send its data chunks. Each Bridge manages the chunk of the decomposed distributed array that belongs to its rank.
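
The following sketch illustrates how ranks might map onto the decomposed array by assigning each bridge_id a chunk_position. It assumes that chunks are numbered in row-major (C) order and that each rank owns one chunk. Both the layout and the helper names (chunk_position_for_rank, array_metadata_for_rank) are hypothetical and are not part of deisa.ray. The dictionary keys match those used in the class example.

```python
from typing import Any, Dict, Tuple


def chunk_position_for_rank(rank: int, nb_chunks_per_dim: Tuple[int, ...]) -> Tuple[int, ...]:
    """Map an MPI rank to chunk coordinates.

    Hypothetical layout: chunks numbered in row-major (C) order, one chunk per rank.
    """
    total = 1
    for n in nb_chunks_per_dim:
        total *= n
    if not 0 <= rank < total:
        raise ValueError(f"rank {rank} is outside a grid of {total} chunks")
    position = []
    remaining = rank
    # Peel off coordinates starting from the fastest-varying (last) dimension.
    for n in reversed(nb_chunks_per_dim):
        remaining, coord = divmod(remaining, n)
        position.append(coord)
    return tuple(reversed(position))


def array_metadata_for_rank(
    rank: int,
    chunk_shape: Tuple[int, ...],
    nb_chunks_per_dim: Tuple[int, ...],
    dtype: Any,
) -> Dict[str, Any]:
    """Build one arrays_metadata entry with the keys used in the Bridge example."""
    return {
        "chunk_shape": tuple(chunk_shape),
        "nb_chunks_per_dim": tuple(nb_chunks_per_dim),
        "nb_chunks_of_node": 1,  # value copied from the class example; may differ in practice
        "dtype": dtype,
        "chunk_position": chunk_position_for_rank(rank, tuple(nb_chunks_per_dim)),
    }
```

For example, under this layout rank 5 in a 4x4 grid of chunks is assigned position (1, 1).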

Parameters:
  • bridge_id (int) – Identifier of the MPI rank that owns this bridge instance.

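  • arrays_metadata (Mapping[str, Mapping[str, Any]]) – Per-array metadata, keyed by array name, that describes the layout of each array managed by this bridge (chunk shape, chunk grid, dtype, and chunk position; see the example below).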

  • system_metadata (Mapping[str, Any]) – Cluster-wide metadata (e.g., world size, master address/port).

  • comm (Comm, optional) – Custom communication backend to use instead of the default Gloo communicator. If None, the communicator is created by calling init_gloo_comm with system_metadata.

  • _node_id (str or None, optional) – Node identifier used for testing or custom scheduling. Defaults to None.

  • scheduling_actor_cls (ray.actor.ActorClass, optional) – Ray actor class used to create the scheduling actor. Defaults to deisa.ray.scheduling_actor.SchedulingActor.

  • _init_retries (int, optional) – Number of attempts to create the node actor and wait for it to become ready. Defaults to 3.

  • _comm_timeout (int or None, optional) – Timeout (in seconds) for the Gloo rendezvous during communicator initialization. Defaults to None, in which case a 120-second timeout is used.
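
The _init_retries parameter describes a bounded retry loop. Below is a minimal sketch of that pattern. The names create_with_retries and delay are hypothetical and do not represent the Bridge's actual internals.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def create_with_retries(factory: Callable[[], T], retries: int = 3, delay: float = 0.0) -> T:
    """Call factory up to `retries` times and return the first successful result.

    Hypothetical helper that illustrates the meaning of _init_retries.
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        try:
            return factory()
        except Exception as exc:  # a real implementation would catch narrower errors
            last_error = exc
            if attempt < retries:
                time.sleep(delay)  # back off before the next attempt
    raise RuntimeError(f"gave up after {retries} attempts") from last_error
```

Once every attempt has failed, the sketch raises RuntimeError and chains the last underlying error, so the original cause remains visible in the traceback.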

node_id

The ID of the node this Bridge is associated with.

Type:

str

Notes

The Bridge automatically initializes Ray if it hasn’t been initialized yet. The scheduling actor is created with a detached lifetime to persist beyond the Bridge initialization. The actor uses node affinity scheduling to ensure it runs on the specified node.

Examples

Create a bridge for a simulation rank that owns a chunk of one array, temperature:

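import numpy as np

from deisa.ray.bridge import Bridge

# One 10x10 chunk at position (0, 0) in a 4x4 grid of chunks.
arrays_metadata = {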
    "temperature": {
        "chunk_shape": (10, 10),
        "nb_chunks_per_dim": (4, 4),
        "nb_chunks_of_node": 1,
        "dtype": np.float64,
        "chunk_position": (0, 0),
    }
}
system_metadata = {
    "nb_ranks": 4,
    "ray_address": "auto",
}

bridge = Bridge(
    bridge_id=0,
    arrays_metadata=arrays_metadata,
    system_metadata=system_metadata
)

bridge.send(
    array_name="temperature",
    chunk=np.zeros((10, 10), dtype=np.float64),
    timestep=0
)
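
A simulation rank typically repeats the calls above once per timestep. The sketch below shows one plausible control flow: send a chunk at each step, poll get() for a steering flag, and call close() at the end. To keep the example runnable without Ray, it uses a hypothetical FakeBridge that has the same keyword-only signatures. The "stop" key and the choice to pass timestep + 1 to close() are assumptions, not documented behavior.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakeBridge:
    """Hypothetical in-memory stand-in for Bridge (no Ray involved)."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, int]] = []
        self.values: Dict[str, Any] = {}
        self.closed_at: Optional[int] = None

    def send(self, *, array_name: str, chunk: Any, timestep: int) -> None:
        self.sent.append((array_name, timestep))

    def get(self, *, name: str, default: Any = None, chunked: bool = False) -> Any:
        if chunked:
            raise NotImplementedError("chunked retrieval is not implemented")
        return self.values.get(name, default)

    def close(self, *, timestep: int) -> None:
        self.closed_at = timestep


def run_simulation(bridge: Any, nb_steps: int) -> int:
    """Send one chunk per timestep, stop early if the analytics set "stop",
    then close the bridge. Returns the last timestep that was sent."""
    timestep = -1
    for timestep in range(nb_steps):
        chunk = [[0.0] * 10 for _ in range(10)]  # stands in for np.zeros((10, 10))
        bridge.send(array_name="temperature", chunk=chunk, timestep=timestep)
        # "stop" is a hypothetical key that the analytics would set.
        if bridge.get(name="stop", default=False):
            break
    bridge.close(timestep=timestep + 1)  # assumption: sentinel uses the next index
    return timestep
```

With a real deployment, a Bridge instance would replace FakeBridge and the loop body would remain the same.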

close(*, timestep: int, store_externally: bool = False) → None

Close the bridge by signaling the analytics that the simulation has finished.

Parameters:
  • timestep (int) – The timestep index attached to the sentinel chunk that marks the end of the simulation.

  • store_externally (bool, optional) – Reserved for future external storage handling. Default is False.

Returns:

Nothing. The call blocks until the node actor has registered the sentinel chunk.

Return type:

None
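
Conceptually, close() appends an end-of-stream marker to the sequence of timesteps that the analytics consume. The snippet below models only that idea, using a standard-library queue.Queue and a consumer thread. It is an analogy under stated assumptions and does not show how deisa.ray transports the sentinel chunk through Ray. It also does not model the blocking acknowledgement from the node actor.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker


def analytics_consumer(inbox: "queue.Queue", seen: list) -> None:
    """Process (timestep, payload) items until the sentinel arrives."""
    while True:
        timestep, payload = inbox.get()
        if payload is SENTINEL:
            seen.append(("done", timestep))
            return
        seen.append(("chunk", timestep))


inbox: "queue.Queue" = queue.Queue()
seen: list = []
consumer = threading.Thread(target=analytics_consumer, args=(inbox, seen))
consumer.start()
for t in range(3):
    inbox.put((t, f"chunk-{t}"))
inbox.put((3, SENTINEL))  # what close(timestep=3) conceptually signals
consumer.join()
```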

get(*, name: str, default: Any | None = None, chunked: bool = False) → Any | None

Retrieve information from the analytics.

Used for two cases:

  1. Retrieve a simple value set by the analytics, so that the simulation can react to an event the analytics detected. This case is asynchronous.

  2. (Planned) Retrieve a distributed array that has been modified by the analytics. This case is synchronous and currently not implemented for chunked=True.

Parameters:
  • name (str) – Name of the key to retrieve from the analytics.

  • default (Any, optional) – The default value to return if the key has not been set or does not exist. Default is None.

  • chunked (bool, optional) – Whether the requested value is a distributed array handled by this bridge. Set this to True only when retrieving such an array; this mode is not implemented yet. Default is False.

Notes

When chunked is False, this method simply forwards the request to the node actor and returns the result (or default if not set). When chunked is True, a NotImplementedError is raised.

Returns:

The value associated with name or default if the key is missing.

Return type:

Any | None

Raises:

NotImplementedError – Chunked retrieval is not implemented yet.

send(*, array_name: str, chunk: ndarray, timestep: int, chunked: bool = True, store_externally: bool = False, test_mode: bool = False) → None

Make a chunk of data available to the analytics.

This method stores the chunk in Ray’s object store and sends a reference to it to the node actor. The method blocks until the node actor has processed the data.

Parameters:
  • array_name (str) – The name of the array this chunk belongs to.

  • chunk (numpy.ndarray) – The chunk of data to be sent to the analytics.

  • timestep (int) – The timestep index for this chunk of data.

  • chunked (bool, optional) – Whether the chunk was produced by the internal chunking logic. Currently reserved for future use. Default is True.

  • store_externally (bool, optional) – If True, the data is stored externally. Not implemented yet. Default is False.

  • test_mode (bool, optional) – Reserved flag for future testing or validation hooks. Currently ignored. Default is False.

Notes

The chunk is stored in Ray’s object store with the node actor as the owner, ensuring the reference persists even after the simulation script terminates. This method blocks until the node actor has the chunk.

Raises:

ContractError – If the scheduling actor detects a contract violation for the provided chunk.

Returns:

Nothing. The call blocks until the node actor has processed the chunk.

Return type:

None
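
This documentation does not specify which checks make up the contract. The following hypothetical sketch assumes that the array must be declared in arrays_metadata and that the chunk's shape and dtype must match the declared chunk_shape and dtype. Under that assumption, a validation step could look like the code below. ContractError here is a local stand-in class, and check_chunk is an illustrative name.

```python
from typing import Any, Mapping, Tuple


class ContractError(Exception):
    """Local stand-in for the ContractError raised by send()."""


def check_chunk(
    array_name: str,
    shape: Tuple[int, ...],
    dtype: Any,
    arrays_metadata: Mapping[str, Mapping[str, Any]],
) -> None:
    """Hypothetical contract: the array is declared, and the chunk's shape and
    dtype match the declared chunk_shape and dtype."""
    if array_name not in arrays_metadata:
        raise ContractError(f"array {array_name!r} was not declared")
    declared = arrays_metadata[array_name]
    expected_shape = tuple(declared["chunk_shape"])
    if tuple(shape) != expected_shape:
        raise ContractError(f"{array_name}: shape {tuple(shape)} != declared {expected_shape}")
    if dtype != declared["dtype"]:
        raise ContractError(f"{array_name}: dtype {dtype} != declared {declared['dtype']}")
```

With a NumPy chunk, you would pass chunk.shape and chunk.dtype. NumPy dtype objects compare equal to scalar types such as np.float64, so the declared dtype from the class example would also work.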