deisa.ray.window_handler module¶
- class deisa.ray.window_handler.Deisa(*, ray_start: Callable[[], None] | None = None, max_simulation_ahead: int = 1)[source]¶
Bases:
objectEntry point that orchestrates analytics callbacks on Ray.
Provides an API for registering sliding window callbacks and executing them as arrays arrive from simulation ranks.
- callback(*window_specs, exception_handler: Callable | None = None, when: Literal['AND', 'OR'] = 'AND')[source]¶
Decorator that registers a sliding-window analytics callback.
- Parameters:
*window_specs (WindowSpec) – Array descriptions the callback should receive.
exception_handler (Optional[Callable], optional) – Handler invoked when the user callback raises. Defaults to
deisa.ray.errors._default_exception_handler().when (Literal["AND", "OR"], optional) – Governs whether all arrays (
"AND") or any array ("OR") must be available before the callback runs. Defaults to"AND".
- Returns:
Decorator that registers
simulation_callbackwith the window handler.- Return type:
Callable
- determine_callback_args(description_of_arrays_needed) dict[str, List[DeisaArray]][source]¶
Build the kwargs passed to a simulation callback.
- Parameters:
description_of_arrays_needed (Sequence[WindowSpec]) – Array descriptions requested by the callback.
- Returns:
Mapping from array name to the latest (windowed) list of
DeisaArrayinstances.- Return type:
dict[str, List[DeisaArray]]
- execute_callbacks() None[source]¶
Execute the registered simulation callback loop.
Notes
Supports a single registered callback at present. Manages array retrieval from the head actor, windowed array delivery, and garbage collection between iterations.
- generate_queue_per_array()[source]¶
Prepare per-array queues that respect declared window sizes.
Notes
Each queue is a
collections.dequewithmaxlenmatching the largest window requested for that array.
- register_callback(simulation_callback: Callable, arrays_spec: list[WindowSpec], exception_handler: Callable | None = None, when: Literal['AND', 'OR'] = 'AND') Callable[source]¶
Register the analytics callback and array descriptions.
- Parameters:
simulation_callback (Callable) – Function to run for each iteration; receives arrays as kwargs and
timestep.arrays_spec (list[WindowSpec]) – Descriptions of arrays to stream to the callback (with optional sliding windows). Maximum iterations to execute. Default is a large sentinel.
exception_handler (Optional[Callable]) – Exception handler to handle any exception thrown by simulation (like division by zero). Defaults to printing the error and moving on.
when (Literal['AND', 'OR']) – When callback have multiple arrays, govern when callback should be called. AND: only call callback if ALL required arrays have been shared for a given timestep. OR: call callback if ANY array has been shared for a given timestep.
- Returns:
The original callback, allowing decorator-style usage.
- Return type:
Callable
- set(*, key: Hashable, value: Any, chunked: bool = False, persist: bool = False) None[source]¶
Broadcast a feedback value to all scheduling actors.
- Parameters:
key (Hashable) – Identifier for the shared value.
value (Any) – Value to distribute.
chunked (bool, optional) – Placeholder for future distributed-array feedback. Only
Falseis supported today. Default isFalse.persist (bool, optional) – Whether the value should survive the next retrieval. Defaults to
False.
Notes
The method lazily fetches node actors once and uses fire-and-forget remote calls; callers should not assume synchronous delivery.
- should_call(description_of_arrays_needed, when: Literal['AND', 'OR']) bool[source]¶
Determine whether a callback should execute for the current state.
- Parameters:
description_of_arrays_needed (Sequence[WindowSpec]) – Array descriptions governing the callback.
when (Literal["AND", "OR"]) – Execution mode specifying whether all arrays or any array must have new data.
- Returns:
Truewhen the callback criteria are met.- Return type:
bool