Analytics

In the callback examples below, the name passed to WindowSpec is also the name of the keyword argument DEISA uses when it calls the callback, so the matching function parameter must have the same name. Each argument is a list[DeisaArray] ordered from oldest to newest; with no explicit window_size the list contains only the latest shared timestep.
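The naming rule can be illustrated without DEISA at all. The sketch below is a toy dispatcher, not DEISA code: call_with_windows and the string placeholders are hypothetical names used only to show how keyword-based calling ties the WindowSpec name to the parameter name.

```python
# Toy dispatcher, not DEISA code: it only illustrates keyword-based calling.
def call_with_windows(callback, windows):
    """Call `callback` with each window passed under its array name."""
    return callback(**windows)


def summarize(temperature, pressure):
    # Each argument is a list ordered from oldest to newest.
    return temperature[-1], pressure[-1]


# Plain strings stand in for DeisaArray objects.
windows = {"pressure": ["p@t=3"], "temperature": ["T@t=3"]}
result = call_with_windows(summarize, windows)
print(result)
```

Because the arguments are matched by name, the order of the dictionary entries does not matter, but a parameter whose name differs from the array name would raise a TypeError in this sketch.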

Simple example

from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("temperature"))
def summarize_temperature(temperature: list[DeisaArray]):
    mean_temperature = temperature[0].dask.mean().compute()
    print("Mean temperature:", mean_temperature)

d.execute_callbacks()

Several arrays

from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("temperature"), WindowSpec("pressure"))
def compare_state(
    temperature: list[DeisaArray],
    pressure: list[DeisaArray],
):
    thermal_pressure_balance = (
        temperature[0].dask - pressure[0].dask
    ).mean().compute()
    print("thermal-pressure balance:", thermal_pressure_balance)

d.execute_callbacks()

when="AND" and when="OR"

When a callback depends on several arrays, when controls which arrivals are allowed to trigger it.

Before applying when, DEISA first groups shares by timestep. It keeps collecting arrays for the current timestep and does not analyze callbacks for that timestep until it receives any share from a higher timestep, or the final close sentinel. The higher-timestep share acts as the signal that the previous timestep is complete enough to inspect.

Incoming shares:

event      share received                  DEISA action
1          temperature, t = 1              collect t = 1; do not call yet
2          pressure, t = 1                 collect t = 1; do not call yet
3          velocity, t = 1                 collect t = 1; do not call yet
4          temperature, t = 2              now analyze callbacks for t = 1
                                            keep temperature, t = 2 for later
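The boundary rule in the table above can be sketched as a small generator. This is a toy model under stated assumptions, not DEISA's implementation: CLOSE and group_by_timestep are hypothetical names, shares are plain (name, timestep) tuples, and shares are assumed to arrive in non-decreasing timestep order.

```python
# Toy model of the timestep boundary rule; this is not DEISA's implementation.
CLOSE = object()  # hypothetical stand-in for the final close sentinel


def group_by_timestep(events):
    """Yield (timestep, array_names) once a timestep is known to be complete."""
    current_t = None
    collected = []
    for event in events:
        if event is CLOSE:
            break
        name, t = event
        if current_t is not None and t > current_t:
            # A share from a higher timestep closes the previous one.
            yield current_t, collected
            collected = []
        current_t = t
        collected.append(name)
    if collected:
        # The close sentinel (or end of input) releases the last timestep.
        yield current_t, collected


events = [
    ("temperature", 1),
    ("pressure", 1),
    ("velocity", 1),
    ("temperature", 2),
    CLOSE,
]
batches = list(group_by_timestep(events))
print(batches)
```

In this sketch, timestep 1 is released only when the first share for timestep 2 arrives, and timestep 2 is released by the close sentinel.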

After that boundary is reached, DEISA uses the callback’s when value to decide whether the callback should run for the timestep being analyzed. when is evaluated per callback, using only the arrays requested by that callback.

when="AND" is the default. The callback runs only if all arrays requested by that callback have a new share for the timestep being analyzed. If a callback asks for temperature, pressure, and velocity, but only temperature and pressure were shared for timestep 2, the callback is not called.

when="OR" runs if at least one array requested by the callback has a new share for the timestep being analyzed. An array that did not receive a new share reuses its most recent available window, provided it has been shared at least once. If none of the arrays requested by a callback has a new share, the callback is not called in either AND or OR mode.

Callback inputs:
    WindowSpec("temperature"), WindowSpec("pressure"), WindowSpec("velocity")

timestep being analyzed      new shares seen          when="AND"        when="OR"
t = 1                        temperature, pressure,   run               run
                             velocity
t = 2                        temperature, pressure    do not run        run
t = 3                        none of these arrays     do not run        do not run

In both modes, callback arguments are still lists of DeisaArray objects. With when="OR", a list may be reused from an older timestep if that array did not produce a new share for the current trigger.
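The decision table above reduces to a set comparison. The function below is a minimal sketch of that rule only; should_run is a hypothetical helper, not part of the DEISA API, and it ignores window reuse.

```python
def should_run(requested, new_shares, when="AND"):
    """Decide whether a callback runs for the timestep being analyzed."""
    # Only arrays requested by this callback count as fresh shares.
    fresh = set(requested) & set(new_shares)
    if when == "AND":
        return fresh == set(requested)
    if when == "OR":
        return bool(fresh)
    raise ValueError(f"unknown when value: {when!r}")


requested = ["temperature", "pressure", "velocity"]
shares_by_timestep = {
    1: ["temperature", "pressure", "velocity"],
    2: ["temperature", "pressure"],
    3: ["density"],  # none of the requested arrays
}
for t, shares in shares_by_timestep.items():
    print(t, should_run(requested, shares, "AND"), should_run(requested, shares, "OR"))
```

Shares of arrays that the callback did not request, such as density here, never influence the outcome.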

Sliding window

If the analysis needs several timesteps (for example, to compute a time derivative), set the window_size parameter of WindowSpec. Choose the window size from both the available system memory and the analytics algorithm you want to run. A window_size of 5 means DEISA may need to keep five timesteps of that array available, so it costs more memory than a smaller window. That cost is justified for an analysis that needs five temporal points, such as a five-point stencil finite-difference approximation.
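To make the five-point case concrete, here is a sketch of a central five-point first-derivative estimate on plain floats. It assumes uniformly spaced samples ordered from oldest to newest; five_point_derivative is a hypothetical helper, and the same weights would apply element-wise to the Dask arrays in a window.

```python
import math


def five_point_derivative(window, dt):
    """Central five-point estimate of d/dt at the middle entry of `window`."""
    if len(window) != 5:
        raise ValueError("a five-point stencil needs exactly five samples")
    f0, f1, _, f3, f4 = window  # the middle sample has weight zero
    return (f0 - 8 * f1 + 8 * f3 - f4) / (12 * dt)


dt = 0.1
times = [1.0 + i * dt for i in range(5)]  # oldest to newest
samples = [math.sin(t) for t in times]

estimate = five_point_derivative(samples, dt)
exact = math.cos(times[2])  # derivative of sin at the middle time
print("absolute error:", abs(estimate - exact))
```

The estimate applies at the middle entry of the window, not at the newest one, which is one reason such a stencil cannot be evaluated with fewer than five entries.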

The window size is an upper bound on what the callback receives, not a requirement that every operation in the callback must consume the whole window. A callback registered with window_size=5 can still compute single-timestep statistics from temperature[-1], three-timestep estimates from the newest three entries, and five-timestep estimates only when all five entries are available.

Callbacks are called as soon as their input arrays are available, even before the sliding window is full. During the first few calls, the list may contain fewer entries than window_size. The callback must check the length of its list argument (for example, len(temperature)) before any operation that needs a minimum number of timesteps.
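How the list grows during the first calls can be mimicked with a bounded deque. This is only a toy stand-in for the window DEISA maintains, with placeholder strings instead of DeisaArray objects.

```python
from collections import deque

window_size = 5
window = deque(maxlen=window_size)  # toy stand-in for DEISA's window

lengths = []
for t in range(1, 8):
    window.append(f"temperature@t={t}")  # newest entry goes last
    lengths.append(len(window))

print(lengths)       # [1, 2, 3, 4, 5, 5, 5]
print(list(window))  # the five newest entries, oldest first
```

The first four calls see a partial window, which is why a length check is needed before any operation that requires the full history.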

from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("temperature", window_size=5))
def estimate_temperature_change(temperature: list[DeisaArray]):
    latest_mean = temperature[-1].dask.mean().compute()
    print("mean temperature:", latest_mean)

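    if len(temperature) >= 3:
        # Central difference across the newest three entries: the estimate
        # applies at the middle entry, so only the two end points are used.
        # The denominator is measured in timesteps.
        newest = temperature[-1]
        oldest = temperature[-3]
        three_point_rate = (
            newest.dask - oldest.dask
        ) / (newest.t - oldest.t)
        print("three-point mean dT/dt:", three_point_rate.mean().compute())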

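    # Operations that need the full window wait until it has filled.
    if len(temperature) < 5:
        return

    # Element-wise average over the five timesteps in the window.
    five_point_average = sum(
        timestep.dask for timestep in temperature
    ) / len(temperature)
    print("five-point average:", five_point_average.mean().compute())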

d.execute_callbacks()

Dask persist

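Dask’s persist is supported inside callbacks. It starts the computation in the background and returns a Dask collection, so the value is still obtained with compute():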

import dask.array as da
from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("vorticity"))
def track_vorticity(vorticity: list[DeisaArray]):
    total_vorticity = vorticity[0].dask.sum().persist()

    # The result is still a Dask array, but the sum is computing in the background.
    assert isinstance(total_vorticity, da.Array)

    print("t=", vorticity[0].t, "total vorticity=", total_vorticity.compute())

d.execute_callbacks()

Saving to HDF5

DeisaArray provides a convenience method for writing one array to HDF5:

from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("temperature"))
def save_hotspot_temperature(temperature: list[DeisaArray]):
    if temperature[0].t == 5:
        temperature[0].to_hdf5("interesting-event.h5", "temperature")

d.execute_callbacks()

If you want to save several arrays into the same HDF5 file, use deisa.ray.types.to_hdf5:

from deisa.ray.types import DeisaArray, WindowSpec, to_hdf5
from deisa.ray.window_handler import Deisa

d = Deisa()

@d.callback(WindowSpec("temperature"), WindowSpec("pressure"))
def save_state_snapshot(
    temperature: list[DeisaArray],
    pressure: list[DeisaArray],
):
    if temperature[0].t == 5:
        to_hdf5(
            "state.h5",
            {
                "temperature": temperature[0],
                "pressure": pressure[0],
            },
        )

d.execute_callbacks()

Converting to Xarray

If you want to work with Xarray APIs, build an xarray.DataArray from the underlying Dask array:

import xarray as xr
from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("temperature"))
def inspect_temperature_field(temperature: list[DeisaArray]):
    # dims needs one name per array dimension; a 2D field is assumed here.
    temperature_da = xr.DataArray(
        temperature[0].dask,
        dims=["x", "y"],
        name="temperature",
    )

    print("timestep:", temperature[0].t)
    print(temperature_da)

d.execute_callbacks()

Saving Xarray to NetCDF

One convenient pattern is to convert the DeisaArray to an xarray.DataArray and then write it to NetCDF:

import xarray as xr
from deisa.ray.window_handler import Deisa
from deisa.ray.types import DeisaArray, WindowSpec

d = Deisa()

@d.callback(WindowSpec("temperature"))
def save_temperature_netcdf(temperature: list[DeisaArray]):
    if temperature[0].t == 5:
        # dims assumes a 2D field. compute() loads the whole array into
        # memory before it is written.
        xarray_da = xr.DataArray(
            temperature[0].dask,
            dims=["x", "y"],
            name="temperature",
        ).compute()

        xarray_da.to_netcdf("interesting-event.nc")

d.execute_callbacks()