deisa.ray.types module¶
- class deisa.ray.types.ArrayPerTimestep[source]¶
Bases:
object
Internal class tracking chunks for a specific array and timestep.
Tracks the per-timestep chunks owned by a single scheduling actor. Each instance is keyed by timestep inside a PartialArray.
- chunks_ready_event¶
Triggered when all chunks for this timestep owned by the node actor have arrived and been forwarded to the head actor.
- Type:
asyncio.Event
- local_chunks¶
Mapping of bridge_id to the double ObjectRef for that chunk. Once forwarded to the head actor, the value is replaced with pickled bytes to free memory.
- Type:
AsyncDict[int, ray.ObjectRef | bytes]
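The interplay between local_chunks and chunks_ready_event can be illustrated with a minimal sketch. It is not the deisa implementation: `TimestepChunks`, `expected`, and `add_chunk` are hypothetical names, a plain dict stands in for AsyncDict, and byte strings stand in for Ray ObjectRefs.

```python
import asyncio


class TimestepChunks:
    """Hypothetical stand-in for a per-timestep chunk tracker."""

    def __init__(self, expected: int) -> None:
        self.expected = expected  # assumed: number of local bridges
        self.local_chunks: dict[int, bytes] = {}
        self.chunks_ready_event = asyncio.Event()

    def add_chunk(self, bridge_id: int, payload: bytes) -> None:
        self.local_chunks[bridge_id] = payload
        # Signal waiters once every expected chunk has arrived.
        if len(self.local_chunks) == self.expected:
            self.chunks_ready_event.set()


async def demo() -> list[int]:
    tracker = TimestepChunks(expected=2)
    tracker.add_chunk(0, b"chunk-0")
    assert not tracker.chunks_ready_event.is_set()
    tracker.add_chunk(1, b"chunk-1")
    await tracker.chunks_ready_event.wait()
    return sorted(tracker.local_chunks)


result = asyncio.run(demo())
```

Using an event rather than polling lets any coroutine that needs the complete timestep simply await it.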
- class deisa.ray.types.ChunkRef(actor_id: int, array_name: str, timestep: Hashable, position: tuple[int, ...], bridge_id: int, _all_chunks: ObjectRef | None = None)[source]¶
Bases:
object
Represents a chunk of an array in a Dask task graph.
This class is used as a placeholder in Dask task graphs to represent a chunk of data. The task corresponding to this object must be scheduled by the actor who has the actual data. This class is used since Dask tends to inline simple tuples, which would prevent proper scheduling.
- Parameters:
actor_id (int) – The ID of the scheduling actor that owns this chunk.
array_name (str) – The real name of the array, without the timestep suffix.
timestep (Timestep) – The timestep this chunk belongs to.
position (tuple[int, ...]) – The position of the chunk in the array decomposition.
bridge_id (int) – The ID of the bridge that produced this chunk.
_all_chunks (ray.ObjectRef or None, optional) – ObjectRef containing all chunks for this timestep. Set for one chunk only to avoid duplication. Default is None.
Notes
This class is used to prevent Dask from inlining simple tuples in the task graph, which would break the scheduling mechanism. The behavior may change in newer versions of Dask.
- actor_id: int¶
- array_name: str¶
- bridge_id: int¶
- position: tuple[int, ...]¶
- timestep: Hashable¶
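As a rough illustration of the placeholder idea, the sketch below mirrors the documented fields in a frozen dataclass. `ChunkRefSketch` is a hypothetical name, the `_all_chunks` field is omitted, and the values are made up.

```python
from dataclasses import dataclass
from typing import Hashable


@dataclass(frozen=True)
class ChunkRefSketch:
    """Hypothetical mirror of the documented ChunkRef fields."""

    actor_id: int
    array_name: str
    timestep: Hashable
    position: tuple[int, ...]
    bridge_id: int


ref = ChunkRefSketch(
    actor_id=3,
    array_name="temperature",
    timestep=12,
    position=(0, 1),
    bridge_id=7,
)

# An opaque object rather than a tuple: graph-handling code that
# special-cases tuples has nothing to unpack or inline here.
is_tuple = isinstance(ref, tuple)
```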
- class deisa.ray.types.DaskArrayData(name, f_preprocessing)[source]¶
Bases:
object
Information about a Dask array being built.
Tracks metadata and per-timestep state for a Dask array assembled from chunks sent by scheduling actors.
- Parameters:
name (str) – Array name registered with the head actor.
f_preprocessing (Callable) – Preprocessing callback applied to each chunk.
- name¶
Array name without timestep suffix.
- Type:
str
- f_preprocessing¶
Preprocessing callback supplied at registration.
- Type:
Callable
- fully_defined¶
Set when every chunk owner has been registered.
- Type:
asyncio.Event
- nb_chunks_per_dim¶
Number of chunks per dimension. Set when first chunk owner is registered.
- Type:
tuple[int, …] or None
- nb_chunks¶
Total number of chunks in the array. Set when first chunk owner is registered.
- Type:
int or None
- chunks_size¶
For each dimension, the size of chunks in that dimension. None values indicate unknown chunk sizes.
- Type:
list[list[int | None]] or None
- dtype¶
The numpy dtype of the array chunks. Set when first chunk owner is registered.
- Type:
np.dtype or None
- position_to_node_actorID¶
Mapping from chunk position to the scheduling actor responsible for that chunk.
- Type:
dict[tuple[int, …], int]
- position_to_bridgeID¶
Mapping from chunk position to the producing bridge ID.
- Type:
dict[tuple, int]
- nb_scheduling_actors¶
Number of unique scheduling actors owning chunks of this array. Set when all chunk owners are known.
- Type:
int or None
- chunk_refs¶
For each timestep, the list of per-actor references that keep chunk payloads alive in the object store.
- Type:
dict[Timestep, list[ray.ObjectRef]]
- pos_to_ref_by_timestep¶
For each timestep, the (position, ref) pairs provided by scheduling actors. Used when distributed scheduling is disabled.
- Type:
defaultdict
- add_chunk_ref(chunk_ref: ObjectRef, timestep: Hashable, pos_to_ref) → bool[source]¶
Add a reference sent by a scheduling actor.
- Parameters:
chunk_ref (ray.ObjectRef) – Ray object reference to a chunk sent by a scheduling actor.
timestep (Timestep) – The timestep this chunk belongs to.
pos_to_ref (dict[tuple, ray.ObjectRef]) – Mapping of chunk position to the (double) Ray ObjectRef provided by the scheduling actor for this timestep.
- Returns:
True if all chunks for this timestep are ready (i.e., all scheduling actors have sent their chunks), False otherwise.
- Return type:
bool
Notes
This method adds the chunk reference to the list for the given timestep. It returns True only when all chunk owners are known and all have sent their chunks for this timestep.
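The completion rule described in the notes can be sketched as follows. This is a simplified stand-in under stated assumptions, not the library code: `ArrayStateSketch` is hypothetical, plain `object()` instances replace Ray ObjectRefs, and the `pos_to_ref` argument is left out.

```python
from collections import defaultdict
from typing import Hashable, Optional


class ArrayStateSketch:
    """Hypothetical stand-in; object() replaces ray.ObjectRef."""

    def __init__(self) -> None:
        self.nb_scheduling_actors: Optional[int] = None
        self.chunk_refs: defaultdict = defaultdict(list)

    def add_chunk_ref(self, chunk_ref: object, timestep: Hashable) -> bool:
        self.chunk_refs[timestep].append(chunk_ref)
        # Not ready while the number of owners is still unknown.
        if self.nb_scheduling_actors is None:
            return False
        return len(self.chunk_refs[timestep]) == self.nb_scheduling_actors


state = ArrayStateSketch()
state.nb_scheduling_actors = 2
first = state.add_chunk_ref(object(), timestep=0)
second = state.add_chunk_ref(object(), timestep=0)
```

Here `first` is False because only one of two actors has reported, and `second` is True once both have.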
- get_full_array(timestep: Hashable, *, distributing_scheduling_enabled: bool, is_preparation: bool = False) → Array[source]¶
Return the full Dask array for a given timestep.
- Parameters:
timestep (Timestep) – The timestep for which the full array should be returned.
distributing_scheduling_enabled (bool) – When True, emit a graph containing ChunkRef tasks for distributed scheduling. When False, materialise the actual chunk payloads and build a local Dask array.
is_preparation (bool, optional) – If True, the array will not contain ObjectRefs to the actual data. This is used for preparation arrays where only the structure is needed. Default is False.
- Returns:
A Dask array representing the full decomposed array for the given timestep. The array is constructed from chunk references stored in Ray’s object store.
- Return type:
da.Array
- Raises:
AssertionError – If not all chunk owners have been registered, or if the number of chunks is inconsistent.
Notes
When distributed scheduling is enabled, the graph uses ChunkRef placeholders that keep data owner information. Otherwise the concrete chunk payloads are inlined. Chunk reference lists are deleted after embedding in the graph to avoid leaking memory. Setting is_preparation skips storing payload refs entirely so analytics can inspect shapes/chunks without materialising data.
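To make the placeholder graph more concrete, here is a small sketch of how a graph keyed by chunk position might be laid out. It relies only on the general Dask convention that array graph keys have the form `(name, i, j, ...)`. The function `build_placeholder_graph`, the `name_timestep` naming scheme, and the dict used in place of ChunkRef are all assumptions for illustration.

```python
from itertools import product

# Assumed inputs, shaped like the documented DaskArrayData attributes.
nb_chunks_per_dim = (2, 2)
position_to_node_actorID = {(0, 0): 0, (0, 1): 0, (1, 0): 1, (1, 1): 1}


def build_placeholder_graph(name: str, timestep: int) -> dict:
    """Map Dask-style array keys to owner placeholders (illustrative)."""
    graph = {}
    for position in product(*(range(n) for n in nb_chunks_per_dim)):
        key = (f"{name}_{timestep}", *position)
        # A dict stands in for ChunkRef; it records who owns the chunk.
        graph[key] = {
            "actor_id": position_to_node_actorID[position],
            "position": position,
        }
    return graph


graph = build_placeholder_graph("temperature", timestep=3)
```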
- update_meta(nb_chunks_per_dim: tuple[int, ...], dtype: dtype, position: tuple[int, ...], size: tuple[int, ...], node_actor_id: int, bridge_id: int) → None[source]¶
Register a scheduling actor as the owner of a chunk at a specific position.
This method records which scheduling actor is responsible for a chunk and updates the array metadata. If this is the first chunk registered, it initializes the array dimensions and dtype.
- Parameters:
nb_chunks_per_dim (tuple[int, ...]) – Number of chunks per dimension in the array decomposition.
dtype (np.dtype) – The numpy dtype of the chunk.
position (tuple[int, ...]) – The position of the chunk in the array decomposition.
size (tuple[int, ...]) – The size of the chunk along each dimension.
node_actor_id (int) – Scheduling actor that owns this chunk.
bridge_id (int) – Bridge identifier that produced the chunk (used for lookups).
- Raises:
AssertionError – If the chunk position is out of bounds, or if subsequent chunks have inconsistent dimensions, dtype, or sizes compared to the first chunk.
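The registration and consistency checks described above might look roughly like the sketch below. `MetaSketch` is a hypothetical stand-in, the dtype is a plain string instead of np.dtype, and the per-chunk `size` bookkeeping is omitted for brevity.

```python
from typing import Optional


class MetaSketch:
    """Hypothetical stand-in; dtype is a plain string, not np.dtype."""

    def __init__(self) -> None:
        self.nb_chunks_per_dim: Optional[tuple[int, ...]] = None
        self.dtype: Optional[str] = None
        self.position_to_node_actorID: dict[tuple[int, ...], int] = {}
        self.position_to_bridgeID: dict[tuple[int, ...], int] = {}

    def update_meta(self, nb_chunks_per_dim, dtype, position,
                    node_actor_id, bridge_id) -> None:
        if self.nb_chunks_per_dim is None:
            # First registration fixes the decomposition and dtype.
            self.nb_chunks_per_dim = nb_chunks_per_dim
            self.dtype = dtype
        else:
            assert self.nb_chunks_per_dim == nb_chunks_per_dim
            assert self.dtype == dtype
        # Each index must fall inside the chunk grid.
        assert len(position) == len(self.nb_chunks_per_dim)
        assert all(0 <= p < n for p, n in zip(position, self.nb_chunks_per_dim))
        self.position_to_node_actorID[position] = node_actor_id
        self.position_to_bridgeID[position] = bridge_id


meta = MetaSketch()
meta.update_meta((2, 2), "float64", (0, 1), node_actor_id=0, bridge_id=5)

try:
    # Position (2, 0) lies outside a 2 x 2 chunk grid.
    meta.update_meta((2, 2), "float64", (2, 0), node_actor_id=1, bridge_id=6)
    out_of_bounds_rejected = False
except AssertionError:
    out_of_bounds_rejected = True
```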
- class deisa.ray.types.GraphInfo[source]¶
Bases:
object
Information about graphs and their scheduling.
Tracks scheduling status and produced references for a Dask task graph scheduled by a SchedulingActor.
- scheduled_event¶
Event set once the graph has been submitted to Ray.
- Type:
asyncio.Event
- refs¶
Mapping from task key to the double Ray ObjectRef returned by the patched Dask-on-Ray scheduler.
- Type:
dict[str, ray.ObjectRef]
- class deisa.ray.types.PartialArray[source]¶
Bases:
object
Internal class tracking metadata and chunks for an array.
Maintains metadata for a single array on one scheduling actor, including the set of locally owned chunks and per-timestep chunk collections.
- ready_event¶
Set once all local bridges have provided metadata for the array and the head actor has been notified.
- Type:
asyncio.Event
- chunks_contained_meta¶
Metadata tuples (bridge_id, chunk_position, chunk_size) for the chunks owned by this actor.
- Type:
set[tuple[int, tuple[int, …], tuple[int, …]]]
- bid_to_pos¶
Maps bridge_id to the corresponding chunk position for quick lookup when forwarding payloads.
- Type:
dict[int, tuple]
- per_timestep_arrays¶
Per-timestep structures that hold chunk references as they arrive.
- Type:
AsyncDict[Timestep, ArrayPerTimestep]
- class deisa.ray.types.ScheduledByOtherActor(actor_id: ActorID)[source]¶
Bases:
object
Represents a task that is scheduled by another actor.
This class is used as a placeholder in Dask task graphs to indicate that a task should be scheduled by a different actor. When a task graph is sent to an actor, tasks marked with this class will be delegated to the specified actor.
- Parameters:
actor_id (ActorID) – The ID of the scheduling actor that should schedule this task.
Notes
This is used to handle cross-actor task dependencies in distributed Dask computations where different parts of the task graph are handled by different scheduling actors.
- actor_id: ActorID¶
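The delegation idea can be sketched with plain dictionaries. Everything here is illustrative: `DelegatedSketch`, `localize`, and the `owners` mapping are hypothetical, and integer actor IDs replace the real ActorID type.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DelegatedSketch:
    """Hypothetical placeholder naming the actor that owns a task."""

    actor_id: int


def localize(graph: dict, owners: dict, my_actor_id: int) -> dict:
    """Keep local tasks; replace the rest with delegation placeholders."""
    return {
        key: task if owners[key] == my_actor_id else DelegatedSketch(owners[key])
        for key, task in graph.items()
    }


graph = {"a": 1, "b": 2, "c": 3}
owners = {"a": 0, "b": 1, "c": 0}
local_view = localize(graph, owners, my_actor_id=0)
```

In this sketch actor 0 keeps tasks "a" and "c" and sees only a placeholder for "b", which actor 1 would schedule.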
- class deisa.ray.types.WindowArrayDefinition(name: str, window_size: int | None = None, preprocess: Callable = <function WindowArrayDefinition.<lambda>>)[source]¶
Bases:
object
Description of an array with optional windowing support.
- Parameters:
name (str) – The name of the array.
window_size (int or None, optional) – If specified, creates a sliding window of arrays for this array name. The window will contain the last window_size timesteps. If None, only the current timestep array is provided. Default is None.
preprocess (Callable, optional) – A preprocessing function to apply to chunks of this array before they are sent to the analytics. The function should take a numpy array and return a processed numpy array. Default is the identity function (no preprocessing).
Examples
>>> from deisa.ray.types import WindowArrayDefinition
>>> def normalize(arr):
...     return arr / arr.max()
>>> # Array with windowing: last 5 timesteps
>>> array_def = WindowArrayDefinition(name="temperature", window_size=5, preprocess=normalize)
>>> # Array without windowing: current timestep only
>>> array_def = WindowArrayDefinition(name="pressure", window_size=None)
- name: str¶
- preprocess()¶
- window_size: int | None = None¶
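The sliding-window behaviour described for window_size (keeping only the last window_size timesteps) can be pictured with a bounded deque. This is only an analogy under assumed values, not how deisa stores its windows; the strings stand in for per-timestep arrays.

```python
from collections import deque

window_size = 3  # assumed value for illustration
window = deque(maxlen=window_size)

for timestep in range(5):
    # A string stands in for the array delivered at each timestep.
    window.append(f"temperature@{timestep}")

# Older entries are dropped automatically once the window is full.
latest = list(window)
```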