Analytics

Simple example

from deisa.ray.window_handler import Deisa
from deisa.ray.types import WindowSpec

deisa = Deisa()

def simulation_callback(array: list[DeisaArray]):
    x = array[0].dask.sum().compute()
    print("Sum:", x)

deisa.register_callback(
    simulation_callback,
    [WindowSpec("array")],
)
deisa.execute_callbacks()

Several arrays

from deisa.ray.window_handler import Deisa
from deisa.ray.types import WindowSpec

deisa = Deisa()

def simulation_callback(a: list[DeisaArray], b: list[DeisaArray]):
    r = (a[0].dask - a[0].dask).sum().compute()

deisa.register_callback(
    simulation_callback,
    [WindowSpec("a"), WindowSpec("b")]
)
deisa.execute_callbacks()

Sliding window

If the analysis requires access to several iterations (for example, to compute a time derivative), it is possible to use the window_size parameter.

from deisa.ray.window_handler import Deisa
from deisa.ray.types import WindowSpec

deisa = Deisa()

def simulation_callback(arrays: list[DeisaArray]):
    if len(arrays) < 2:  # For the first iteration
        return

    current_array = arrays[1].dask
    previous_array = arrays[0].dask

    ...

deisa.register_callback(
    simulation_callback,
    [
        WindowSpec("array", window_size=2),  # Enable sliding window
    ],
)
deisa.execute_callbacks()

Dask persist

Dask’s persist is supported:

from deisa.ray.window_handler import Deisa
from deisa.ray.types import WindowSpec

deisa = Deisa()

def simulation_callback(array: list[DeisaArray]):
    x = array[0].dask.sum().persist()

    # x is still a Dask array, but the sum is being computed in the background
    assert isinstance(x, da.Array)

    x_final = x.compute()
    assert x_final == 10 * timestep

deisa.register_callback(
    simulation_callback,
    [WindowSpec("array")],
)

deisa.execute_callbacks()