"""Bridge between MPI ranks and the Ray-based analytics system.
This module exposes the :class:`Bridge` class used by simulation ranks to
register their data chunks and exchange information with analytics running on
top of Ray.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, Mapping, Optional
import numpy as np
import ray
from ray.actor import ActorClass
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from deisa.ray.scheduling_actor import SchedulingActor as _RealSchedulingActor
from deisa.ray.types import RayActorHandle
from deisa.ray.errors import ContractError, _default_exception_handler
from deisa.ray.comm import Comm, init_gloo_comm
from deisa.ray.validate import _validate_arrays_meta, _validate_system_meta
from deisa.ray.utils import get_node_actor_options
import torch.distributed as dist
logger = logging.getLogger(__name__)
class Bridge:
    """
    Bridge between MPI ranks and Ray cluster for distributed array processing.

    Each Bridge instance is created by an MPI rank to connect to the Ray cluster
    and send data chunks. Each Bridge is responsible for managing a chunk of data
    from the decomposed distributed array.

    Parameters
    ----------
    bridge_id : int
        Identifier of the MPI rank that owns this bridge instance.
    arrays_metadata : Mapping[str, Mapping[str, Any]]
        Metadata describing the array layout managed by this bridge.
    system_metadata : Mapping[str, Any]
        Cluster-wide metadata (e.g., world size, master address/port).
    comm : Comm, optional
        Custom communication backend to use instead of the default Gloo
        communicator. If ``None``, ``init_gloo_comm`` runs with ``system_metadata``.
    _node_id : str or None, optional
        Node identifier used for testing or custom scheduling. Defaults to ``None``.
    scheduling_actor_cls : Type, optional
        Class used to materialize the scheduling actor. Defaults to
        :class:`deisa.ray.scheduling_actor.SchedulingActor`.
    _init_retries : int, optional
        Number of attempts to create and ready the node actor. Defaults to 3.
    _comm_timeout : int or None, optional
        Timeout (in seconds) for Gloo rendezvous during communicator
        initialization. Defaults to 120.

    Attributes
    ----------
    node_id : str
        The ID of the node this Bridge is associated with.

    Notes
    -----
    The Bridge automatically initializes Ray if it hasn't been initialized yet.
    The scheduling actor is created with a detached lifetime to persist beyond
    the Bridge initialization. The actor uses node affinity scheduling to ensure
    it runs on the specified node.

    Examples
    --------
    Create a bridge for a simulation rank that owns one array ``temperature``::

        arrays_metadata = {
            "temperature": {
                "chunk_shape": (10, 10),
                "nb_chunks_per_dim": (4, 4),
                "nb_chunks_of_node": 1,
                "dtype": np.float64,
                "chunk_position": (0, 0),
            }
        }
        system_metadata = {
            "world_size": 4,
            "master_address": "127.0.0.1",
            "master_port": 29500,
        }
        bridge = Bridge(
            bridge_id=0,
            arrays_metadata=arrays_metadata,
            system_metadata=system_metadata
        )
        bridge.send(
            array_name="temperature",
            chunk=np.zeros((10, 10), dtype=np.float64),
            timestep=0
        )
    """

    # TODO: add exception handler? what should default be? If bridge is not instantiated,
    # should sim crash? Keep going?
    def __init__(
        self,
        bridge_id: int,
        arrays_metadata: Mapping[str, Mapping[str, Any]],
        system_metadata: Mapping[str, Any],
        comm: Optional[Comm] = None,
        *,
        _node_id: str | None = None,
        scheduling_actor_cls: ActorClass = _RealSchedulingActor,
        _init_retries: int = 3,
        _comm_timeout: Optional[int] = None,
    ) -> None:
        """
        Initialize the Bridge to connect MPI rank to Ray cluster.

        Parameters
        ----------
        bridge_id : int
            Unique identifier of this Bridge.
        arrays_metadata : Mapping[str, Mapping[str, Any]]
            Dictionary that describes the arrays being shared by the simulation.
            Keys represent the name of the array while the values are
            dictionaries that must at least declare the metadata expected by
            :func:`deisa.ray.validate._validate_arrays_meta`.
        system_metadata : Mapping[str, Any]
            System metadata such as address of Ray cluster, number of MPI
            ranks, and other general information that describes the system.
            Must declare ``world_size``, ``master_address`` and ``master_port``
            when ``comm`` is ``None``.
        comm : Comm, optional
            Communication backend to use. If ``None``, a Gloo communicator
            configured from ``system_metadata`` is initialized.
        _node_id : str or None, optional
            The ID of the node. If None, the ID is taken from the Ray runtime
            context. Useful for testing with several scheduling actors on a
            single machine. Default is None.
        scheduling_actor_cls : Type, optional
            The class to use for creating the scheduling actor. Default is
            `_RealSchedulingActor`.
        _init_retries : int, optional
            Number of retry attempts when initializing the scheduling actor.
            Default is 3.
        _comm_timeout : int or None, optional
            Timeout in seconds for Gloo rendezvous. Default is 120 when not
            provided.

        Raises
        ------
        RuntimeError
            If the scheduling actor cannot be created or initialized after
            the specified number of retries.
        ValueError
            If ``_comm_timeout`` is provided and is not strictly positive.

        Notes
        -----
        This method automatically initializes Ray if it hasn't been initialized
        yet. The scheduling actor is created with a detached lifetime and uses
        node affinity scheduling when `_node_id` is None. The first remote call
        to the scheduling actor serves as a readiness check.
        """
        self.bridge_id = bridge_id
        self._init_retries = _init_retries
        self.arrays_metadata = _validate_arrays_meta(arrays_metadata)
        # NOTE : Possible error : if two bridges have different first array it will have different
        # shape and will be declared twice.
        # Possible fix : do it somewhere else (Head or SchedulingActor)
        # we add a special array with a name that will signal the end of the simulation
        # note we only need the metadata so that it can pass through the entire pipeline correctly and
        # in sequential order, so we just replicate the first metadata we have.
        self.system_metadata = _validate_system_meta(system_metadata)
        if self.bridge_id == 0:
            # Sentinel array registered by rank 0 only; close() pushes a chunk of it
            # so the analytics pipeline can detect end-of-simulation in order.
            self.arrays_metadata["__deisa_last_iteration_array"] = {
                "chunk_shape": (1, 1),
                "nb_chunks_per_dim": (1, 1),
                "nb_chunks_of_node": 1,
                "dtype": np.int16,
                "chunk_position": (0, 0),
            }
        self._comm_timeout = 120 if _comm_timeout is None else int(_comm_timeout)
        if self._comm_timeout <= 0:
            raise ValueError(f"_comm_timeout must be > 0 seconds, got {self._comm_timeout}")
        if comm is None:
            try:
                # Read from the validated metadata so the values used here always
                # match the ones reported in the error note below.
                comm = init_gloo_comm(
                    self.system_metadata["world_size"],
                    self.bridge_id,
                    self.system_metadata["master_address"],
                    self.system_metadata["master_port"],
                    self._comm_timeout,
                )
            except dist.DistStoreError as e:
                e.add_note(
                    "Gloo rendezvous timeout.\n"
                    f"Rank: {self.bridge_id}\n"
                    f"Expected world size: {self.system_metadata['world_size']}\n"
                    f"Master: {self.system_metadata['master_address']}:{self.system_metadata['master_port']}\n"
                    "Not all Bridge processes connected before the deadline.\n"
                    "This usually indicates that one or more Bridge instances crashed, "
                    "never started, or are blocked during initialization."
                )
                raise
        self.comm = comm
        if not ray.is_initialized():
            ray.init(address="auto", log_to_driver=False, logging_level=logging.ERROR)
        self.node_id = _node_id or ray.get_runtime_context().get_node_id()
        name = f"sched-{self.node_id}"
        namespace = "deisa_ray"
        node_actor_options: Dict[str, Any] = get_node_actor_options(name, namespace)
        # place node actor in the same place as detected node_id
        if _node_id is None:
            node_actor_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
                node_id=self.node_id,
                soft=False,
            )
        # create node actor
        self._create_node_actor(scheduling_actor_cls, node_actor_options)
        # exchange meta with node actor
        self._exchange_chunks_meta_with_node_actor()
        # make sure node actor is ready (first remote call doubles as readiness check)
        ray.get(self.node_actor.ready.remote())
        # barrier to make sure that all ranks have reached this point
        self.comm.barrier()
        # after this function returns we are sure that
        # 1. analytics have started
        # 2. head actor is created
        # 3. all bridges have connected
        # 4. all node actors have been created
        # 5. all node actors have received description of arrays_md

    def _exchange_chunks_meta_with_node_actor(self) -> None:
        """
        Push per-array metadata to the node actor.

        Notes
        -----
        This method registers both global chunk layout and the bridge-specific
        chunk position for every array described in ``arrays_metadata``. It
        blocks until the node actor has acknowledged every registration.
        """
        # send metadata of each array chunk (both global and local chunk info) to node actor
        refs = []
        for array_name, meta in self.arrays_metadata.items():
            refs.append(
                self.node_actor.register_chunk_meta.remote(
                    # global info of array (same across bridges)
                    array_name=array_name,
                    chunk_shape=meta["chunk_shape"],
                    nb_chunks_per_dim=meta["nb_chunks_per_dim"],
                    nb_chunks_of_node=meta["nb_chunks_of_node"],
                    dtype=meta["dtype"],
                    # local info of array specific to bridge
                    bridge_id=self.bridge_id,
                    chunk_position=meta["chunk_position"],
                )
            )
        ray.get(refs)

    def send(
        self,
        *,
        array_name: str,
        chunk: np.ndarray,
        timestep: int,
        chunked: bool = True,
        store_externally: bool = False,
        test_mode: bool = False,
    ) -> None:
        """
        Make a chunk of data available to the analytics.

        This method stores the ``chunk`` in Ray's object store, and
        sends a reference to the node actor. The method blocks until the
        data is processed by the node actor.

        Parameters
        ----------
        array_name : str
            The name of the array this chunk belongs to.
        chunk : numpy.ndarray
            The chunk of data to be sent to the analytics.
        timestep : int
            The timestep index for this chunk of data.
        chunked : bool, optional
            Whether the chunk was produced by the internal chunking logic.
            Forwarded to the node actor. Default is True.
        store_externally : bool, optional
            If True, the data is stored externally. Not implemented yet.
            Default is False.
        test_mode : bool, optional
            Reserved flag for future testing or validation hooks. Currently
            ignored. Default is False.

        Notes
        -----
        The chunk is stored in Ray's object store with the node actor as the owner,
        ensuring the reference persists even after the simulation script terminates.
        This method blocks until the node actor has the chunk.

        Raises
        ------
        ContractError
            When the scheduling node detects a contract violation for the
            provided chunk.

        Returns
        -------
        None
            Blocks until the node actor processes the chunk.
        """
        try:
            # Setting the owner allows keeping the reference when the simulation script terminates.
            ref = ray.put(chunk, _owner=self.node_actor)
            future: ray.ObjectRef = self.node_actor.add_chunk.remote(
                bridge_id=self.bridge_id,
                array_name=array_name,
                chunk_ref=[ref],
                timestep=timestep,
                # BUGFIX: forward the caller's flag instead of hardcoding True,
                # otherwise the `chunked` parameter is silently ignored.
                chunked=chunked,
                store_externally=store_externally,
            )  # type: ignore
            # Wait until the data is processed before returning to the simulation
            ray.get(future)
        except ContractError:
            # Contract violations must reach the simulation unchanged.
            raise
        except Exception as e:
            _default_exception_handler(e)

    def close(
        self,
        *,
        timestep: int,
        store_externally: bool = False,
    ) -> None:
        """
        Close the bridge by signaling analytics that the simulation finished.

        Parameters
        ----------
        timestep : int
            The timestep index corresponding to the sentinel chunk.
        store_externally : bool, optional
            Reserved for future external storage handling. Default is False.

        Returns
        -------
        None
            Blocks until the sentinel chunk is known by the node actor.
        """
        # All ranks synchronize first; only rank 0 pushes the sentinel chunk
        # registered in __init__ under "__deisa_last_iteration_array".
        self.comm.barrier()
        if self.bridge_id == 0:
            try:
                ref = ray.put(0, _owner=self.node_actor)
                future: ray.ObjectRef = self.node_actor.add_chunk.remote(
                    bridge_id=self.bridge_id,
                    array_name="__deisa_last_iteration_array",
                    chunk_ref=[ref],
                    timestep=timestep,
                    chunked=True,
                    store_externally=store_externally,
                )  # type: ignore
                ray.get(future)
            except Exception as e:
                _default_exception_handler(e)

    def _create_node_actor(
        self,
        node_actor_cls: ActorClass = _RealSchedulingActor,
        node_actor_options: Mapping[str, Any] | None = None,
    ) -> None:
        """
        Create (or get) the node actor and register arrays metadata.

        Parameters
        ----------
        node_actor_cls : Type, optional
            Class used to create the node actor. Defaults to
            :class:`deisa.ray.scheduling_actor.SchedulingActor`.
        node_actor_options : Mapping[str, Any] or None, optional
            Options passed to ``node_actor_cls.options(**node_actor_options)``.
            If None, :func:`get_node_actor_options` is used with the default
            name/namespace derived from the bridge.

        Raises
        ------
        RuntimeError
            If the node actor cannot be created or readied after
            ``_init_retries`` attempts.
        """
        if node_actor_options is None:
            node_actor_options = get_node_actor_options(
                name=f"sched-{self.node_id}",
                namespace="deisa_ray",
            )
        last_err = None
        for _ in range(max(1, self._init_retries)):
            try:
                # first rank to arrive creates, others get same handle (get_if_exists)
                self.node_actor: RayActorHandle = node_actor_cls.options(**node_actor_options).remote(
                    actor_id=self.node_id
                )  # type: ignore
                break  # success
            except Exception as e:
                last_err = e
                # Try to re-create a fresh actor instance (same name will resolve to existing or new one)
                continue
        # `else:` clause belongs to for loop. Only executed if it finishes normally without
        # encountering a `break`.
        else:
            raise RuntimeError(f"Failed to create/ready scheduling actor for node {self.node_id}") from last_err

    # TODO feedback needs testing
    def get(
        self,
        *,
        name: str,
        default: Any | None = None,
        chunked: bool = False,
    ) -> Any | None:
        """
        Retrieve information back from Analytics.

        Used for two cases:

        1. Retrieve a simple value that is set in the analytics so that the
           simulation can react to some event that has been detected. This
           case is asynchronous.
        2. (Planned) Retrieve a distributed array that has been modified by
           the analytics. This case is synchronous and currently not
           implemented for ``chunked=True``.

        Parameters
        ----------
        name : str
            The name of the key that is being retrieved from the Analytics.
        default : Any, optional
            The default value to return if the key has not been set or does
            not exist. Default is None.
        chunked : bool, optional
            Whether the value that is returned is distributed or not. Should
            be set to True only if retrieving a distributed array that is
            handled by the bridge. Currently not implemented. Default is
            False.

        Notes
        -----
        When ``chunked`` is False, this method simply forwards the request to
        the node actor and returns the result (or ``default`` if not set).
        When ``chunked`` is True, a :class:`NotImplementedError` is raised.

        Returns
        -------
        Any | None
            The value associated with ``name`` or ``default`` if the key is
            missing.

        Raises
        ------
        NotImplementedError
            Chunked retrieval is not implemented yet.
        """
        if not chunked:
            return ray.get(self.node_actor.get.remote(name, default, chunked))
        raise NotImplementedError("Retrieving chunked arrays via Bridge.get is not implemented yet.")

    def _delete(self, *, name: str) -> None:
        """
        Delete a feedback key from the node actor if present.

        Parameters
        ----------
        name : str
            Key to remove.

        Notes
        -----
        This is currently used internally after :meth:`get` to avoid
        repeatedly signaling the same event. Missing keys are silently
        ignored. Fire-and-forget: the remote call is not awaited.
        """
        self.node_actor.delete.remote(name)