deisa.ray.types module

class deisa.ray.types.ArrayPerTimestep[source]

Bases: object

Internal class tracking chunks for a specific array and timestep.

Tracks the per-timestep chunks owned by a single scheduling actor. Each instance is keyed by timestep inside a PartialArray.

chunks_ready_event

Triggered when all chunks for this timestep owned by the node actor have arrived and been forwarded to the head actor.

Type:

asyncio.Event

local_chunks

Mapping of bridge_id to the double ObjectRef for that chunk. Once forwarded to the head actor the value is replaced with pickled bytes to free memory.

Type:

AsyncDict[int, ray.ObjectRef | bytes]
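
The readiness pattern described above can be sketched with plain asyncio. This is only an illustration under stated assumptions, not the deisa.ray implementation: TimestepTracker, expected_bridges and add_chunk are hypothetical names, a plain dict stands in for AsyncDict, strings stand in for ObjectRefs, and the forwarding step to the head actor is left out.

```python
import asyncio


class TimestepTracker:
    """Hypothetical stand-in for per-timestep chunk tracking on one actor."""

    def __init__(self, expected_bridges: set) -> None:
        # Bridge IDs whose chunks this actor expects for the timestep (assumption).
        self.expected_bridges = expected_bridges
        # bridge_id -> chunk reference (a plain dict instead of AsyncDict).
        self.local_chunks: dict = {}
        self.chunks_ready_event = asyncio.Event()

    def add_chunk(self, bridge_id: int, ref: object) -> None:
        self.local_chunks[bridge_id] = ref
        # Signal readiness once every expected bridge has delivered its chunk.
        if self.expected_bridges.issubset(self.local_chunks):
            self.chunks_ready_event.set()


async def demo() -> bool:
    tracker = TimestepTracker(expected_bridges={0, 1})
    tracker.add_chunk(0, "ref-0")
    early = tracker.chunks_ready_event.is_set()  # only one of two chunks so far
    tracker.add_chunk(1, "ref-1")
    await asyncio.wait_for(tracker.chunks_ready_event.wait(), timeout=1)
    return (not early) and tracker.chunks_ready_event.is_set()


result = asyncio.run(demo())
```

Waiting on an event rather than polling the dictionary lets any number of coroutines block until the timestep is complete.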

class deisa.ray.types.ChunkRef(actor_id: int, array_name: str, timestep: Hashable, position: tuple[int, ...], bridge_id: int, _all_chunks: ObjectRef | None = None)[source]

Bases: object

Represents a chunk of an array in a Dask task graph.

A ChunkRef is a placeholder for a chunk of data in a Dask task graph. The task corresponding to it must be scheduled by the actor that holds the actual data. A dedicated class is used instead of a plain tuple because Dask tends to inline simple tuples, which would prevent proper scheduling.

Parameters:
  • actor_id (int) – The ID of the scheduling actor that owns this chunk.

  • array_name (str) – The real name of the array, without the timestep suffix.

  • timestep (Timestep) – The timestep this chunk belongs to.

  • position (tuple[int, ...]) – The position of the chunk in the array decomposition.

  • bridge_id (int) – The ID of the bridge that produced this chunk.

  • _all_chunks (ray.ObjectRef or None, optional) – ObjectRef containing all chunks for this timestep. Set for one chunk only to avoid duplication. Default is None.

Notes

The need for this wrapper comes from the way Dask inlines simple tuples in the task graph, which would break the scheduling mechanism. This may change with newer versions of Dask.
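
To illustrate why a dedicated placeholder type helps, here is a minimal sketch that does not depend on Dask or deisa.ray. PlaceholderChunk is a hypothetical frozen dataclass mirroring the documented fields (the real ChunkRef may be defined differently), and the graph is a plain dict. Because each placeholder carries its actor_id, tasks can be grouped by the actor that must schedule them.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Hashable


@dataclass(frozen=True)
class PlaceholderChunk:
    """Hypothetical stand-in mirroring the documented ChunkRef fields."""

    actor_id: int
    array_name: str
    timestep: Hashable
    position: tuple
    bridge_id: int


# A toy task graph: each key maps to a placeholder, not to the data itself.
graph = {
    ("temperature", 0, 0): PlaceholderChunk(0, "temperature", 3, (0, 0), 10),
    ("temperature", 0, 1): PlaceholderChunk(1, "temperature", 3, (0, 1), 11),
}

# Group graph keys by the actor that owns the underlying chunk.
keys_by_actor = defaultdict(list)
for key, value in graph.items():
    if isinstance(value, PlaceholderChunk):
        keys_by_actor[value.actor_id].append(key)
```

The isinstance check is what a plain tuple cannot offer: a tuple is indistinguishable from any other tuple in the graph.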

actor_id: int
array_name: str
bridge_id: int
position: tuple[int, ...]
timestep: Hashable

class deisa.ray.types.DaskArrayData(name, f_preprocessing)[source]

Bases: object

Information about a Dask array being built.

Tracks metadata and per-timestep state for a Dask array assembled from chunks sent by scheduling actors.

Parameters:
  • name (str) – Array name registered with the head actor.

  • f_preprocessing (Callable) – Preprocessing callback applied to each chunk.

name

Array name without timestep suffix.

Type:

str

f_preprocessing

Preprocessing callback supplied at registration.

Type:

Callable

fully_defined

Set when every chunk owner has been registered.

Type:

asyncio.Event

nb_chunks_per_dim

Number of chunks per dimension. Set when the first chunk owner is registered.

Type:

tuple[int, ...] or None

nb_chunks

Total number of chunks in the array. Set when the first chunk owner is registered.

Type:

int or None

chunks_size

For each dimension, the size of chunks in that dimension. None values indicate unknown chunk sizes.

Type:

list[list[int | None]] or None

dtype

The numpy dtype of the array chunks. Set when the first chunk owner is registered.

Type:

np.dtype or None

position_to_node_actorID

Mapping from chunk position to the scheduling actor responsible for that chunk.

Type:

dict[tuple[int, ...], int]

position_to_bridgeID

Mapping from chunk position to the producing bridge ID.

Type:

dict[tuple, int]

nb_scheduling_actors

Number of unique scheduling actors owning chunks of this array. Set when all chunk owners are known.

Type:

int or None

chunk_refs

For each timestep, the list of per-actor references that keep chunk payloads alive in the object store.

Type:

dict[Timestep, list[ray.ObjectRef]]

pos_to_ref_by_timestep

For each timestep, the (position, ref) pairs provided by scheduling actors. Used when distributed scheduling is disabled.

Type:

defaultdict

add_chunk_ref(chunk_ref: ObjectRef, timestep: Hashable, pos_to_ref) → bool[source]

Add a reference sent by a scheduling actor.

Parameters:
  • chunk_ref (ray.ObjectRef) – Ray object reference to a chunk sent by a scheduling actor.

  • timestep (Timestep) – The timestep this chunk belongs to.

  • pos_to_ref (dict[tuple, ray.ObjectRef]) – Mapping of chunk position to the (double) Ray ObjectRef provided by the scheduling actor for this timestep.

Returns:

True if all chunks for this timestep are ready (i.e., all scheduling actors have sent their chunks), False otherwise.

Return type:

bool

Notes

This method adds the chunk reference to the list for the given timestep. It returns True only when all chunk owners are known and all have sent their chunks for this timestep.
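
The readiness rule in the note above can be sketched as follows. This is a simplified illustration, not the library's code: RefCollector is a hypothetical class, strings stand in for ray.ObjectRef values, and the pos_to_ref argument is omitted.

```python
from collections import defaultdict
from typing import Hashable, Optional


class RefCollector:
    """Hypothetical stand-in for the per-timestep reference bookkeeping."""

    def __init__(self) -> None:
        # Unknown until every chunk owner has been registered.
        self.nb_scheduling_actors: Optional[int] = None
        self.chunk_refs = defaultdict(list)

    def add_chunk_ref(self, chunk_ref: object, timestep: Hashable) -> bool:
        self.chunk_refs[timestep].append(chunk_ref)
        # Without the full set of owners, completeness cannot be decided.
        if self.nb_scheduling_actors is None:
            return False
        return len(self.chunk_refs[timestep]) == self.nb_scheduling_actors


collector = RefCollector()
first = collector.add_chunk_ref("ref-a", timestep=0)   # owners not yet known
collector.nb_scheduling_actors = 2                      # all owners registered
second = collector.add_chunk_ref("ref-b", timestep=0)  # second of two actors
```

Here first is False because the number of owners is still unknown, and second is True because both actors have now contributed a reference for timestep 0.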

get_full_array(timestep: Hashable, *, distributing_scheduling_enabled: bool, is_preparation: bool = False) → Array[source]

Return the full Dask array for a given timestep.

Parameters:
  • timestep (Timestep) – The timestep for which the full array should be returned.

  • distributing_scheduling_enabled (bool) – When True, emit a graph containing ChunkRef tasks for distributed scheduling. When False, materialise the actual chunk payloads and build a local Dask array.

  • is_preparation (bool, optional) – If True, the array will not contain ObjectRefs to the actual data. This is used for preparation arrays where only the structure is needed. Default is False.

Returns:

A Dask array representing the full decomposed array for the given timestep. The array is constructed from chunk references stored in Ray’s object store.

Return type:

da.Array

Raises:

AssertionError – If not all chunk owners have been registered, or if the number of chunks is inconsistent.

Notes

When distributed scheduling is enabled, the graph uses ChunkRef placeholders that carry data-ownership information. Otherwise the concrete chunk payloads are inlined. Chunk reference lists are deleted after being embedded in the graph to avoid leaking memory. Setting is_preparation skips storing payload refs entirely, so analytics can inspect shapes and chunks without materialising data.

update_meta(nb_chunks_per_dim: tuple[int, ...], dtype: dtype, position: tuple[int, ...], size: tuple[int, ...], node_actor_id: int, bridge_id: int) → None[source]

Register a scheduling actor as the owner of a chunk at a specific position.

This method records which scheduling actor is responsible for a chunk and updates the array metadata. If this is the first chunk registered, it initializes the array dimensions and dtype.

Parameters:
  • nb_chunks_per_dim (tuple[int, ...]) – Number of chunks per dimension in the array decomposition.

  • dtype (np.dtype) – The numpy dtype of the chunk.

  • position (tuple[int, ...]) – The position of the chunk in the array decomposition.

  • size (tuple[int, ...]) – The size of the chunk along each dimension.

  • node_actor_id (int) – Scheduling actor that owns this chunk.

  • bridge_id (int) – Bridge identifier that produced the chunk (used for lookups).

Raises:

AssertionError – If the chunk position is out of bounds, or if subsequent chunks have inconsistent dimensions, dtype, or sizes compared to the first chunk.
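
The registration logic described for update_meta can be sketched as below. This is a reduced illustration under assumptions: ArrayMeta is a hypothetical class, the dtype and size checks are left out, and fully_defined is modelled as a property rather than an asyncio.Event.

```python
from math import prod
from typing import Optional


class ArrayMeta:
    """Hypothetical, reduced model of the chunk-owner registration."""

    def __init__(self) -> None:
        self.nb_chunks_per_dim: Optional[tuple] = None
        self.nb_chunks: Optional[int] = None
        self.position_to_actor: dict = {}

    def update_meta(self, nb_chunks_per_dim, position, node_actor_id) -> None:
        if self.nb_chunks_per_dim is None:
            # First registration initialises the decomposition.
            self.nb_chunks_per_dim = nb_chunks_per_dim
            self.nb_chunks = prod(nb_chunks_per_dim)
        else:
            # Later registrations must agree with the first one.
            assert self.nb_chunks_per_dim == nb_chunks_per_dim
        # The position must lie inside the decomposition grid.
        assert all(0 <= p < n for p, n in zip(position, nb_chunks_per_dim))
        self.position_to_actor[position] = node_actor_id

    @property
    def fully_defined(self) -> bool:
        # Every chunk position has a registered owner.
        return (
            self.nb_chunks is not None
            and len(self.position_to_actor) == self.nb_chunks
        )


meta = ArrayMeta()
for pos, actor in [((0, 0), 0), ((0, 1), 0), ((1, 0), 1), ((1, 1), 1)]:
    meta.update_meta((2, 2), pos, actor)

# Number of distinct actors owning chunks, known once all owners are registered.
nb_scheduling_actors = len(set(meta.position_to_actor.values()))
```

With a 2 x 2 decomposition split between two actors, the array becomes fully defined after the fourth registration.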

class deisa.ray.types.GraphInfo[source]

Bases: object

Information about graphs and their scheduling.

Tracks scheduling status and produced references for a Dask task graph scheduled by a SchedulingActor.

scheduled_event

Event set once the graph has been submitted to Ray.

Type:

asyncio.Event

refs

Mapping from task key to the double Ray ObjectRef returned by the patched Dask-on-Ray scheduler.

Type:

dict[str, ray.ObjectRef]

class deisa.ray.types.PartialArray[source]

Bases: object

Internal class tracking metadata and chunks for an array.

Maintains metadata for a single array on one scheduling actor, including the set of locally owned chunks and per-timestep chunk collections.

ready_event

Set once all local bridges have provided metadata for the array and the head actor has been notified.

Type:

asyncio.Event

chunks_contained_meta

Metadata tuples (bridge_id, chunk_position, chunk_size) for the chunks owned by this actor.

Type:

set[tuple[int, tuple[int, ...], tuple[int, ...]]]

bid_to_pos

Maps bridge_id to the corresponding chunk position for quick lookup when forwarding payloads.

Type:

dict[int, tuple]

per_timestep_arrays

Per-timestep structures that hold chunk references as they arrive.

Type:

AsyncDict[Timestep, ArrayPerTimestep]

class deisa.ray.types.ScheduledByOtherActor(actor_id: ActorID)[source]

Bases: object

Represents a task that is scheduled by another actor.

This class is used as a placeholder in Dask task graphs to indicate that a task should be scheduled by a different actor. When a task graph is sent to an actor, tasks marked with this class will be delegated to the specified actor.

Parameters:

actor_id (ActorID) – The ID of the scheduling actor that should schedule this task.

Notes

This is used to handle cross-actor task dependencies in distributed Dask computations where different parts of the task graph are handled by different scheduling actors.
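
The delegation idea can be sketched without Ray or Dask. Everything here is hypothetical: DelegatedTask stands in for the placeholder class, localize is an invented helper, and the owners mapping is assumed to be known in advance. The sketch only shows how one actor's view of a graph could keep its own tasks and mark the rest for delegation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DelegatedTask:
    """Hypothetical placeholder naming the actor that schedules the task."""

    actor_id: int


def localize(graph: dict, owners: dict, my_actor_id: int) -> dict:
    """Keep locally owned tasks; replace the others with placeholders."""
    return {
        key: task if owners[key] == my_actor_id else DelegatedTask(owners[key])
        for key, task in graph.items()
    }


# Toy graph in the classic dict form: key -> task description.
graph = {"a": "load-a", "b": "load-b", "sum": ("add", "a", "b")}
owners = {"a": 0, "b": 1, "sum": 0}

local_view = localize(graph, owners, my_actor_id=0)
```

In the view for actor 0, the task "b" becomes DelegatedTask(actor_id=1), while "a" and "sum" are kept unchanged.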

actor_id: ActorID

class deisa.ray.types.WindowArrayDefinition(name: str, window_size: int | None = None, preprocess: Callable = <function WindowArrayDefinition.<lambda>>)[source]

Bases: object

Description of an array with optional windowing support.

Parameters:
  • name (str) – The name of the array.

  • window_size (int or None, optional) – If specified, creates a sliding window of arrays for this array name. The window will contain the last window_size timesteps. If None, only the current timestep array is provided. Default is None.

  • preprocess (Callable, optional) – A preprocessing function to apply to chunks of this array before they are sent to the analytics. The function should take a numpy array and return a processed numpy array. Default is the identity function (no preprocessing).

Examples

>>> from deisa.ray.types import WindowArrayDefinition
>>> def normalize(arr):
...     return arr / arr.max()
>>> # Array with windowing: last 5 timesteps
>>> array_def = WindowArrayDefinition(name="temperature", window_size=5, preprocess=normalize)
>>> # Array without windowing: current timestep only
>>> array_def = WindowArrayDefinition(name="pressure", window_size=None)
name: str
preprocess()
window_size: int | None = None
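
The windowing semantics of window_size can be illustrated with a bounded deque. This is a sketch of the behaviour described above, not of how deisa.ray stores windows: make_window is a hypothetical helper, and strings stand in for the per-timestep arrays.

```python
from collections import deque
from typing import Optional


def make_window(window_size: Optional[int]) -> deque:
    """Hypothetical helper: keep the last window_size items, or only the latest."""
    return deque(maxlen=window_size if window_size is not None else 1)


# With window_size=3, only the three most recent timesteps are retained.
window = make_window(3)
for timestep in range(5):
    window.append(f"array@t={timestep}")

# With window_size=None, only the current timestep is retained.
latest_only = make_window(None)
for timestep in range(5):
    latest_only.append(f"array@t={timestep}")
```

After five timesteps, window holds the arrays for t=2, t=3 and t=4, and latest_only holds only the array for t=4.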