# Source code for deisa.ray.bridge

"""Bridge between MPI ranks and the Ray-based analytics system.

This module exposes the :class:`Bridge` class used by simulation ranks to
register their data chunks and exchange information with analytics running on
top of Ray.
"""

from __future__ import annotations
import logging
from typing import Any, Dict, Mapping, Optional
import numpy as np
import ray
from ray.actor import ActorClass
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from deisa.ray.scheduling_actor import SchedulingActor as _RealSchedulingActor
from deisa.ray.types import RayActorHandle
from deisa.ray.errors import ContractError, _default_exception_handler
from deisa.ray.comm import Comm, init_gloo_comm
from deisa.ray.validate import _validate_arrays_meta, _validate_system_meta
from deisa.ray.utils import get_node_actor_options
import torch.distributed as dist

logger = logging.getLogger(__name__)


class Bridge:
    """
    Bridge between MPI ranks and Ray cluster for distributed array processing.

    Each Bridge instance is created by an MPI rank to connect to the Ray
    cluster and send data chunks. Each Bridge is responsible for managing a
    chunk of data from the decomposed distributed array.

    Parameters
    ----------
    bridge_id : int
        Identifier of the MPI rank that owns this bridge instance.
    arrays_metadata : Mapping[str, Mapping[str, Any]]
        Metadata describing the array layout managed by this bridge.
    system_metadata : Mapping[str, Any]
        Cluster-wide metadata (e.g., world size, master address/port).
    comm : Comm, optional
        Custom communication backend to use instead of the default Gloo
        communicator. If ``None``, ``init_gloo_comm`` runs with
        ``system_metadata``.
    _node_id : str or None, optional
        Node identifier used for testing or custom scheduling. Defaults to
        ``None``.
    scheduling_actor_cls : Type, optional
        Class used to materialize the scheduling actor. Defaults to
        :class:`deisa.ray.scheduling_actor.SchedulingActor`.
    _init_retries : int, optional
        Number of attempts to create and ready the node actor. Defaults to 3.
    _comm_timeout : int or None, optional
        Timeout (in seconds) for Gloo rendezvous during communicator
        initialization. Defaults to 120.

    Attributes
    ----------
    node_id : str
        The ID of the node this Bridge is associated with.

    Notes
    -----
    The Bridge automatically initializes Ray if it hasn't been initialized
    yet. The scheduling actor is created with a detached lifetime to persist
    beyond the Bridge initialization. The actor uses node affinity scheduling
    to ensure it runs on the specified node.

    Examples
    --------
    Create a bridge for a simulation rank that owns one array
    ``temperature``::

        arrays_metadata = {
            "temperature": {
                "chunk_shape": (10, 10),
                "nb_chunks_per_dim": (4, 4),
                "nb_chunks_of_node": 1,
                "dtype": np.float64,
                "chunk_position": (0, 0),
            }
        }
        system_metadata = {
            "nb_ranks": 4,
            "ray_address": "auto",
        }
        bridge = Bridge(
            bridge_id=0,
            arrays_metadata=arrays_metadata,
            system_metadata=system_metadata,
        )
        bridge.send(
            array_name="temperature",
            chunk=np.zeros((10, 10), dtype=np.float64),
            timestep=0,
        )
    """

    # TODO: add exception handler? what should default be? If bridge is not
    # instantiated, should sim crash? Keep going?
    def __init__(
        self,
        bridge_id: int,
        arrays_metadata: Mapping[str, Mapping[str, Any]],
        system_metadata: Mapping[str, Any],
        comm: Optional[Comm] = None,
        *,
        _node_id: str | None = None,
        scheduling_actor_cls: ActorClass = _RealSchedulingActor,
        _init_retries: int = 3,
        _comm_timeout: Optional[int] = None,
    ) -> None:
        """
        Initialize the Bridge to connect MPI rank to Ray cluster.

        Parameters
        ----------
        bridge_id : int
            Unique identifier of this Bridge.
        arrays_metadata : Mapping[str, Mapping[str, Any]]
            Dictionary that describes the arrays being shared by the
            simulation. Keys represent the name of the array while the values
            are dictionaries that must at least declare the metadata expected
            by the metadata validator.
        system_metadata : Mapping[str, Any]
            System metadata such as address of Ray cluster, number of MPI
            ranks, and other general information that describes the system.
        comm : Comm, optional
            Communication backend to use. If ``None``, a Gloo communicator
            configured from ``system_metadata`` is initialized.
        _node_id : str or None, optional
            The ID of the node. If None, the ID is taken from the Ray runtime
            context. Useful for testing with several scheduling actors on a
            single machine. Default is None.
        scheduling_actor_cls : Type, optional
            The class to use for creating the scheduling actor. Default is
            `_RealSchedulingActor`.
        _init_retries : int, optional
            Number of retry attempts when initializing the scheduling actor.
            Default is 3.
        _comm_timeout : int or None, optional
            Timeout in seconds for Gloo rendezvous. Default is 120 when not
            provided.

        Raises
        ------
        RuntimeError
            If the scheduling actor cannot be created or initialized after
            the specified number of retries.
        ValueError
            If ``_comm_timeout`` is provided and is not strictly positive.

        Notes
        -----
        This method automatically initializes Ray if it hasn't been
        initialized yet. The scheduling actor is created with a detached
        lifetime and uses node affinity scheduling when `_node_id` is None.
        The first remote call to the scheduling actor serves as a readiness
        check.
        """
        self.bridge_id = bridge_id
        self._init_retries = _init_retries
        self.arrays_metadata = _validate_arrays_meta(arrays_metadata)
        # NOTE: Possible error: if two bridges have different first array it
        # will have different shape and will be declared twice.
        # Possible fix: do it somewhere else (Head or SchedulingActor)
        self.system_metadata = _validate_system_meta(system_metadata)
        # We add a special array with a name that will signal the end of the
        # simulation. Note we only need the metadata so that it can pass
        # through the entire pipeline correctly and in sequential order, so we
        # just replicate the first metadata we have.
        # Only rank 0 registers the sentinel, so it is declared exactly once.
        if self.bridge_id == 0:
            self.arrays_metadata["__deisa_last_iteration_array"] = {
                "chunk_shape": (1, 1),
                "nb_chunks_per_dim": (1, 1),
                "nb_chunks_of_node": 1,
                "dtype": np.int16,
                "chunk_position": (0, 0),
            }
        # Normalize the rendezvous timeout; None means the 120 s default.
        self._comm_timeout = 120 if _comm_timeout is None else int(_comm_timeout)
        if self._comm_timeout <= 0:
            raise ValueError(f"_comm_timeout must be > 0 seconds, got {self._comm_timeout}")
        if comm is None:
            try:
                # Gloo rendezvous: blocks until all ranks connect or timeout.
                comm = init_gloo_comm(
                    system_metadata["world_size"],
                    self.bridge_id,
                    system_metadata["master_address"],
                    system_metadata["master_port"],
                    self._comm_timeout,
                )
            except dist.DistStoreError as e:
                # Enrich the timeout error with rendezvous context before
                # re-raising (add_note requires Python 3.11+).
                e.add_note(
                    "Gloo rendezvous timeout.\n"
                    f"Rank: {self.bridge_id}\n"
                    f"Expected world size: {self.system_metadata['world_size']}\n"
                    f"Master: {self.system_metadata['master_address']}:{self.system_metadata['master_port']}\n"
                    "Not all Bridge processes connected before the deadline.\n"
                    "This usually indicates that one or more Bridge instances crashed, "
                    "never started, or are blocked during initialization."
                )
                raise
        self.comm = comm
        if not ray.is_initialized():
            ray.init(address="auto", log_to_driver=False, logging_level=logging.ERROR)
        self.node_id = _node_id or ray.get_runtime_context().get_node_id()
        name = f"sched-{self.node_id}"
        namespace = "deisa_ray"
        node_actor_options: Dict[str, Any] = get_node_actor_options(name, namespace)
        # Place node actor in the same place as detected node_id.
        if _node_id is None:
            node_actor_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
                node_id=self.node_id,
                soft=False,
            )
        # Create node actor.
        self._create_node_actor(scheduling_actor_cls, node_actor_options)
        # Exchange meta with node actor.
        self._exchange_chunks_meta_with_node_actor()
        # Make sure node actor is ready.
        ray.get(self.node_actor.ready.remote())
        # Barrier to make sure that all ranks have reached this point.
        self.comm.barrier()
        # After this function returns we are sure that:
        #   1. analytics have started
        #   2. head actor is created
        #   3. all bridges have connected
        #   4. all node actors have been created
        #   5. all node actors have received description of arrays_md

    def _exchange_chunks_meta_with_node_actor(self):
        """
        Push per-array metadata to the node actor.

        Notes
        -----
        This method registers both global chunk layout and the
        bridge-specific chunk position for every array described in
        ``arrays_metadata``.
        """
        # Send metadata of each array chunk (both global and local chunk
        # info) to the node actor.
        refs = []
        for array_name, meta in self.arrays_metadata.items():
            refs.append(
                self.node_actor.register_chunk_meta.remote(
                    # global info of array (same across bridges)
                    array_name=array_name,
                    chunk_shape=meta["chunk_shape"],
                    nb_chunks_per_dim=meta["nb_chunks_per_dim"],
                    nb_chunks_of_node=meta["nb_chunks_of_node"],
                    dtype=meta["dtype"],
                    # local info of array specific to bridge
                    bridge_id=self.bridge_id,
                    chunk_position=meta["chunk_position"],
                )
            )
        # Block until every registration has been acknowledged.
        ray.get(refs)
[docs] def send( self, *, array_name: str, chunk: np.ndarray, timestep: int, chunked: bool = True, store_externally: bool = False, test_mode: bool = False, ) -> None: """ Make a chunk of data available to the analytics. This method stores the ``chunk`` in Ray's object store, and sends a reference to the node actor. The method blocks until the data is processed by the node actor. Parameters ---------- array_name : str The name of the array this chunk belongs to. chunk : numpy.ndarray The chunk of data to be sent to the analytics. timestep : int The timestep index for this chunk of data. chunked : bool, optional Whether the chunk was produced by the internal chunking logic. Currently reserved for future use. Default is True. store_externally : bool, optional If True, the data is stored externally. Not implemented yet. Default is False. test_mode : bool, optional Reserved flag for future testing or validation hooks. Currently ignored. Default is False. Notes ----- The chunk is stored in Ray's object store with the node actor as the owner, ensuring the reference persists even after the simulation script terminates. This method blocks until the node actor has the chunk. Raises ------ ContractError When the scheduling node detects a contract violation for the provided chunk. Returns ------- None Blocks until the node actor processes the chunk. """ try: # Setting the owner allows keeping the reference when the simulation script terminates. ref = ray.put(chunk, _owner=self.node_actor) future: ray.ObjectRef = self.node_actor.add_chunk.remote( bridge_id=self.bridge_id, array_name=array_name, chunk_ref=[ref], timestep=timestep, chunked=True, store_externally=store_externally, ) # type: ignore # Wait until the data is processed before returning to the simulation ray.get(future) except ContractError as e: raise e except Exception as e: _default_exception_handler(e)
[docs] def close( self, *, timestep: int, store_externally: bool = False, ) -> None: """ Close the bridge by signaling analytics that the simulation finished. Parameters ---------- timestep : int The timestep index corresponding to the sentinel chunk. store_externally : bool, optional Reserved for future external storage handling. Default is False. Returns ------- None Blocks until the sentinel chunk is known by the node actor. """ self.comm.barrier() if self.bridge_id == 0: try: ref = ray.put(0, _owner=self.node_actor) future: ray.ObjectRef = self.node_actor.add_chunk.remote( bridge_id=self.bridge_id, array_name="__deisa_last_iteration_array", chunk_ref=[ref], timestep=timestep, chunked=True, store_externally=store_externally, ) # type: ignore ray.get(future) except Exception as e: _default_exception_handler(e)
def _create_node_actor( self, node_actor_cls: ActorClass = _RealSchedulingActor, node_actor_options: Mapping[str, Any] | None = None, ) -> None: """ Create (or get) the node actor and register arrays metadata. Parameters ---------- node_actor_cls : Type, optional Class used to create the node actor. Defaults to :class:`deisa.ray.scheduling_actor.SchedulingActor`. node_actor_options : Mapping[str, Any] or None, optional Options passed to ``node_actor_cls.options(**node_actor_options)``. If None, :func:`get_node_actor_options` is used with the default name/namespace derived from the bridge. Raises ------ RuntimeError If the node actor cannot be created or readied after ``_init_retries`` attempts. """ if node_actor_options is None: node_actor_options = get_node_actor_options( name=f"sched-{self.node_id}", namespace="deisa_ray", ) last_err = None for _ in range(max(1, self._init_retries)): try: # first rank to arrive creates, others get same handle (get_if_exists) self.node_actor: RayActorHandle = node_actor_cls.options(**node_actor_options).remote( actor_id=self.node_id ) # type: ignore break # success except Exception as e: last_err = e # Try to re-create a fresh actor instance (same name will resolve to existing or new one) continue # `else:` clause belongs to for loop. Only executed if it finishes normally without # encountering a `break`. else: raise RuntimeError(f"Failed to create/ready scheduling actor for node {self.node_id}") from last_err # TODO feedback needs testing
[docs] def get( self, *, name: str, default: Any | None = None, chunked: bool = False, ) -> Any | None: """ Retrieve information back from Analytics. Used for two cases: 1. Retrieve a simple value that is set in the analytics so that the simulation can react to some event that has been detected. This case is asynchronous. 2. (Planned) Retrieve a distributed array that has been modified by the analytics. This case is synchronous and currently not implemented for ``chunked=True``. Parameters ---------- name : str The name of the key that is being retrieved from the Analytics. default : Any, optional The default value to return if the key has not been set or does not exist. Default is None. chunked : bool, optional Whether the value that is returned is distributed or not. Should be set to True only if retrieving a distributed array that is handled by the bridge. Currently not implemented. Default is False. Notes ----- When ``chunked`` is False, this method simply forwards the request to the node actor and returns the result (or ``default`` if not set). When ``chunked`` is True, a :class:`NotImplementedError` is raised. Returns ------- Any | None The value associated with ``name`` or ``default`` if the key is missing. Raises ------ NotImplementedError Chunked retrieval is not implemented yet. """ if not chunked: return ray.get(self.node_actor.get.remote(name, default, chunked)) raise NotImplementedError("Retrieving chunked arrays via Bridge.get is not implemented yet.")
    def _delete(self, *, name: str) -> None:
        """
        Delete a feedback key from the node actor if present.

        Parameters
        ----------
        name : str
            Key to remove.

        Notes
        -----
        This is currently used internally after :meth:`get` to avoid
        repeatedly signaling the same event. Missing keys are silently
        ignored.
        """
        # Fire-and-forget: the ObjectRef is intentionally discarded, so this
        # does not block on (or observe errors from) the remote deletion.
        self.node_actor.delete.remote(name)