deisa.ray.bridge module
Bridge between MPI ranks and the Ray-based analytics system.
This module exposes the Bridge class used by simulation ranks to
register their data chunks and exchange information with analytics running on
top of Ray.
- class deisa.ray.bridge.Bridge(bridge_id: int, arrays_metadata: Mapping[str, Mapping[str, Any]], system_metadata: Mapping[str, Any], comm: deisa.ray.comm.Comm | None = None, *, _node_id: str | None = None, scheduling_actor_cls: ray.actor.ActorClass = deisa.ray.scheduling_actor.SchedulingActor, _init_retries: int = 3, _comm_timeout: int | None = None)
Bases: object
Bridge between MPI ranks and a Ray cluster for distributed array processing.
Each Bridge instance is created by an MPI rank to connect to the Ray cluster and send data chunks. Each Bridge is responsible for managing a chunk of data from the decomposed distributed array.
- Parameters:
bridge_id (int) – Identifier of the MPI rank that owns this bridge instance.
arrays_metadata (Mapping[str, Mapping[str, Any]]) – Metadata describing the array layout managed by this bridge.
system_metadata (Mapping[str, Any]) – Cluster-wide metadata (e.g., world size, master address/port).
comm (Comm, optional) – Custom communication backend to use instead of the default Gloo communicator. If None, init_gloo_comm runs with system_metadata.
_node_id (str or None, optional) – Node identifier used for testing or custom scheduling. Defaults to None.
scheduling_actor_cls (Type, optional) – Class used to materialize the scheduling actor. Defaults to deisa.ray.scheduling_actor.SchedulingActor.
_init_retries (int, optional) – Number of attempts to create and ready the node actor. Defaults to 3.
_comm_timeout (int or None, optional) – Timeout (in seconds) for Gloo rendezvous during communicator initialization. Defaults to 120.
- node_id
The ID of the node this Bridge is associated with.
- Type:
str
Notes
The Bridge automatically initializes Ray if it hasn’t been initialized yet. The scheduling actor is created with a detached lifetime to persist beyond the Bridge initialization. The actor uses node affinity scheduling to ensure it runs on the specified node.
Examples
Create a bridge for a simulation rank that owns one array, temperature:

    import numpy as np
    from deisa.ray.bridge import Bridge

    # Layout of the "temperature" array as seen by this rank.
    arrays_metadata = {
        "temperature": {
            "chunk_shape": (10, 10),
            "nb_chunks_per_dim": (4, 4),
            "nb_chunks_of_node": 1,
            "dtype": np.float64,
            "chunk_position": (0, 0),
        }
    }
    system_metadata = {
        "nb_ranks": 4,
        "ray_address": "auto",
    }

    # The first parameter is named bridge_id in the constructor signature.
    bridge = Bridge(
        bridge_id=0,
        arrays_metadata=arrays_metadata,
        system_metadata=system_metadata,
    )
    bridge.send(
        array_name="temperature",
        chunk=np.zeros((10, 10), dtype=np.float64),
        timestep=0,
    )
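Each rank needs its own chunk_position in arrays_metadata. The following is a minimal sketch of one way to derive it from a flat rank index. The helper names chunk_position and make_array_metadata are hypothetical, and the row-major mapping from rank to grid position is an assumption, not something this module prescribes.

```python
from math import prod


def chunk_position(rank, nb_chunks_per_dim):
    """Map a flat rank index to a grid position, assuming row-major order."""
    if not 0 <= rank < prod(nb_chunks_per_dim):
        raise ValueError(f"rank {rank} is outside the chunk grid {nb_chunks_per_dim}")
    position = []
    # Peel off the fastest-varying (last) dimension first.
    for extent in reversed(nb_chunks_per_dim):
        rank, index = divmod(rank, extent)
        position.append(index)
    return tuple(reversed(position))


def make_array_metadata(rank, chunk_shape, nb_chunks_per_dim, dtype, nb_chunks_of_node=1):
    """Build one per-array entry with the keys used in the example above."""
    return {
        "chunk_shape": tuple(chunk_shape),
        "nb_chunks_per_dim": tuple(nb_chunks_per_dim),
        "nb_chunks_of_node": nb_chunks_of_node,
        "dtype": dtype,  # passed through unchanged, e.g. np.float64 in real use
        "chunk_position": chunk_position(rank, nb_chunks_per_dim),
    }


# A placeholder dtype string keeps this sketch free of third-party imports.
arrays_metadata = {"temperature": make_array_metadata(5, (10, 10), (4, 4), "float64")}
print(arrays_metadata["temperature"]["chunk_position"])  # (1, 1)
```

If the simulation uses a different decomposition (for example an MPI Cartesian communicator), take the position from that decomposition instead of computing it this way.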
- close(*, timestep: int, store_externally: bool = False) → None
Close the bridge by signaling analytics that the simulation finished.
- Parameters:
timestep (int) – The timestep index corresponding to the sentinel chunk.
store_externally (bool, optional) – Reserved for future external storage handling. Default is False.
- Returns:
Blocks until the sentinel chunk is known by the node actor.
- Return type:
None
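A typical call sequence, sketched under assumptions: send one chunk per timestep, then call close once. RecordingBridge is a hypothetical stand-in that only records calls, so the sketch runs without a Ray cluster; a real Bridge would be passed in its place. Using the index after the last data timestep for the sentinel is an assumption, not documented behavior.

```python
def run_simulation(bridge, array_name, n_steps, compute_chunk):
    """Send one chunk per timestep, then signal the end of the simulation."""
    for timestep in range(n_steps):
        bridge.send(
            array_name=array_name,
            chunk=compute_chunk(timestep),
            timestep=timestep,
        )
    # Assumption: the sentinel takes the first index after the last data chunk.
    bridge.close(timestep=n_steps)


class RecordingBridge:
    """Hypothetical stand-in mirroring the keyword-only send/close signatures."""

    def __init__(self):
        self.calls = []

    def send(self, *, array_name, chunk, timestep, chunked=True,
             store_externally=False, test_mode=False):
        self.calls.append(("send", array_name, timestep))

    def close(self, *, timestep, store_externally=False):
        self.calls.append(("close", timestep))


bridge = RecordingBridge()
# Plain nested lists stand in for numpy arrays in this sketch.
run_simulation(bridge, "temperature", 3, lambda t: [[float(t)] * 2] * 2)
print(bridge.calls)
```

Both methods take keyword-only arguments, so positional calls such as bridge.close(3) would fail.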
- get(*, name: str, default: Any | None = None, chunked: bool = False) → Any | None
Retrieve information back from Analytics.
Used for two cases:
Retrieve a simple value that is set in the analytics so that the simulation can react to some event that has been detected. This case is asynchronous.
(Planned) Retrieve a distributed array that has been modified by the analytics. This case is synchronous and currently not implemented for chunked=True.
- Parameters:
name (str) – The name of the key that is being retrieved from the Analytics.
default (Any, optional) – The default value to return if the key has not been set or does not exist. Default is None.
chunked (bool, optional) – Whether the value that is returned is distributed or not. Should be set to True only if retrieving a distributed array that is handled by the bridge. Currently not implemented. Default is False.
Notes
When chunked is False, this method simply forwards the request to the node actor and returns the result (or default if not set). When chunked is True, a NotImplementedError is raised.
- Returns:
The value associated with name, or default if the key is missing.
- Return type:
Any | None
- Raises:
NotImplementedError – Chunked retrieval is not implemented yet.
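A minimal sketch of the first use case, where the simulation reacts to a value set by the analytics. The key name stop_requested and the helper should_stop are hypothetical. StubBridge is a stand-in that mimics the documented get signature so the sketch runs without Ray.

```python
def should_stop(bridge, key="stop_requested"):
    """Return True once analytics has set the (hypothetical) key to a truthy value."""
    # chunked stays at its default of False; chunked=True raises NotImplementedError.
    return bool(bridge.get(name=key, default=False))


class StubBridge:
    """Hypothetical stand-in that serves values from a local dict."""

    def __init__(self, values=None):
        self.values = dict(values or {})

    def get(self, *, name, default=None, chunked=False):
        if chunked:
            raise NotImplementedError("chunked retrieval is not implemented")
        return self.values.get(name, default)


print(should_stop(StubBridge()))                          # False
print(should_stop(StubBridge({"stop_requested": True})))  # True
```

Because this retrieval is asynchronous, a missing key is not an error: the default is returned until the analytics sets the value, so the simulation would normally poll once per timestep.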
- send(*, array_name: str, chunk: ndarray, timestep: int, chunked: bool = True, store_externally: bool = False, test_mode: bool = False) → None
Make a chunk of data available to the analytics.
This method stores the chunk in Ray’s object store and sends a reference to the node actor. The method blocks until the data is processed by the node actor.
- Parameters:
array_name (str) – The name of the array this chunk belongs to.
chunk (numpy.ndarray) – The chunk of data to be sent to the analytics.
timestep (int) – The timestep index for this chunk of data.
chunked (bool, optional) – Whether the chunk was produced by the internal chunking logic. Currently reserved for future use. Default is True.
store_externally (bool, optional) – If True, the data is stored externally. Not implemented yet. Default is False.
test_mode (bool, optional) – Reserved flag for future testing or validation hooks. Currently ignored. Default is False.
Notes
The chunk is stored in Ray’s object store with the node actor as the owner, ensuring the reference persists even after the simulation script terminates. This method blocks until the node actor has the chunk.
- Raises:
ContractError – When the scheduling node detects a contract violation for the provided chunk.
- Returns:
Blocks until the node actor processes the chunk.
- Return type:
None
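Since send blocks and may raise ContractError, a cheap local check before sending can surface obvious mistakes earlier. The sketch below is not the contract validation performed by the scheduling actor, whose rules are not described here; it only compares the chunk shape with the declared chunk_shape. The helper check_chunk is hypothetical, and SimpleNamespace stands in for a numpy array.

```python
from types import SimpleNamespace


def check_chunk(chunk, array_meta):
    """Raise ValueError if the chunk shape differs from the declared chunk_shape."""
    expected = tuple(array_meta["chunk_shape"])
    actual = tuple(chunk.shape)
    if actual != expected:
        raise ValueError(f"chunk shape {actual} does not match declared {expected}")


meta = {"chunk_shape": (10, 10)}

# Any object with a .shape attribute works, including numpy.ndarray.
check_chunk(SimpleNamespace(shape=(10, 10)), meta)  # passes silently

try:
    check_chunk(SimpleNamespace(shape=(5, 10)), meta)
except ValueError as exc:
    print(exc)
```

In real use the check would run just before bridge.send(array_name=..., chunk=..., timestep=...), with array_meta taken from the arrays_metadata entry for that array.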