deisa.ray.scheduling_actor module

class deisa.ray.scheduling_actor.NodeActorBase(actor_id: int, arrays_metadata: Dict[str, Dict] = {})

Bases: object

Actor responsible for gathering chunks and exchanging data with analytics.

Each node actor is associated with a specific node and is responsible for:

  • Collecting chunks of arrays sent by simulation nodes (via Bridge)

  • Registering its owned chunks with the head node

  • Providing a small key/value channel (set, get, delete) for non-chunked feedback between analytics and simulation.

The SchedulingActor subclass adds graph-scheduling behaviour on top of this base functionality.

Parameters:

actor_id (int) – Unique identifier for this node actor, typically derived from the node ID.

actor_id

The unique identifier for this node actor.

Type:

int

actor_handle

Handle to this actor instance.

Type:

RayActorHandle

head

Handle to the head node (HeadNodeActor).

Type:

RayActorHandle

partial_arrays

Per-array containers that capture metadata and chunk references owned by this node actor.

Type:

AsyncDict[str, PartialArray]

feedback_non_chunked

Dictionary for storing non-chunked feedback values shared between analytics and simulation.

Type:

dict

async add_chunk(bridge_id: int, array_name: str, chunk_ref: list[ObjectRef], timestep: int, chunked: bool = True, *args, _ray_trace_ctx=None, **kwargs) -> None

Add a chunk of data to this node actor.

This method is called by Bridge instances to send chunks of arrays to this node actor. Once all of this node's chunks for a timestep have been received, the actor forwards a position->double-ref mapping to the head node.

Parameters:
  • bridge_id (int) – Identifier of the bridge that owns this chunk.

  • array_name (str) – Name of the array receiving the chunk.

  • chunk_ref (list[ray.ObjectRef]) – Single-element list containing the Ray ObjectRef to the chunk data. The extra list level is kept for Dask compatibility.

  • timestep (int) – Timestep index the chunk belongs to.

  • chunked (bool, optional) – Reserved for future multi-chunk sends. Must remain True for the current workflow. Default is True.

Return type:

None

Raises:

ContractError – If the array has not been registered via register_chunk_meta() before chunks are added.

Notes

This method manages chunk collection and coordination:

  1. Wraps the single chunk ref in a double ref by calling _pack_object_ref() remotely.

  2. Stores the double ref in the per-timestep structure.

  3. When all local chunks have arrived, builds a {chunk_position: double_ref} mapping and sends it to the head actor via HeadNodeActor.chunks_ready().

  4. Pickles stored refs to drop in-memory handles and free memory.

  5. Signals or waits on the per-timestep event so callers block until the node’s share of chunks for the timestep is complete.
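The coordination described in these notes can be illustrated with a minimal sketch that uses plain asyncio instead of Ray. It is not the library's implementation: ChunkCollectorSketch, head_inbox, the local ContractError class, and the use of strings in place of ObjectRefs are all assumptions made for illustration, and a one-element list stands in for the double ref.

```python
import asyncio
import pickle


class ContractError(Exception):
    """Local stand-in for the library's ContractError (assumption)."""


class ChunkCollectorSketch:
    """Hypothetical per-timestep bookkeeping for a single array."""

    def __init__(self, nb_chunks_of_node, positions, head_inbox):
        self.nb_chunks_of_node = nb_chunks_of_node
        self.positions = positions    # bridge_id -> chunk position (registered metadata)
        self.head_inbox = head_inbox  # list standing in for HeadNodeActor.chunks_ready()
        self.refs = {}                # timestep -> {position: double_ref}
        self.events = {}              # timestep -> asyncio.Event

    async def add_chunk(self, bridge_id, chunk_ref, timestep):
        if bridge_id not in self.positions:
            raise ContractError("metadata must be registered before chunks are added")
        event = self.events.setdefault(timestep, asyncio.Event())
        per_step = self.refs.setdefault(timestep, {})
        # Wrap the single ref (a list stands in for the double ref) and store it.
        per_step[self.positions[bridge_id]] = [chunk_ref[0]]
        if len(per_step) == self.nb_chunks_of_node:
            # Last local chunk: forward the position -> double-ref mapping.
            self.head_inbox.append((timestep, dict(per_step)))
            # Keep only a pickled copy so the live handles can be released.
            self.refs[timestep] = pickle.dumps(per_step)
            # Wake every caller waiting on this timestep.
            event.set()
        else:
            # Earlier callers block until the node's share is complete.
            await event.wait()


async def demo():
    head_inbox = []
    node = ChunkCollectorSketch(2, {0: (0, 0), 1: (0, 1)}, head_inbox)
    await asyncio.gather(
        node.add_chunk(0, ["ref-a"], timestep=0),
        node.add_chunk(1, ["ref-b"], timestep=0),
    )
    return head_inbox


def run_demo():
    return asyncio.run(demo())
```

Calling run_demo() shows that neither add_chunk call returns until both chunks of the timestep have arrived, and that the stand-in head receives exactly one mapping per timestep.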

delete(*args, key: Hashable, _ray_trace_ctx=None, **kwargs) -> None

Delete a feedback value if present.

Parameters:

key (Hashable) – Identifier to remove from the non-chunked feedback store.

Notes

Missing keys are ignored to keep the call idempotent.

get(key, default=None, chunked=False, *, _ray_trace_ctx=None) -> Any

Retrieve a feedback value previously stored with set().

Parameters:
  • key (Hashable) – Identifier of the requested value.

  • default (Any, optional) – Value returned when key is not present. Default is None.

  • chunked (bool, optional) – Placeholder for chunked feedback retrieval. Must remain False today. Default is False.

Returns:

Stored value or default if missing.

Return type:

Any

preprocessing_callbacks(*, _ray_trace_ctx=None) -> ObjectRef

Get the preprocessing callbacks for all arrays.

Returns:

ObjectRef to a dictionary mapping array names to their preprocessing callback functions.

Return type:

ray.ObjectRef

Notes

This method returns an ObjectRef rather than the actual dictionary to avoid blocking. The callbacks are retrieved from the head node and used by Bridge instances to preprocess chunks before sending them to this node actor.
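As a rough illustration of how a bridge might use the resolved dictionary, the sketch below applies a per-array callback to a chunk before it is sent. The helper apply_preprocessing, the example callbacks, and the identity fallback for arrays without a callback are assumptions, not part of the documented API. In a real Ray program the dictionary would first be obtained by resolving the returned ObjectRef (for example with ray.get).

```python
def apply_preprocessing(callbacks, array_name, chunk):
    """Apply the callback registered for array_name.

    Falls back to returning the chunk unchanged when no callback is
    registered (an assumption made for this sketch).
    """
    callback = callbacks.get(array_name, lambda c: c)
    return callback(chunk)


# Hypothetical dictionary of the shape described above: array name -> callable.
callbacks = {"temperature": lambda chunk: [2 * v for v in chunk]}

doubled = apply_preprocessing(callbacks, "temperature", [1, 2])
untouched = apply_preprocessing(callbacks, "pressure", [1, 2])
```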

ready(*, _ray_trace_ctx=None) -> None

Check if the node actor is ready.

Returns:

Always returns None. This method serves as a readiness check for the actor.

Return type:

None

Notes

This method can be called to verify that the actor has been successfully initialized and is ready to receive requests. It is used by get_ready_actor_with_retry to ensure the actor is operational before returning its handle.
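The readiness-check pattern can be sketched without Ray as a retry loop around a cheap no-op call. This is not the source of get_ready_actor_with_retry: the function name get_ready_with_retry, its parameters, and the FlakyActor stand-in are hypothetical. With a real Ray actor the probe would typically be ray.get(actor.ready.remote()).

```python
import time


def get_ready_with_retry(make_actor, max_retries=3, delay=0.0):
    """Create an actor and confirm it answers ready(), retrying on failure."""
    last_error = None
    for _ in range(max_retries):
        try:
            actor = make_actor()
            actor.ready()  # returns None on success, raises if the actor is unusable
            return actor
        except RuntimeError as exc:
            last_error = exc
            time.sleep(delay)
    raise RuntimeError(f"actor not ready after {max_retries} attempts") from last_error


class FlakyActor:
    """Hypothetical actor whose creation fails on the first two attempts."""

    attempts = 0

    def __init__(self):
        type(self).attempts += 1
        if type(self).attempts < 3:
            raise RuntimeError("actor creation failed")

    def ready(self):
        return None
```

Because ready() does no work, a successful return only tells the caller that the actor was constructed and can process requests, which is all the retry loop needs to know.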

async register_chunk_meta(bridge_id: int, array_name: str, chunk_shape, nb_chunks_per_dim, nb_chunks_of_node: int, dtype, chunk_position, *, _ray_trace_ctx=None) -> None

Register chunk metadata contributed by a bridge on this node.

Parameters:
  • bridge_id (int) – Identifier of the bridge registering the chunk metadata.

  • array_name (str) – Name of the array being populated.

  • chunk_shape (tuple[int, ...]) – Shape of the chunk owned by this bridge.

  • nb_chunks_per_dim (tuple[int, ...]) – Number of chunks per dimension in the global decomposition.

  • nb_chunks_of_node (int) – Number of chunks contributed by this node for the array.

  • dtype (Any) – NumPy dtype of the chunk.

  • chunk_position (tuple[int, ...]) – Position of this chunk in the global grid.

Notes

Once all bridges on the node have registered, the node actor forwards the consolidated metadata to the head actor and toggles the per-array event so concurrent callers can proceed. Until that point, additional callers block on ready_event.
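The blocking behaviour described here is a barrier built on an event. A minimal asyncio sketch follows. MetaBarrierSketch and head_inbox are hypothetical names, only bridge_id and chunk_position are tracked, and the sketch assumes each bridge contributes exactly one chunk so that nb_chunks_of_node equals the number of bridges.

```python
import asyncio


class MetaBarrierSketch:
    """Hypothetical registration barrier for a single array."""

    def __init__(self, nb_chunks_of_node, head_inbox):
        self.nb_chunks_of_node = nb_chunks_of_node
        self.head_inbox = head_inbox  # list standing in for the head actor
        self.positions = {}           # bridge_id -> chunk_position
        self.ready_event = asyncio.Event()

    async def register_chunk_meta(self, bridge_id, chunk_position):
        self.positions[bridge_id] = chunk_position
        if len(self.positions) == self.nb_chunks_of_node:
            # Last bridge: forward the consolidated metadata once, then release waiters.
            self.head_inbox.append(dict(self.positions))
            self.ready_event.set()
        else:
            # Earlier bridges wait until the node's metadata is complete.
            await self.ready_event.wait()


async def registration_demo():
    head_inbox = []
    # Built inside the coroutine so the event belongs to the running loop.
    barrier = MetaBarrierSketch(2, head_inbox)
    await asyncio.gather(
        barrier.register_chunk_meta(0, (0, 0)),
        barrier.register_chunk_meta(1, (0, 1)),
    )
    return head_inbox


def run_registration_demo():
    return asyncio.run(registration_demo())
```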

set(*args, key: Hashable, value: Any, chunked: bool = False, _ray_trace_ctx=None, **kwargs) -> None

Store a feedback value shared between analytics and simulation.

Parameters:
  • key (Hashable) – Identifier for the feedback value.

  • value (Any) – Value to store.

  • chunked (bool, optional) – Placeholder for future chunked feedback support. Must remain False today. Default is False.

Notes

The *args and **kwargs parameters are accepted for forward compatibility with future signatures but are currently unused.
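Taken together, set(), get(), and delete() behave like a thin wrapper around a dictionary. The sketch below mirrors the documented signatures on a plain Python class. FeedbackStoreSketch is a hypothetical name, the class is not a Ray actor, and raising NotImplementedError when chunked=True is an assumption (the documentation only states that the flag must remain False).

```python
class FeedbackStoreSketch:
    """Hypothetical in-process model of the non-chunked feedback channel."""

    def __init__(self):
        self.feedback_non_chunked = {}

    def set(self, *args, key, value, chunked=False, **kwargs):
        # Extra positional and keyword arguments are accepted but ignored.
        if chunked:
            raise NotImplementedError("chunked feedback is not supported")
        self.feedback_non_chunked[key] = value

    def get(self, key, default=None, chunked=False):
        if chunked:
            raise NotImplementedError("chunked feedback is not supported")
        return self.feedback_non_chunked.get(key, default)

    def delete(self, *args, key, **kwargs):
        # pop() with a default keeps the call idempotent for missing keys.
        self.feedback_non_chunked.pop(key, None)


store = FeedbackStoreSketch()
store.set(key="dt", value=0.5)
first_read = store.get("dt")
store.delete(key="dt")
store.delete(key="dt")  # second delete is a no-op
second_read = store.get("dt", default=-1)
```

With a real node actor, each of these calls would go through the actor handle's remote interface rather than being invoked directly.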