deisa.ray.bridge module
Bridge between MPI ranks and the Ray-based analytics system.
This module exposes the Bridge class used by simulation ranks to
register their data chunks and exchange information with analytics running on
top of Ray.
- class deisa.ray.bridge.Bridge(id: int, arrays_metadata: Mapping[str, Mapping[str, Any]], system_metadata: Mapping[str, Any], *args: Any, _node_id: str | None = None, scheduling_actor_cls: Type = <deisa.ray.scheduling_actor.ActorClass(SchedulingActor) object>, _init_retries: int = 3, **kwargs: Any)
Bases: object
Bridge between MPI ranks and the Ray cluster for distributed array processing.
Each MPI rank creates its own Bridge instance to connect to the Ray cluster and send data chunks. A Bridge manages the chunk of the decomposed distributed array that is owned by its rank.
- Parameters:
_node_id (str or None, optional) – The ID of the node. If None, the ID is taken from the Ray runtime context. Useful for testing with several scheduling actors on a single machine. Default is None.
scheduling_actor_cls (Type, optional) – The class to use for creating the scheduling actor. Default is _RealSchedulingActor.
_init_retries (int, optional) – Number of retry attempts when initializing the scheduling actor. Default is 3.
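The _init_retries parameter implies a bounded retry loop around the creation of the scheduling actor. The sketch below is a minimal, generic version of that pattern, not the Bridge's actual code: init_with_retries and its delay_s argument are hypothetical names, and treating the count as the maximum number of attempts (rather than retries after a first attempt) is an assumption.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def init_with_retries(create: Callable[[], T], retries: int = 3, delay_s: float = 0.0) -> T:
    """Call `create` until it succeeds, making at most `retries` attempts.

    Hypothetical helper: `retries` is treated as the maximum number of attempts.
    """
    last_error: Optional[BaseException] = None
    for attempt in range(1, retries + 1):
        try:
            return create()
        except Exception as exc:  # e.g. the actor is not reachable yet
            last_error = exc
            if attempt < retries:
                time.sleep(delay_s)  # back off before the next attempt
    raise RuntimeError(f"initialization failed after {retries} attempt(s)") from last_error
```

A nonzero delay_s is a common choice when failures are transient, for example when the actor is still starting up.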
- node_id
The ID of the node this Bridge is associated with.
- Type:
str
- scheduling_actor
The Ray actor handle for the scheduling actor.
- Type:
RayActorHandle
- preprocessing_callbacks
Dictionary mapping array names to their preprocessing callback functions.
- Type:
dict[str, Callable]
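preprocessing_callbacks maps each array name to a function that is applied to every chunk of that array before it is sent. A minimal sketch of such a mapping follows; the array names and transformations are hypothetical, and only the dictionary lookup (which raises KeyError for an unknown name, as documented for send) reflects the documented behavior.

```python
from typing import Callable, Dict

import numpy as np

Callback = Callable[[np.ndarray], np.ndarray]

# Hypothetical callbacks: array names and transformations are illustrative only.
preprocessing_callbacks: Dict[str, Callback] = {
    # Downcast to float32 to halve the size of each chunk in the object store.
    "temperature": lambda chunk: chunk.astype(np.float32),
    # Identity callback: forward the chunk unchanged.
    "pressure": lambda chunk: chunk,
}


def preprocess(array_name: str, chunk: np.ndarray) -> np.ndarray:
    # Plain dict lookup: an unregistered array_name raises KeyError.
    return preprocessing_callbacks[array_name](chunk)


out = preprocess("temperature", np.zeros((10, 10), dtype=np.float64))
print(out.dtype, out.shape)  # float32 (10, 10)
```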
Notes
The Bridge automatically initializes Ray if it has not been initialized yet. The scheduling actor is created with a detached lifetime so that it outlives the Bridge and the process that created it. The actor uses node affinity scheduling to ensure it runs on the specified node.
Examples
Create a bridge for a simulation rank that owns one array, temperature:

    import numpy as np
    from deisa.ray.bridge import Bridge

    arrays_metadata = {
        "temperature": {
            "chunk_shape": (10, 10),
            "nb_chunks_per_dim": (4, 4),
            "nb_chunks_of_node": 1,
            "dtype": np.float64,
            "chunk_position": (0, 0),
        }
    }
    system_metadata = {
        "nb_ranks": 4,
        "ray_address": "auto",
    }
    bridge = Bridge(
        id=0,
        arrays_metadata=arrays_metadata,
        system_metadata=system_metadata,
    )
    bridge.send(
        array_name="temperature",
        chunk=np.zeros((10, 10), dtype=np.float64),
        timestep=0,
    )
- get(*args: Any, name: str, default: Any | None = None, chunked: bool = False, **kwargs: Any) -> Any | None
Retrieve information back from the analytics.
Used for two cases:
- Retrieve a simple value set by the analytics, so that the simulation can react to an event the analytics has detected. This case is asynchronous.
- (Planned) Retrieve a distributed array that has been modified by the analytics. This case is synchronous and is not implemented yet: calling get with chunked=True raises NotImplementedError.
- Parameters:
name (str) – The name of the key that is being retrieved from the Analytics.
default (Any, optional) – The default value to return if the key has not been set or does not exist. Default is None.
chunked (bool, optional) – Whether the value that is returned is distributed or not. Should be set to True only if retrieving a distributed array that is handled by the bridge. Currently not implemented. Default is False.
Notes
When chunked is False, this method forwards the request to the node actor and returns the result (or default if the key is not set). When chunked is True, a NotImplementedError is raised.
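The non-chunked path of get can be sketched with a plain dictionary standing in for the node actor. KeyValueActorStub and bridge_get are hypothetical names, as is the "stop" key used in the polling loop; in the real module the lookup goes to the node actor through Ray, which this sketch does not model.

```python
from typing import Any, Dict, Optional


class KeyValueActorStub:
    """Hypothetical stand-in for the node actor's key/value storage."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}

    def set(self, name: str, value: Any) -> None:
        self._values[name] = value

    def get(self, name: str, default: Optional[Any] = None) -> Any:
        # Unset keys fall back to the caller's default.
        return self._values.get(name, default)


def bridge_get(actor: KeyValueActorStub, *, name: str,
               default: Optional[Any] = None, chunked: bool = False) -> Any:
    if chunked:
        # Mirrors the documented behavior for distributed arrays.
        raise NotImplementedError("chunked=True is not implemented")
    return actor.get(name, default)


# Simulation loop polling a flag once per timestep.
actor = KeyValueActorStub()
for timestep in range(5):
    if bridge_get(actor, name="stop", default=False):
        break
    if timestep == 2:
        actor.set("stop", True)  # pretend the analytics detected an event
print(timestep)  # prints 3
```

Because the flag is read rather than awaited, the simulation only notices it at the next poll, which matches the asynchronous nature of this case.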
- send(*args: Any, array_name: str, chunk: ndarray, timestep: int, chunked: bool = True, store_externally: bool = False, **kwargs: Any) -> None
Make a chunk of data available to the analytics.
This method applies the preprocessing callback associated with array_name to the chunk, stores the result in Ray’s object store, and sends a reference to the node actor. The method blocks until the node actor has processed the data.
- Parameters:
array_name (str) – The name of the array this chunk belongs to.
chunk (numpy.ndarray) – The chunk of data to be sent to the analytics.
timestep (int) – The timestep index for this chunk of data.
chunked (bool, optional) – Whether the chunk was produced by the internal chunking logic. Currently reserved for future use. Default is True.
store_externally (bool, optional) – If True, the data is stored externally. Not implemented yet. Default is False.
Notes
The chunk is first processed through the preprocessing callback associated with array_name. The processed chunk is then stored in Ray’s object store with the node actor as the owner, so that the reference remains valid even after the simulation script terminates. This method blocks until the node actor has processed the chunk.
- Raises:
KeyError – If array_name is not found in the preprocessing callbacks dictionary.
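The send flow (preprocess, store, hand a reference to the node actor, then block) can be sketched without Ray by substituting stand-ins. ObjectStoreStub, NodeActorStub and add_chunk are hypothetical names, and the blocking remote call is imitated with a thread-pool future. In the real Bridge the store step presumably corresponds to ray.put with its experimental _owner argument set to the node actor, which is what lets the object outlive the simulation process; that mapping is an assumption.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple

import numpy as np


class ObjectStoreStub:
    """Stand-in for Ray's object store: put() returns an opaque reference."""

    def __init__(self) -> None:
        self._objects: Dict[int, Any] = {}
        self._ids = itertools.count()

    def put(self, value: Any) -> int:
        ref = next(self._ids)
        self._objects[ref] = value
        return ref

    def get(self, ref: int) -> Any:
        return self._objects[ref]


class NodeActorStub:
    """Stand-in for the node actor; records the references it receives."""

    def __init__(self) -> None:
        self.received: List[Tuple[str, int, int]] = []

    def add_chunk(self, array_name: str, timestep: int, ref: int) -> None:
        self.received.append((array_name, timestep, ref))


def send(callbacks: Dict[str, Callable[[np.ndarray], np.ndarray]],
         store: ObjectStoreStub, actor: NodeActorStub, pool: ThreadPoolExecutor,
         array_name: str, chunk: np.ndarray, timestep: int) -> None:
    processed = callbacks[array_name](chunk)  # KeyError for an unknown array_name
    ref = store.put(processed)  # assumed Ray analogue: ray.put(processed, _owner=actor)
    future = pool.submit(actor.add_chunk, array_name, timestep, ref)
    future.result()  # block until the actor has handled the reference


store, actor = ObjectStoreStub(), NodeActorStub()
with ThreadPoolExecutor(max_workers=1) as pool:
    send({"temperature": lambda c: c * 2}, store, actor, pool,
         "temperature", np.ones((2, 2)), timestep=0)
print(actor.received)  # [('temperature', 0, 0)]
```

Only the reference travels to the actor; the chunk itself stays in the store, which is the point of routing data through Ray's object store.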
- deisa.ray.bridge.get_node_actor_options(name: str, namespace: str) -> Dict[str, Any]
Return Ray options used to create (or get) a node scheduling actor.
- Parameters:
name (str) – Actor name to use for the node actor.
namespace (str) – Ray namespace where the actor will live.
- Returns:
Dictionary of options to be passed to SchedulingActor.options.
- Return type:
dict
Notes
The options use get_if_exists=True to avoid race conditions when several bridges on the same node attempt to create the same actor. The actor is configured with:
- lifetime='detached', so it survives the task that created it;
- num_cpus=0, so it does not reserve CPU resources;
- a very large max_concurrency, because the actor is async-only and used mainly as a coordination point.
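The options described above can be written as a plain dictionary of standard Ray actor options. This sketch mirrors the documented keys only; node_actor_options is a hypothetical name, the concrete max_concurrency value is an assumption standing in for "a very large" limit, and the actor name and namespace in the usage line are illustrative.

```python
from typing import Any, Dict


def node_actor_options(name: str, namespace: str) -> Dict[str, Any]:
    """Options mirroring the documented configuration (sketch)."""
    return {
        "name": name,
        "namespace": namespace,
        # Several bridges on one node may race to create the actor; reuse it if present.
        "get_if_exists": True,
        # Keep the actor alive after the task that created it finishes.
        "lifetime": "detached",
        # Coordination-only actor: reserve no CPU.
        "num_cpus": 0,
        # "Very large" limit; the exact value here is an assumption.
        "max_concurrency": 1_000_000,
    }


opts = node_actor_options("scheduling_actor_node_1", "deisa")
```

The dictionary would then be unpacked into the actor class, as in SchedulingActor.options(**opts), before calling .remote(...) with whatever constructor arguments the actor expects.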