deisa.ray.types module
- type deisa.ray.types.ActorID = str
- class deisa.ray.types.ArrayPerTimestep
Bases: object
Internal class tracking chunks for a specific array and timestep.
Tracks the per-timestep chunks owned by a single scheduling actor. Each instance is keyed by timestep inside a PartialArray.
- chunks_ready_event
Set when all chunks for this timestep that are owned by the node actor have arrived and have been forwarded to the head actor.
- Type:
asyncio.Event
- local_chunks
Mapping of bridge_id to the double ObjectRef for that chunk. Once the chunk has been forwarded to the head actor, the value is replaced with pickled bytes to free memory.
- Type:
AsyncDict[int, ray.ObjectRef | bytes]
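The life cycle of one ArrayPerTimestep can be sketched with plain asyncio. This is a hypothetical, minimal sketch: a regular dict replaces AsyncDict, strings stand in for Ray ObjectRefs, and PerTimestepSketch, add_local_chunk, and the forward callback (standing in for the call to the head actor) are illustrative names, not part of deisa.ray.

```python
import asyncio
import pickle


class PerTimestepSketch:
    """Hypothetical stand-in for ArrayPerTimestep: plain dict, fake refs."""

    def __init__(self, expected_bridge_ids):
        # Bridge IDs whose chunks this node actor owns for this timestep.
        self.expected = set(expected_bridge_ids)
        self.local_chunks = {}  # bridge_id -> ref, later pickled bytes
        self.chunks_ready_event = asyncio.Event()

    async def add_local_chunk(self, bridge_id, ref, forward):
        self.local_chunks[bridge_id] = ref
        if set(self.local_chunks) != self.expected:
            return  # still waiting for other local chunks
        # All local chunks arrived: hand them to forward(), a hypothetical
        # stand-in for sending them to the head actor.
        await forward(dict(self.local_chunks))
        # Then keep only pickled bytes, as the docs describe, to free memory.
        for bid, r in list(self.local_chunks.items()):
            self.local_chunks[bid] = pickle.dumps(r)
        self.chunks_ready_event.set()


async def main():
    forwarded = []

    async def forward(chunks):
        forwarded.append(chunks)

    step = PerTimestepSketch({0, 1})
    await step.add_local_chunk(0, "ref-0", forward)
    await step.add_local_chunk(1, "ref-1", forward)
    await step.chunks_ready_event.wait()
    return step, forwarded


step, forwarded = asyncio.run(main())
```

Waiters on chunks_ready_event are released only after forwarding has completed, which matches the documented meaning of the event.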
- class deisa.ray.types.ChunkRef(ref: ObjectRef, actorid: int, array_name: str, timestep: Hashable, bridge_id: int)
Bases: object
Represents a chunk of an array in a Dask task graph.
This class is used as a placeholder in Dask task graphs to represent a chunk of data. The task corresponding to this object must be scheduled by the actor that holds the actual data. A dedicated class is used because Dask tends to inline simple tuples, which would prevent proper scheduling.
- Parameters:
ref (ray.ObjectRef) – Ray ObjectRef that eventually points to the chunk data. This is a ref of a ref produced by the patched Dask scheduler.
actorid (int) – The ID of the scheduling actor that owns this chunk.
array_name (str) – The real name of the array, without the timestep suffix.
timestep (Timestep) – The timestep this chunk belongs to.
bridge_id (int) – Identifier of the bridge that produced this chunk.
Notes
This class is used to prevent Dask from inlining simple tuples in the task graph, which would break the scheduling mechanism. The behavior may change in newer versions of Dask.
- actorid: int
- array_name: str
- bridge_id: int
- ref: ObjectRef
- timestep: Hashable
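Because the owning actor travels with each placeholder, a scheduler can route every chunk task to the actor that holds the data. The sketch below is hypothetical: ChunkRefSketch mirrors ChunkRef's documented fields with a string in place of the Ray ObjectRef, and keys_by_owner is an illustrative helper, not a deisa.ray function.

```python
from collections import defaultdict
from collections.abc import Hashable
from dataclasses import dataclass
from typing import Any


@dataclass
class ChunkRefSketch:
    """Hypothetical mirror of ChunkRef's fields; ref is a plain string here."""
    ref: Any
    actorid: int
    array_name: str
    timestep: Hashable
    bridge_id: int


def keys_by_owner(graph):
    # Group graph keys holding a chunk placeholder by the actor that owns
    # the data and therefore must schedule the corresponding task.
    owners = defaultdict(list)
    for key, value in graph.items():
        if isinstance(value, ChunkRefSketch):
            owners[value.actorid].append(key)
    return dict(owners)


graph = {
    "temp-0": ChunkRefSketch("ref-a", actorid=0, array_name="temp", timestep=3, bridge_id=0),
    "temp-1": ChunkRefSketch("ref-b", actorid=1, array_name="temp", timestep=3, bridge_id=1),
    "temp-2": ChunkRefSketch("ref-c", actorid=0, array_name="temp", timestep=3, bridge_id=2),
    # An ordinary tuple task (callable first), which Dask treats as a task.
    "mean": (sum, ["temp-0", "temp-1", "temp-2"]),
}
owners = keys_by_owner(graph)
# owners == {0: ["temp-0", "temp-2"], 1: ["temp-1"]}
```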
- class deisa.ray.types.DaskArrayData(name)
Bases: object
Information about a Dask array being built.
Tracks metadata and per-timestep state for a Dask array assembled from chunks sent by scheduling actors.
- Parameters:
name (str) – Array name registered with the head actor.
- name
Array name without the timestep suffix.
- Type:
str
- fully_defined
Set when every chunk owner has been registered.
- Type:
asyncio.Event
- nb_chunks_per_dim
Number of chunks per dimension. Set when the first chunk owner is registered.
- Type:
tuple[int, …] or None
- nb_chunks
Total number of chunks in the array. Set when the first chunk owner is registered.
- Type:
int or None
- chunks_size
For each dimension, the size of each chunk along that dimension. None values indicate chunk sizes that are not yet known.
- Type:
list[list[int | None]] or None
- dtype
The NumPy dtype of the array chunks. Set when the first chunk owner is registered.
- Type:
np.dtype or None
- position_to_node_actorID
Mapping from chunk position to the scheduling actor responsible for that chunk.
- Type:
dict[tuple[int, …], int]
- position_to_bridgeID
Mapping from chunk position to the producing bridge ID.
- Type:
dict[tuple, int]
- nb_scheduling_actors
Number of unique scheduling actors owning chunks of this array. Set when all chunk owners are known.
- Type:
int or None
- chunk_refs
For each timestep, the list of per-actor references that keep chunk payloads alive in the object store.
- Type:
dict[Timestep, list[ray.ObjectRef]]
- pos_to_ref_by_timestep
For each timestep, the (position, ref) pairs provided by scheduling actors. Used when distributed scheduling is disabled.
- Type:
defaultdict[Timestep, list[tuple[tuple, ray.ObjectRef]]]
- add_chunk_ref(chunk_ref: ObjectRef, timestep: Hashable, pos_to_ref: dict[tuple, ObjectRef]) → bool
Add a reference sent by a scheduling actor.
- Parameters:
chunk_ref (ray.ObjectRef) – Ray object reference to a chunk sent by a scheduling actor.
timestep (Timestep) – The timestep this chunk belongs to.
pos_to_ref (dict[tuple, ray.ObjectRef]) – Mapping of chunk position to the (double) Ray ObjectRef provided by the scheduling actor for this timestep.
- Returns:
True if all chunks for this timestep are ready (i.e., all scheduling actors have sent their chunks), False otherwise.
- Return type:
bool
Notes
This method adds the chunk reference to the list for the given timestep. It returns True only when all chunk owners are known and all have sent their chunks for this timestep.
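The readiness rule in the Notes can be sketched as a counter per timestep. This is a hypothetical sketch: ChunkRefTracker is an illustrative name, strings stand in for Ray ObjectRefs, and "all chunk owners are known" is approximated by nb_scheduling_actors having been set, since that attribute is documented as set once all owners are known.

```python
from collections import defaultdict


class ChunkRefTracker:
    """Hypothetical sketch of add_chunk_ref's bookkeeping; refs are strings."""

    def __init__(self):
        # Set elsewhere once all chunk owners are known (None until then).
        self.nb_scheduling_actors = None
        self.chunk_refs = defaultdict(list)  # timestep -> one ref per actor
        self.pos_to_ref_by_timestep = defaultdict(list)  # timestep -> [(pos, ref)]

    def add_chunk_ref(self, chunk_ref, timestep, pos_to_ref) -> bool:
        self.chunk_refs[timestep].append(chunk_ref)
        self.pos_to_ref_by_timestep[timestep].extend(pos_to_ref.items())
        # Ready only when the owner count is known and every owner has reported.
        return (
            self.nb_scheduling_actors is not None
            and len(self.chunk_refs[timestep]) == self.nb_scheduling_actors
        )


tracker = ChunkRefTracker()
tracker.nb_scheduling_actors = 2
first = tracker.add_chunk_ref("ref-a", 0, {(0,): "a0"})
second = tracker.add_chunk_ref("ref-b", 0, {(1,): "b0"})
```

Here the first call returns False and the second returns True, because two scheduling actors own chunks and both have now reported for timestep 0.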
- chunk_refs: dict[Hashable, list[ObjectRef]]
- chunks_size: list[list[int | None]] | None
- dtype: dtype | None
- fully_defined: Event
- get_full_array(timestep: Hashable, *, distributing_scheduling_enabled: bool) → Array
Return the full Dask array for a given timestep.
- Parameters:
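timestep (Timestep) – The timestep for which the full array should be returned.
distributing_scheduling_enabled (bool) – Whether distributed scheduling is enabled. Determines whether the graph uses ChunkRef placeholders or inlined chunk payloads (see Notes).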
- Returns:
A Dask array representing the full decomposed array for the given timestep. The array is constructed from chunk references stored in Ray’s object store.
- Return type:
da.Array
- Raises:
AssertionError – If not all chunk owners have been registered, or if the number of chunks is inconsistent.
Notes
When distributed scheduling is enabled, the graph uses ChunkRef placeholders that record which scheduling actor owns each chunk. Otherwise, the concrete chunk payloads are inlined. Chunk reference lists are deleted after being embedded in the graph to avoid leaking memory.
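The two modes in the Notes can be contrasted with a small grid builder. This is a hypothetical sketch, assuming the chunk grid is materialized as nested lists indexed by chunk position; OwnedChunk and build_block_grid are illustrative names, and the real method builds a Dask graph from Ray references, which this sketch does not attempt.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class OwnedChunk:
    """Hypothetical placeholder recording which actor owns a chunk."""
    actorid: int
    position: tuple


def build_block_grid(nb_chunks_per_dim, position_to_node_actorID, pos_to_payload,
                     *, distributing_scheduling_enabled):
    positions = set(itertools.product(*(range(n) for n in nb_chunks_per_dim)))
    assert positions == set(pos_to_payload), "inconsistent number of chunks"

    def leaf(pos):
        if distributing_scheduling_enabled:
            # Keep owner information so the owning actor schedules the task.
            return OwnedChunk(position_to_node_actorID[pos], pos)
        # Otherwise inline the concrete payload for this position.
        return pos_to_payload[pos]

    def nest(prefix):
        if len(prefix) == len(nb_chunks_per_dim):
            return leaf(prefix)
        return [nest(prefix + (i,)) for i in range(nb_chunks_per_dim[len(prefix)])]

    return nest(())


owners = {(i, j): i for i in range(2) for j in range(3)}
payloads = {pos: f"chunk{pos}" for pos in owners}
inlined = build_block_grid((2, 3), owners, payloads, distributing_scheduling_enabled=False)
placeholders = build_block_grid((2, 3), owners, payloads, distributing_scheduling_enabled=True)
```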
- nb_chunks: int | None
- nb_chunks_per_dim: tuple[int, ...] | None
- nb_scheduling_actors: int | None
- pos_to_ref_by_timestep: dict[Hashable, list[tuple[tuple, ObjectRef]]]
- position_to_bridgeID: dict[tuple, int]
- position_to_node_actorID: dict[tuple[int, ...], int]
- update_meta(nb_chunks_per_dim: tuple[int, ...], dtype: dtype, position: tuple[int, ...], size: tuple[int, ...], node_actor_id: int, bridge_id: int) → None
Register a scheduling actor as the owner of a chunk at a specific position.
This method records which scheduling actor is responsible for a chunk and updates the array metadata. If this is the first chunk registered, it initializes the array dimensions and dtype.
- Parameters:
nb_chunks_per_dim (tuple[int, ...]) – Number of chunks per dimension in the array decomposition.
dtype (np.dtype) – The numpy dtype of the chunk.
position (tuple[int, ...]) – The position of the chunk in the array decomposition.
size (tuple[int, ...]) – The size of the chunk along each dimension.
node_actor_id (int) – Scheduling actor that owns this chunk.
bridge_id (int) – Bridge identifier that produced the chunk (used for lookups).
- Raises:
AssertionError – If the chunk position is out of bounds, or if subsequent chunks have inconsistent dimensions, dtype, or sizes compared to the first chunk.
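The registration and consistency checks described for update_meta can be sketched as follows. This is a hypothetical sketch: MetaSketch is an illustrative name, dtype is a plain string instead of np.dtype, and the exact order of checks in the real method may differ.

```python
import math


class MetaSketch:
    """Hypothetical sketch of update_meta's checks; dtype is a plain string."""

    def __init__(self):
        self.nb_chunks_per_dim = None
        self.nb_chunks = None
        self.dtype = None
        self.chunks_size = None  # per dimension: size of each chunk, None if unknown
        self.position_to_node_actorID = {}
        self.position_to_bridgeID = {}

    def update_meta(self, nb_chunks_per_dim, dtype, position, size,
                    node_actor_id, bridge_id):
        if self.nb_chunks_per_dim is None:
            # First chunk registered: initialize dimensions, dtype and sizes.
            self.nb_chunks_per_dim = tuple(nb_chunks_per_dim)
            self.nb_chunks = math.prod(nb_chunks_per_dim)
            self.dtype = dtype
            self.chunks_size = [[None] * n for n in nb_chunks_per_dim]
        else:
            assert tuple(nb_chunks_per_dim) == self.nb_chunks_per_dim, "dimensions differ"
            assert dtype == self.dtype, "dtype differs"
        assert len(position) == len(size) == len(self.nb_chunks_per_dim)
        for d, (p, s) in enumerate(zip(position, size)):
            assert 0 <= p < self.nb_chunks_per_dim[d], "position out of bounds"
            known = self.chunks_size[d][p]
            assert known is None or known == s, "chunk size differs"
            self.chunks_size[d][p] = s
        self.position_to_node_actorID[tuple(position)] = node_actor_id
        self.position_to_bridgeID[tuple(position)] = bridge_id


meta = MetaSketch()
meta.update_meta((2, 2), "float64", (0, 0), (10, 5), node_actor_id=0, bridge_id=0)
meta.update_meta((2, 2), "float64", (1, 0), (8, 5), node_actor_id=1, bridge_id=1)
```

After these two calls, chunks_size is [[10, 8], [5, None]]: the second column of chunks has not been registered yet, so its size along dimension 1 is still unknown.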
- class deisa.ray.types.DeisaArray(dask: dask.array.core.Array, t: int)
Bases: object
- dask: Array
- t: int
- type deisa.ray.types.DoubleRef = ObjectRef
- class deisa.ray.types.GraphInfo
Bases: object
Information about graphs and their scheduling.
Tracks scheduling status and produced references for a Dask task graph scheduled by a SchedulingActor.
- scheduled_event
Event set once the graph has been submitted to Ray.
- Type:
asyncio.Event
- refs¶
Mapping from task key to the double Ray ObjectRef returned by the patched Dask-on-Ray scheduler.
- Type:
dict[str, ray.ObjectRef]
- refs: dict[str, ObjectRef]
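The scheduled_event and refs pair supports a simple handshake: readers wait until the graph has been submitted before looking up a task's reference. This is a hypothetical sketch; GraphInfoSketch, submit, and get_ref are illustrative names, and the "submission" just records fake string refs instead of calling Ray.

```python
import asyncio


class GraphInfoSketch:
    """Hypothetical stand-in for GraphInfo; refs map task keys to fake refs."""

    def __init__(self):
        self.scheduled_event = asyncio.Event()
        self.refs = {}


async def submit(info, graph):
    # Pretend to submit every task and record one (double) ref per key.
    for key in graph:
        info.refs[key] = f"ref-for-{key}"
    info.scheduled_event.set()


async def get_ref(info, key):
    # Readers block until the graph has been submitted.
    await info.scheduled_event.wait()
    return info.refs[key]


async def main():
    info = GraphInfoSketch()
    reader = asyncio.create_task(get_ref(info, "sum-0"))
    await submit(info, {"sum-0": None, "mean-0": None})
    return await reader


result = asyncio.run(main())
# result == "ref-for-sum-0"
```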
- type deisa.ray.types.GraphKey = Any
- type deisa.ray.types.GraphValue = Any
- class deisa.ray.types.PartialArray
Bases: object
Internal class tracking metadata and chunks for an array.
Maintains metadata for a single array on one scheduling actor, including the set of locally owned chunks and per-timestep chunk collections.
- ready_event
Set once all local bridges have provided metadata for the array and the head actor has been notified.
- Type:
asyncio.Event
- chunks_contained_meta
Metadata tuples (bridge_id, chunk_position, chunk_size) for the chunks owned by this actor.
- Type:
set[tuple[int, tuple[int, …], tuple[int, …]]]
- bid_to_pos
Maps bridge_id to the corresponding chunk position for quick lookup when forwarding payloads.
- Type:
dict[int, tuple]
- per_timestep_arrays
Per-timestep structures that hold chunk references as they arrive.
- Type:
AsyncDict[Timestep, ArrayPerTimestep]
- bid_to_pos: dict[int, tuple]
- chunks_contained_meta: set[tuple[int, tuple[int, ...], tuple[int, ...]]]
- per_timestep_arrays: AsyncDict[Hashable, ArrayPerTimestep]
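How chunks_contained_meta, bid_to_pos, and ready_event fit together can be sketched as follows. This is a hypothetical sketch: PartialArraySketch, add_meta, and the notify_head callback (standing in for notifying the head actor) are illustrative names, and it assumes the actor knows up front which local bridges will report.

```python
import asyncio


class PartialArraySketch:
    """Hypothetical stand-in for PartialArray's metadata bookkeeping."""

    def __init__(self, local_bridge_ids):
        self.expected = set(local_bridge_ids)  # bridges feeding this actor
        self.chunks_contained_meta = set()     # (bridge_id, position, size)
        self.bid_to_pos = {}                   # bridge_id -> position
        self.ready_event = asyncio.Event()

    async def add_meta(self, bridge_id, position, size, notify_head):
        self.chunks_contained_meta.add((bridge_id, tuple(position), tuple(size)))
        self.bid_to_pos[bridge_id] = tuple(position)
        if set(self.bid_to_pos) == self.expected and not self.ready_event.is_set():
            # Every local bridge has reported: tell the head actor, then signal.
            await notify_head(frozenset(self.chunks_contained_meta))
            self.ready_event.set()


async def main():
    notified = []

    async def notify_head(meta):
        notified.append(meta)

    arr = PartialArraySketch({0, 1})
    await arr.add_meta(0, (0, 0), (10, 5), notify_head)
    await arr.add_meta(1, (1, 0), (8, 5), notify_head)
    return arr, notified


arr, notified = asyncio.run(main())
```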
- class deisa.ray.types.ScheduledByOtherActor(actor_id: ActorID)
Bases: object
Represents a task that is scheduled by another actor.
This class is used as a placeholder in Dask task graphs to indicate that a task should be scheduled by a different actor. When a task graph is sent to an actor, tasks marked with this class will be delegated to the specified actor.
- Parameters:
actor_id (int) – The ID of the scheduling actor that should schedule this task.
Notes
This is used to handle cross-actor task dependencies in distributed Dask computations where different parts of the task graph are handled by different scheduling actors.
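Delegating foreign tasks can be sketched as building a per-actor view of the graph in which tasks owned elsewhere are swapped for placeholders. This is a hypothetical sketch: ScheduledByOtherActorSketch and local_view are illustrative names, strings stand in for task definitions, and actor IDs are ints as in the parameter description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScheduledByOtherActorSketch:
    """Hypothetical mirror of ScheduledByOtherActor."""
    actor_id: int


def local_view(graph, owner_of, my_actor_id):
    # Keep the tasks this actor schedules; replace every other task with a
    # placeholder naming the actor responsible for it.
    return {
        key: task if owner_of[key] == my_actor_id else ScheduledByOtherActorSketch(owner_of[key])
        for key, task in graph.items()
    }


graph = {"a": "task-a", "b": "task-b", "c": "task-c"}
owner_of = {"a": 0, "b": 1, "c": 0}
view = local_view(graph, owner_of, my_actor_id=0)
# view["b"] == ScheduledByOtherActorSketch(actor_id=1)
```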
- class deisa.ray.types.WindowSpec(name: str, window_size: int | None = None)
Bases: object
Description of an array with optional windowing support.
- Parameters:
name (str) – The name of the array.
window_size (int or None, optional) – If specified, creates a sliding window of arrays for this array name. The window will contain the last window_size timesteps. If None, only the current timestep array is provided. Default is None.
Examples
>>> from deisa.ray.types import WindowSpec
>>> def normalize(arr):
...     return arr / arr.max()
>>> # Array with windowing: last 5 timesteps
>>> array_def = WindowSpec(name="temperature", window_size=5)
>>> # Array without windowing: current timestep only
>>> array_def = WindowSpec(name="pressure", window_size=None)
- name: str
- window_size: int | None = None
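The windowing semantics (keep the last window_size timesteps, or only the current one when window_size is None) map naturally onto a bounded deque. This is a hypothetical sketch: WindowSpecSketch mirrors the documented fields, WindowBuffer is an illustrative helper that is not part of deisa.ray, and integers stand in for arrays.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass
class WindowSpecSketch:
    """Hypothetical mirror of WindowSpec's documented fields."""
    name: str
    window_size: int | None = None


class WindowBuffer:
    """Hypothetical helper that keeps the arrays a window spec asks for."""

    def __init__(self, spec):
        self.spec = spec
        # None means "current timestep only", i.e. a buffer of length one.
        self._items = deque(maxlen=spec.window_size or 1)

    def push(self, array):
        self._items.append(array)  # deque drops the oldest item when full
        if self.spec.window_size is None:
            return self._items[-1]
        return list(self._items)


temperature = WindowBuffer(WindowSpecSketch("temperature", window_size=3))
for t in range(5):
    latest_window = temperature.push(t)  # integers stand in for arrays

pressure = WindowBuffer(WindowSpecSketch("pressure"))
pressure.push("p0")
current = pressure.push("p1")
```

After five timesteps, the temperature window holds only the last three values, while the unwindowed pressure buffer returns just the current value.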