deisa.ray.window_handler module

class deisa.ray.window_handler.Deisa(*, ray_start: Callable[[], None] | None = None, max_simulation_ahead: int = 1)

Bases: object

Entry point that orchestrates analytics callbacks on Ray.

Provides an API for registering sliding window callbacks and executing them as arrays arrive from simulation ranks.

callback(*window_specs, exception_handler: Callable | None = None, when: Literal['AND', 'OR'] = 'AND')

Decorator that registers a sliding-window analytics callback.

Parameters:
  • *window_specs (WindowSpec) – Array descriptions the callback should receive.

  • exception_handler (Optional[Callable], optional) – Handler invoked when the user callback raises. Defaults to deisa.ray.errors._default_exception_handler().

  • when (Literal["AND", "OR"], optional) – Governs whether all arrays ("AND") or any array ("OR") must be available before the callback runs. Defaults to "AND".

Returns:

Decorator that registers simulation_callback with the window handler.

Return type:

Callable

determine_callback_args(description_of_arrays_needed) → dict[str, List[DeisaArray]]

Build the kwargs passed to a simulation callback.

Parameters:

description_of_arrays_needed (Sequence[WindowSpec]) – Array descriptions requested by the callback.

Returns:

Mapping from array name to the latest (windowed) list of DeisaArray instances.

Return type:

dict[str, List[DeisaArray]]
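
The mapping described above can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: the helper `latest_window`, the plain integers standing in for DeisaArray instances, and the oldest-first ordering are all assumptions made for illustration.

```python
from collections import deque


def latest_window(queues, windows):
    """Map each requested array name to its most recent items.

    `queues` maps array names to deques of received arrays and `windows`
    maps array names to the window size requested (hypothetical shapes).
    Items are returned oldest first, which is an assumption.
    """
    return {name: list(queues[name])[-window:] for name, window in windows.items()}


# Plain integers stand in for DeisaArray instances.
queues = {
    "temperature": deque([10, 11, 12, 13], maxlen=4),
    "pressure": deque([7, 8], maxlen=4),
}
kwargs = latest_window(queues, {"temperature": 3, "pressure": 1})
```

Under these assumptions, `kwargs` holds the last three temperature entries and the last pressure entry, each as a list keyed by array name.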

execute_callbacks() → None

Execute the registered simulation callback loop.

Notes

Supports a single registered callback at present. Manages array retrieval from the head actor, windowed array delivery, and garbage collection between iterations.

generate_queue_per_array()

Prepare per-array queues that respect declared window sizes.

Notes

Each queue is a collections.deque with maxlen matching the largest window requested for that array.
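
A minimal sketch of this queue layout, assuming a hypothetical `Spec` dataclass in place of WindowSpec (whose real attributes are not shown here) and a hypothetical helper `build_queues`:

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Spec:
    """Hypothetical stand-in for WindowSpec: an array name and a window size."""
    name: str
    window_size: int = 1


def build_queues(specs):
    """Return one deque per array name, sized to the largest window requested."""
    largest = {}
    for spec in specs:
        largest[spec.name] = max(largest.get(spec.name, 1), spec.window_size)
    return {name: deque(maxlen=size) for name, size in largest.items()}


# Two specs ask for "temperature" with different windows; the larger one wins.
queues = build_queues([Spec("temperature", 3), Spec("temperature", 5), Spec("pressure")])

# Appending past maxlen silently drops the oldest entry.
for step in range(7):
    queues["temperature"].append(step)
    queues["pressure"].append(step)
```

Because `deque(maxlen=n)` discards the oldest item on overflow, each queue always holds at most the largest window needed for that array, with no explicit trimming.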

register_callback(simulation_callback: Callable, arrays_spec: list[WindowSpec], exception_handler: Callable | None = None, when: Literal['AND', 'OR'] = 'AND') → Callable

Register the analytics callback and array descriptions.

Parameters:
  • simulation_callback (Callable) – Function to run for each iteration; receives arrays as kwargs and timestep.

  • arrays_spec (list[WindowSpec]) – Descriptions of arrays to stream to the callback (with optional sliding windows).

  • exception_handler (Optional[Callable]) – Handler invoked when the callback raises an exception (for example, a division by zero). Defaults to printing the error and moving on.

  • when (Literal['AND', 'OR']) – When the callback requires multiple arrays, governs when it is called. AND: call the callback only if ALL required arrays have been shared for a given timestep. OR: call the callback if ANY array has been shared for a given timestep.

Returns:

The original callback, allowing decorator-style usage.

Return type:

Callable
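
Returning the original callback is what lets the same registration serve both as a direct call and as a decorator. The sketch below shows that pattern with a toy class; `ToyHandler` is hypothetical, plain strings stand in for WindowSpec objects, and nothing here reflects how Deisa stores callbacks internally.

```python
class ToyHandler:
    """Hypothetical stand-in that mirrors the two registration signatures."""

    def __init__(self):
        self.registered = []

    def register_callback(self, simulation_callback, arrays_spec,
                          exception_handler=None, when="AND"):
        # Record the registration, then hand back the original function.
        self.registered.append((simulation_callback, list(arrays_spec),
                                exception_handler, when))
        return simulation_callback

    def callback(self, *window_specs, exception_handler=None, when="AND"):
        # Decorator factory: capture the arguments, register on application.
        def decorator(simulation_callback):
            return self.register_callback(simulation_callback, list(window_specs),
                                          exception_handler, when)
        return decorator


handler = ToyHandler()


@handler.callback("temperature", when="OR")
def mean_temperature(temperature, timestep):
    return sum(temperature) / len(temperature)
```

Since the decorator returns the function unchanged, `mean_temperature` remains directly callable, which keeps it easy to unit-test outside the handler.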

set(*, key: Hashable, value: Any, chunked: bool = False, persist: bool = False) → None

Broadcast a feedback value to all scheduling actors.

Parameters:
  • key (Hashable) – Identifier for the shared value.

  • value (Any) – Value to distribute.

  • chunked (bool, optional) – Placeholder for future distributed-array feedback. Only False is supported today. Default is False.

  • persist (bool, optional) – Whether the value should survive the next retrieval. Defaults to False.

Notes

The method lazily fetches node actors once and uses fire-and-forget remote calls; callers should not assume synchronous delivery.
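
One plausible reading of the persist flag is sketched below with a purely local toy store. `ToyFeedbackStore` and its `get` method are hypothetical; the real values live on remote scheduling actors, and how they are retrieved there is not described in this reference.

```python
class ToyFeedbackStore:
    """Hypothetical local model of a feedback value with a persist flag."""

    def __init__(self):
        self._values = {}

    def set(self, *, key, value, persist=False):
        # Keep the flag next to the value so retrieval knows what to do.
        self._values[key] = (value, persist)

    def get(self, key, default=None):
        if key not in self._values:
            return default
        value, persist = self._values[key]
        if not persist:
            # Assumed behavior: a non-persistent value is consumed on retrieval.
            del self._values[key]
        return value


store = ToyFeedbackStore()
store.set(key="dt", value=0.5)
store.set(key="tolerance", value=1e-3, persist=True)

first_dt = store.get("dt")          # consumed by this retrieval
second_dt = store.get("dt")         # falls back to the default
first_tol = store.get("tolerance")  # persists across retrievals
second_tol = store.get("tolerance")
```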

should_call(description_of_arrays_needed, when: Literal['AND', 'OR']) → bool

Determine whether a callback should execute for the current state.

Parameters:
  • description_of_arrays_needed (Sequence[WindowSpec]) – Array descriptions governing the callback.

  • when (Literal["AND", "OR"]) – Execution mode specifying whether all arrays or any array must have new data.

Returns:

True when the callback criteria are met.

Return type:

bool
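
The two modes reduce to `all` versus `any` over a per-array "has new data" flag. The sketch below assumes such flags are available as a dict; the function name `should_call_sketch` and the dict shape are hypothetical and are not the method's actual inputs.

```python
from typing import Literal


def should_call_sketch(has_new_data: dict[str, bool], when: Literal["AND", "OR"]) -> bool:
    """Return True when the callback criteria are met for the given mode."""
    if when == "AND":
        # Every required array must have new data.
        return all(has_new_data.values())
    if when == "OR":
        # At least one required array must have new data.
        return any(has_new_data.values())
    raise ValueError(f"unknown mode: {when!r}")


state = {"temperature": True, "pressure": False}
and_result = should_call_sketch(state, "AND")
or_result = should_call_sketch(state, "OR")
```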

unregister_callback(simulation_callback: Callable) → None

Unregister a previously registered simulation callback.

Parameters:

simulation_callback (Callable) – Callback to remove from the registry.

Raises:

NotImplementedError – Always, as the feature has not been implemented yet.