# Source code for deisa.ray.bridge

"""Bridge between MPI ranks and the Ray-based analytics system.

This module exposes the :class:`Bridge` class used by simulation ranks to
register their data chunks and exchange information with analytics running on
top of Ray.
"""

import logging
from typing import Any, Callable, Dict, Mapping, Type

import numpy as np
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from deisa.ray.scheduling_actor import SchedulingActor as _RealSchedulingActor
from deisa.ray.types import RayActorHandle

# Module-level logger named after the module, per the stdlib logging convention.
logger = logging.getLogger(__name__)


def get_node_actor_options(name: str, namespace: str) -> Dict[str, Any]:
    """Build the Ray options used to create (or look up) a node scheduling actor.

    Parameters
    ----------
    name : str
        Actor name to use for the node actor.
    namespace : str
        Ray namespace where the actor will live.

    Returns
    -------
    dict
        Dictionary of options to be passed to ``SchedulingActor.options``.

    Notes
    -----
    ``get_if_exists=True`` avoids race conditions when several bridges on the
    same node attempt to create the same actor: the first caller creates it,
    all others receive the existing handle.
    """
    options: Dict[str, Any] = {
        "name": name,
        "namespace": namespace,
        # Detached so the actor survives the task/driver that created it.
        "lifetime": "detached",
        # First bridge on the node creates; the rest get the same handle.
        "get_if_exists": True,
        # WARNING: if not using async actor this will make OS try to spawn many threads
        # and blow everything up. Scheduling actors need to be async because of this.
        "max_concurrency": 1_000_000_000,
        # Pure coordination point: must not reserve CPU resources.
        "num_cpus": 0,
        "enable_task_events": False,
    }
    return options
class Bridge:
    """
    Bridge between MPI ranks and Ray cluster for distributed array processing.

    Each Bridge instance is created by an MPI rank to connect to the Ray
    cluster and send data chunks. Each Bridge is responsible for managing a
    chunk of data from the decomposed distributed array.

    Parameters
    ----------
    _node_id : str or None, optional
        The ID of the node. If None, the ID is taken from the Ray runtime
        context. Useful for testing with several scheduling actors on a
        single machine. Default is None.
    scheduling_actor_cls : Type, optional
        The class to use for creating the scheduling actor. Default is
        `_RealSchedulingActor`.
    _init_retries : int, optional
        Number of retry attempts when initializing the scheduling actor.
        Default is 3.

    Attributes
    ----------
    node_id : str
        The ID of the node this Bridge is associated with.
    scheduling_actor : RayActorHandle
        The Ray actor handle for the scheduling actor.
    preprocessing_callbacks : dict[str, Callable]
        Dictionary mapping array names to their preprocessing callback
        functions.

    Notes
    -----
    The Bridge automatically initializes Ray if it hasn't been initialized
    yet. The scheduling actor is created with a detached lifetime to persist
    beyond the Bridge initialization. The actor uses node affinity scheduling
    to ensure it runs on the specified node.

    Examples
    --------
    Create a bridge for a simulation rank that owns one array
    ``temperature``::

        arrays_metadata = {
            "temperature": {
                "chunk_shape": (10, 10),
                "nb_chunks_per_dim": (4, 4),
                "nb_chunks_of_node": 1,
                "dtype": np.float64,
                "chunk_position": (0, 0),
            }
        }
        system_metadata = {
            "nb_ranks": 4,
            "ray_address": "auto",
        }
        bridge = Bridge(
            id=0,
            arrays_metadata=arrays_metadata,
            system_metadata=system_metadata,
        )
        bridge.send(
            array_name="temperature",
            chunk=np.zeros((10, 10), dtype=np.float64),
            timestep=0,
        )
    """

    def __init__(
        self,
        id: int,
        arrays_metadata: Mapping[str, Mapping[str, Any]],
        system_metadata: Mapping[str, Any],
        *args: Any,
        _node_id: str | None = None,
        scheduling_actor_cls: Type = _RealSchedulingActor,
        _init_retries: int = 3,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the Bridge to connect MPI rank to Ray cluster.

        Parameters
        ----------
        id : int
            Unique identifier of this Bridge.
        arrays_metadata : Mapping[str, Mapping[str, Any]]
            Dictionary that describes the arrays being shared by the
            simulation. Keys represent the name of the array while the values
            are dictionaries that must at least declare the metadata expected
            by :meth:`validate_arrays_meta`.
        system_metadata : Mapping[str, Any]
            System metadata such as address of Ray cluster, number of MPI
            ranks, and other general information that describes the system.
        _node_id : str or None, optional
            The ID of the node. If None, the ID is taken from the Ray runtime
            context. Useful for testing with several scheduling actors on a
            single machine. Default is None.
        scheduling_actor_cls : Type, optional
            The class to use for creating the scheduling actor. Default is
            `_RealSchedulingActor`.
        _init_retries : int, optional
            Number of retry attempts when initializing the scheduling actor.
            Default is 3.

        Raises
        ------
        RuntimeError
            If the scheduling actor cannot be created or initialized after
            the specified number of retries.

        Notes
        -----
        This method automatically initializes Ray if it hasn't been
        initialized yet. The scheduling actor is created with a detached
        lifetime and uses node affinity scheduling when `_node_id` is None.
        The first remote call to the scheduling actor serves as a readiness
        check.

        Other Parameters
        ----------------
        *args, **kwargs
            Currently ignored. Present for backward compatibility with older
            versions of the API.
        """
        self.id = id
        self._init_retries = _init_retries
        self.arrays_metadata = self._validate_arrays_meta(arrays_metadata)
        self.system_metadata = self._validate_system_meta(system_metadata)
        # NOTE(review): ray.init always uses address="auto" and ignores
        # system_metadata["ray_address"] — confirm this is intended.
        if not ray.is_initialized():
            ray.init(address="auto", log_to_driver=False, logging_level=logging.ERROR)
        self.node_id = _node_id or ray.get_runtime_context().get_node_id()
        name = f"sched-{self.node_id}"
        namespace = "deisa_ray"
        node_actor_options: Dict[str, Any] = get_node_actor_options(name, namespace)
        # place node actor in the same place as detected node_id
        if _node_id is None:
            node_actor_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
                node_id=self.node_id,
                soft=False,
            )
        # create node actor
        self._create_node_actor(scheduling_actor_cls, node_actor_options)
        self._exchange_chunks_meta_with_node_actor()

    def _exchange_chunks_meta_with_node_actor(self) -> None:
        """
        Push per-array metadata to the node actor and cache preprocessing callbacks.

        Notes
        -----
        This method registers both global chunk layout and the bridge-specific
        chunk position for every array described in ``arrays_metadata``. It
        then fetches preprocessing callbacks from the node actor and stores
        them locally for use during :meth:`send`.
        """
        # send metadata of each array chunk (both global and local chunk info) to node actor
        # NOTE(review): these remote calls are fire-and-forget (no ray.get);
        # presumably ordering per-actor guarantees they land before first use — confirm.
        for array_name, meta in self.arrays_metadata.items():
            self.node_actor.register_chunk_meta.remote(
                # global info of array (same across bridges)
                array_name=array_name,
                chunk_shape=meta["chunk_shape"],
                nb_chunks_per_dim=meta["nb_chunks_per_dim"],
                nb_chunks_of_node=meta["nb_chunks_of_node"],
                dtype=meta["dtype"],
                # local info of array specific to bridge
                bridge_id=self.id,
                chunk_position=meta["chunk_position"],
            )
        # double ray.get because method returns a ref itself
        self.preprocessing_callbacks: dict[str, Callable] = ray.get(
            ray.get(
                self.node_actor.preprocessing_callbacks.remote()  # type: ignore
            )
        )
        assert isinstance(self.preprocessing_callbacks, dict)
[docs] def send( self, *args: Any, array_name: str, chunk: np.ndarray, timestep: int, chunked: bool = True, store_externally: bool = False, **kwargs: Any, ) -> None: """ Make a chunk of data available to the analytics. This method applies the preprocessing callback associated with ``array_name`` to the chunk, stores it in Ray's object store, and sends a reference to the node actor. The method blocks until the data is processed by the node actor. Parameters ---------- array_name : str The name of the array this chunk belongs to. chunk : numpy.ndarray The chunk of data to be sent to the analytics. timestep : int The timestep index for this chunk of data. chunked : bool, optional Whether the chunk was produced by the internal chunking logic. Currently reserved for future use. Default is True. store_externally : bool, optional If True, the data is stored externally. Not implemented yet. Default is False. Notes ----- The chunk is first processed through the preprocessing callback associated with ``array_name``. The processed chunk is then stored in Ray's object store with the node actor as the owner, ensuring the reference persists even after the simulation script terminates. This method blocks until the node actor has processed the chunk. Raises ------ KeyError If ``array_name`` is not found in the preprocessing callbacks dictionary. """ # ``chunked`` and additional args/kwargs are currently reserved for # future extensions (e.g. multi-chunk sends). For now we only support # sending a single chunk described by ``arrays_metadata``. del args, kwargs # explicitly unused chunk = self.preprocessing_callbacks[array_name](chunk) # Setting the owner allows keeping the reference when the simulation script terminates. 
ref = ray.put(chunk, _owner=self.node_actor) future: ray.ObjectRef = self.node_actor.add_chunk.remote( bridge_id=self.id, array_name=array_name, chunk_ref=[ref], timestep=timestep, chunked=True, store_externally=store_externally, ) # type: ignore # Wait until the data is processed before returning to the simulation ray.get(future)
def _validate_arrays_meta( self, arrays_metadata: Mapping[str, Mapping[str, Any]], ) -> dict[str, dict[str, Any]]: """ Validate and normalize the ``arrays_metadata`` argument. Parameters ---------- arrays_metadata : Mapping[str, Mapping[str, Any]] User-provided metadata for all arrays handled by this bridge. Returns ------- dict[str, dict[str, Any]] A shallow-copied and validated version of the input mapping. Raises ------ TypeError If the top-level mapping, keys or values have incorrect types. ValueError If required keys are missing for any array. """ if not isinstance(arrays_metadata, Mapping): raise TypeError(f"arrays_metadata must be a mapping from str to dict, got {type(arrays_metadata).__name__}") required_keys = { "chunk_shape", "nb_chunks_per_dim", "nb_chunks_of_node", "dtype", "chunk_position", } validated: dict[str, dict[str, Any]] = {} for array_name, meta in arrays_metadata.items(): # key type if not isinstance(array_name, str): raise TypeError(f"arrays_metadata keys must be str, got {type(array_name).__name__}") # value type if not isinstance(meta, Mapping): raise TypeError(f"arrays_metadata['{array_name}'] must be a mapping, got {type(meta).__name__}") # required keys present? missing = required_keys - meta.keys() if missing: raise ValueError(f"arrays_metadata['{array_name}'] is missing required keys: {missing}") self._validate_single_array_metadata(array_name, meta) validated[array_name] = dict(meta) return validated def _validate_single_array_metadata( self, name: str, meta: Mapping[str, Any], ) -> None: """ Validate metadata for a single array entry. Parameters ---------- name : str Array name. meta : Mapping[str, Any] Metadata for this array. 
Must contain at least: - ``chunk_shape``: sequence of positive ints - ``nb_chunks_per_dim``: sequence of positive ints - ``nb_chunks_of_node``: positive int - ``dtype``: NumPy dtype or anything accepted by ``np.dtype`` - ``chunk_position``: sequence of ints of same length as ``chunk_shape`` Raises ------ TypeError If any field has an invalid type. ValueError If shapes/positions have inconsistent lengths. """ # chunk_shape: tuple/list of positive ints chunk_shape = meta["chunk_shape"] if not (isinstance(chunk_shape, (tuple, list)) and all(isinstance(n, int) and n > 0 for n in chunk_shape)): raise TypeError( f"arrays_metadata['{name}']['chunk_shape'] must be a sequence of positive ints, got {chunk_shape!r}" ) # nb_chunks_per_dim: same pattern nb_chunks_per_dim = meta["nb_chunks_per_dim"] if not ( isinstance(nb_chunks_per_dim, (tuple, list)) and all(isinstance(n, int) and n > 0 for n in nb_chunks_per_dim) ): raise TypeError( f"arrays_metadata['{name}']['nb_chunks_per_dim'] must be a " f"sequence of positive ints, got {nb_chunks_per_dim!r}" ) # nb_chunks_of_node: positive int nb_chunks_of_node = meta["nb_chunks_of_node"] if not (isinstance(nb_chunks_of_node, int) and nb_chunks_of_node > 0): raise TypeError( f"arrays_metadata['{name}']['nb_chunks_of_node'] must be a positive int, " f"got {type(meta['nb_chunks_of_node']).__name__}" ) # chunk_position: sequence of ints of same length as chunk_shape (optional) chunk_position = meta["chunk_position"] if not ( isinstance(chunk_position, (tuple, list)) and all( isinstance(pos, int) and 0 <= pos < nb_chunks for pos, nb_chunks in zip(chunk_position, nb_chunks_per_dim) ) ): raise TypeError( f"arrays_metadata['{name}']['chunk_position'] must be a sequence of ints, got {chunk_position!r}" ) if len(chunk_position) != len(meta["chunk_shape"]): raise ValueError(f"arrays_metadata['{name}']['chunk_position'] must have the same length as 'chunk_shape'") def _validate_system_meta(self, system_meta: Mapping[str, Any]) -> dict[str, Any]: 
""" Validate and normalize the ``system_metadata`` argument. Parameters ---------- system_meta : Mapping[str, Any] User-provided system-level metadata. Returns ------- dict[str, Any] A shallow-copied version of the input mapping. Raises ------ TypeError If ``system_meta`` is not a mapping. """ if not isinstance(system_meta, Mapping): raise TypeError(f"system_metadata must be a mapping, got {type(system_meta).__name__}") return dict(system_meta) def _create_node_actor( self, node_actor_cls: Type = _RealSchedulingActor, node_actor_options: Mapping[str, Any] | None = None, ) -> None: """ Create (or get) the node actor and register arrays metadata. Parameters ---------- node_actor_cls : Type, optional Class used to create the node actor. Defaults to :class:`deisa.ray.scheduling_actor.SchedulingActor`. node_actor_options : Mapping[str, Any] or None, optional Options passed to ``node_actor_cls.options(**node_actor_options)``. If None, :func:`get_node_actor_options` is used with the default name/namespace derived from the bridge. Raises ------ RuntimeError If the node actor cannot be created or readied after ``_init_retries`` attempts. """ if node_actor_options is None: node_actor_options = get_node_actor_options( name=f"sched-{self.node_id}", namespace="deisa_ray", ) last_err = None for _ in range(max(1, self._init_retries)): try: # first rank to arrive creates, others get same handle (get_if_exists) self.node_actor: RayActorHandle = node_actor_cls.options(**node_actor_options).remote( actor_id=self.node_id ) # type: ignore break # success except Exception as e: last_err = e # Try to re-create a fresh actor instance (same name will resolve to existing or new one) continue # `else:` clause belongs to for loop. Only executed if it finishes normally without # encountering a `break`. else: raise RuntimeError(f"Failed to create/ready scheduling actor for node {self.node_id}") from last_err # TODO feedback needs testing
[docs] def get( self, *args: Any, name: str, default: Any | None = None, chunked: bool = False, **kwargs: Any, ) -> Any | None: """ Retrieve information back from Analytics. Used for two cases: 1. Retrieve a simple value that is set in the analytics so that the simulation can react to some event that has been detected. This case is asynchronous. 2. (Planned) Retrieve a distributed array that has been modified by the analytics. This case is synchronous and currently not implemented for ``chunked=True``. Parameters ---------- name : str The name of the key that is being retrieved from the Analytics. default : Any, optional The default value to return if the key has not been set or does not exist. Default is None. chunked : bool, optional Whether the value that is returned is distributed or not. Should be set to True only if retrieving a distributed array that is handled by the bridge. Currently not implemented. Default is False. Notes ----- When ``chunked`` is False, this method simply forwards the request to the node actor and returns the result (or ``default`` if not set). When ``chunked`` is True, a :class:`NotImplementedError` is raised. """ del args, kwargs # explicitly unused if not chunked: return ray.get(self.node_actor.get.remote(name, default, chunked)) raise NotImplementedError("Retrieving chunked arrays via Bridge.get is not implemented yet.")
    def _delete(self, *args: Any, name: str, **kwargs: Any) -> None:
        """
        Delete a feedback key from the node actor if present.

        Parameters
        ----------
        name : str
            Key to remove.

        Notes
        -----
        This is currently used internally after :meth:`get` to avoid
        repeatedly signaling the same event. Missing keys are silently
        ignored. The call is fire-and-forget: no ``ray.get`` on the returned
        reference, so completion is not awaited.
        """
        del args, kwargs  # explicitly unused
        # Currently the semantics of deletion are still under discussion. For
        # now, delegate to the node actor which maintains the shared state.
        self.node_actor.delete.remote(name)