deisa.ray.bridge module

Bridge between MPI ranks and the Ray-based analytics system.

This module exposes the Bridge class used by simulation ranks to register their data chunks and exchange information with analytics running on top of Ray.

class deisa.ray.bridge.Bridge(id: int, arrays_metadata: Mapping[str, Mapping[str, Any]], system_metadata: Mapping[str, Any], *args: Any, _node_id: str | None = None, scheduling_actor_cls: Type = <deisa.ray.scheduling_actor.ActorClass(SchedulingActor) object>, _init_retries: int = 3, **kwargs: Any)

Bases: object

Bridge between MPI ranks and Ray cluster for distributed array processing.

Each Bridge instance is created by an MPI rank to connect to the Ray cluster and send data chunks. Each Bridge is responsible for managing a chunk of data from the decomposed distributed array.

Parameters:
  • _node_id (str or None, optional) – The ID of the node. If None, the ID is taken from the Ray runtime context. Useful for testing with several scheduling actors on a single machine. Default is None.

  • scheduling_actor_cls (Type, optional) – The class to use for creating the scheduling actor. Default is the Ray actor class SchedulingActor from deisa.ray.scheduling_actor.

  • _init_retries (int, optional) – Number of retry attempts when initializing the scheduling actor. Default is 3.

node_id

The ID of the node this Bridge is associated with.

Type:

str

scheduling_actor

The Ray actor handle for the scheduling actor.

Type:

RayActorHandle

preprocessing_callbacks

Dictionary mapping array names to their preprocessing callback functions.

Type:

dict[str, Callable]

Notes

The Bridge automatically initializes Ray if it hasn’t been initialized yet. The scheduling actor is created with a detached lifetime to persist beyond the Bridge initialization. The actor uses node affinity scheduling to ensure it runs on the specified node.
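
The role of _init_retries can be pictured as a bounded retry loop around actor creation. The sketch below is an illustration only: init_with_retries, create_actor and flaky_factory are hypothetical names, the value is treated as the total number of attempts, and the real Bridge may catch a narrower exception type or wait between attempts.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def init_with_retries(create_actor: Callable[[], T], retries: int = 3) -> T:
    """Call create_actor up to `retries` times and re-raise the last error."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error: Optional[Exception] = None
    for _attempt in range(retries):
        try:
            return create_actor()
        except Exception as exc:  # assumption: every failure is retryable
            last_error = exc
    assert last_error is not None
    raise last_error


# Hypothetical factory that fails twice before succeeding.
calls = {"count": 0}


def flaky_factory() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("actor not ready")
    return "actor-handle"


handle = init_with_retries(flaky_factory, retries=3)
```

With retries=3 the third attempt succeeds; with retries=2 the same factory would propagate the RuntimeError.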

Examples

Create a bridge for a simulation rank that owns a single array named temperature:

import numpy as np

from deisa.ray.bridge import Bridge

arrays_metadata = {
    "temperature": {
        "chunk_shape": (10, 10),
        "nb_chunks_per_dim": (4, 4),
        "nb_chunks_of_node": 1,
        "dtype": np.float64,
        "chunk_position": (0, 0),
    }
}
system_metadata = {
    "nb_ranks": 4,
    "ray_address": "auto",
}

bridge = Bridge(
    id=0,
    arrays_metadata=arrays_metadata,
    system_metadata=system_metadata,
)

bridge.send(
    array_name="temperature",
    chunk=np.zeros((10, 10), dtype=np.float64),
    timestep=0,
)
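
To make the metadata fields concrete, the sketch below derives the global array shape and the region one chunk would cover. It assumes that chunk_position is an index into the chunk grid (not an element offset) and that every chunk has the same chunk_shape; neither assumption is confirmed by this page. The helpers global_shape and chunk_slices are hypothetical and not part of deisa.ray.

```python
from typing import Sequence, Tuple


def global_shape(chunk_shape: Sequence[int], nb_chunks_per_dim: Sequence[int]) -> Tuple[int, ...]:
    """Shape of the full array when all chunks have the same shape."""
    return tuple(c * n for c, n in zip(chunk_shape, nb_chunks_per_dim))


def chunk_slices(chunk_shape: Sequence[int], chunk_position: Sequence[int]) -> Tuple[slice, ...]:
    """Slices selecting one chunk, assuming chunk_position is a grid index."""
    return tuple(slice(p * c, (p + 1) * c) for c, p in zip(chunk_shape, chunk_position))


meta = {
    "chunk_shape": (10, 10),
    "nb_chunks_per_dim": (4, 4),
    "chunk_position": (1, 2),
}

shape = global_shape(meta["chunk_shape"], meta["nb_chunks_per_dim"])
slices = chunk_slices(meta["chunk_shape"], meta["chunk_position"])
```

Under these assumptions the full array has shape (40, 40) and the chunk at grid position (1, 2) covers rows 10 to 19 and columns 20 to 29.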

get(*args: Any, name: str, default: Any | None = None, chunked: bool = False, **kwargs: Any) → Any | None

Retrieve information from the analytics.

Used for two cases:

  1. Retrieve a simple value that is set in the analytics so that the simulation can react to some event that has been detected. This case is asynchronous.

  2. (Planned) Retrieve a distributed array that has been modified by the analytics. This case is synchronous and is not implemented yet: passing chunked=True raises NotImplementedError.

Parameters:
  • name (str) – The name of the key to retrieve from the analytics.

  • default (Any, optional) – The value to return if the key has not been set or does not exist. Default is None.

  • chunked (bool, optional) – Whether the returned value is a distributed array. Set to True only when retrieving a distributed array that is handled by the bridge. Not implemented yet. Default is False.

Notes

When chunked is False, this method simply forwards the request to the node actor and returns the result (or default if not set). When chunked is True, a NotImplementedError is raised.
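
The behaviour described above can be mimicked without a Ray cluster. In the sketch below, InMemoryBridge is a hypothetical stand-in in which a plain dictionary replaces the node actor; it only illustrates the default and chunked semantics and is not the actual implementation. The key name stop_requested is also made up for the example.

```python
from typing import Any, Dict, Optional


class InMemoryBridge:
    """Hypothetical stand-in for Bridge: a dict plays the node actor."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}

    def set_from_analytics(self, name: str, value: Any) -> None:
        # Stands in for the analytics side publishing a value.
        self._values[name] = value

    def get(self, *, name: str, default: Optional[Any] = None, chunked: bool = False) -> Any:
        if chunked:
            # Mirrors the documented behaviour for distributed arrays.
            raise NotImplementedError("chunked retrieval is not implemented")
        return self._values.get(name, default)


bridge = InMemoryBridge()

# Nothing has been set yet, so the default comes back.
before = bridge.get(name="stop_requested", default=False)

# Once the analytics publish a value, later calls see it.
bridge.set_from_analytics("stop_requested", True)
after = bridge.get(name="stop_requested", default=False)
```

A simulation loop would typically poll such a key once per timestep and react when the value changes.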

send(*args: Any, array_name: str, chunk: ndarray, timestep: int, chunked: bool = True, store_externally: bool = False, **kwargs: Any) → None

Make a chunk of data available to the analytics.

This method applies the preprocessing callback associated with array_name to the chunk, stores it in Ray’s object store, and sends a reference to the node actor. The method blocks until the data is processed by the node actor.

Parameters:
  • array_name (str) – The name of the array this chunk belongs to.

  • chunk (numpy.ndarray) – The chunk of data to be sent to the analytics.

  • timestep (int) – The timestep index for this chunk of data.

  • chunked (bool, optional) – Whether the chunk was produced by the internal chunking logic. Currently reserved for future use. Default is True.

  • store_externally (bool, optional) – If True, the data is stored externally. Not implemented yet. Default is False.

Notes

The preprocessed chunk is stored in Ray’s object store with the node actor as the owner, so the reference remains valid even after the simulation script terminates.

Raises:

KeyError – If array_name is not found in the preprocessing callbacks dictionary.
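
The callback lookup and the resulting KeyError can be sketched in isolation. This page does not say how callbacks are registered, so the dictionary below is filled by hand, plain lists stand in for NumPy arrays, and the preprocess helper is hypothetical.

```python
from typing import Callable, Dict, List

Chunk = List[float]

# Assumption: one callback per array name, as in preprocessing_callbacks.
preprocessing_callbacks: Dict[str, Callable[[Chunk], Chunk]] = {
    "temperature": lambda chunk: [2.0 * value for value in chunk],
}


def preprocess(array_name: str, chunk: Chunk) -> Chunk:
    """Apply the callback registered for array_name to the chunk."""
    callback = preprocessing_callbacks[array_name]  # KeyError if unknown
    return callback(chunk)


processed = preprocess("temperature", [1.0, 2.0, 3.0])

try:
    preprocess("pressure", [1.0])
    error_name = None
except KeyError as exc:
    error_name = exc.args[0]
```

Here error_name ends up holding the unknown array name, which is the situation the Raises entry describes.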

deisa.ray.bridge.get_node_actor_options(name: str, namespace: str) → Dict[str, Any]

Return Ray options used to create (or get) a node scheduling actor.

Parameters:
  • name (str) – Actor name to use for the node actor.

  • namespace (str) – Ray namespace where the actor will live.

Returns:

Dictionary of options to be passed to SchedulingActor.options.

Return type:

dict

Notes

The options use get_if_exists=True to avoid race conditions when several bridges on the same node attempt to create the same actor. The actor is configured with:

  • lifetime='detached' so it survives the creating task

  • num_cpus=0 so it does not reserve CPU resources

  • a very large max_concurrency because the actor is async-only and used mainly as a coordination point.
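
As a rough picture of the returned dictionary, the sketch below assembles the options listed in the notes. The function name sketch_node_actor_options, the example actor name and namespace, and the max_concurrency value are placeholders; the real function may return additional keys or different values.

```python
from typing import Any, Dict


def sketch_node_actor_options(name: str, namespace: str) -> Dict[str, Any]:
    """Illustrative options dict for a named, detached Ray actor."""
    return {
        "name": name,
        "namespace": namespace,
        "get_if_exists": True,    # reuse the actor if another bridge created it
        "lifetime": "detached",   # actor is not tied to the creating job
        "num_cpus": 0,            # do not reserve CPU resources
        "max_concurrency": 1_000_000,  # assumption: placeholder for "very large"
    }


options = sketch_node_actor_options("node-actor-0", "deisa")
```

Such a dictionary would then be unpacked as keyword arguments to SchedulingActor.options before calling remote().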