Quick Start¶
Deisa-ray (Dask Enabled In Situ Analytics with a Ray backend) lets HPC simulations stream data into Python analytics while keeping computation close to where the data is produced. Dask builds task graphs from your analysis code, and Ray runs those tasks across the cluster. The result is fast, asynchronous analytics with minimal network transfer.
Assumptions and model¶
Simulation side¶
- The simulation is distributed (MPI or similar) and iterative.
- Any rank that will ever send data must instantiate a Bridge.
- The total number of participating ranks (world_size) is known up front.
- Each Bridge has a unique bridge_id, and there is always a bridge with bridge_id=0 (the master bridge).
- Each bridge describes the arrays it will share via arrays_metadata.
- Sends are ordered by non-decreasing timestep: all sends for timestep i happen before any send for timestep j > i.
- If data is produced on GPU, copy it to CPU before calling Bridge.send.
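If the simulation computes on GPU, a small host-copy helper keeps the send path uniform. This is a generic sketch, not part of the deisa API; the `get()` call is CuPy's device-to-host copy, and plain NumPy arrays pass through unchanged:

```python
import numpy as np

def to_host(chunk):
    # CuPy (and similar GPU array libraries) expose .get() as the
    # device-to-host copy; NumPy arrays have no such method and are
    # passed through as-is.
    return chunk.get() if hasattr(chunk, "get") else np.asarray(chunk)

chunk = np.ones((64, 64), dtype=np.float64)
host_chunk = to_host(chunk)  # already on CPU, returned unchanged
```

A rank would call `to_host` on each chunk just before handing it to `Bridge.send`.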
Analytics side¶
Analytics run on a Ray head node. Dask arrays are backed by Ray tasks.
- You register callbacks with Deisa and then execute them.
- Callback arguments are lists of DeisaArray objects, one list per WindowSpec.
- The array name in WindowSpec must match the bridge metadata name; otherwise the callback will not run for that array.
- The window list is time-ordered and reflects only the timesteps that were actually sent by the simulation.
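The ordering guarantee can be pictured with a stand-in class (only the `t` attribute of DeisaArray is modelled here; this is illustration, not the deisa API). If the simulation sent only timesteps 0, 2, and 4, a window of size 3 holds exactly those, oldest first:

```python
from dataclasses import dataclass

@dataclass
class StandInArray:
    t: int  # mimics DeisaArray.t, the timestep of this snapshot

def timesteps(window):
    # Windows are time-ordered and contain only timesteps actually sent.
    return [a.t for a in window]

window = [StandInArray(0), StandInArray(2), StandInArray(4)]
assert timesteps(window) == [0, 2, 4]
```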
Cluster setup (Ray)¶
Start a Ray head node on the analytics host, then join the simulation nodes to it. For example (often launched via Slurm):
ray start --head
ray start --address <head-node-address>
Simulation quick snippet¶
The simulation creates one Bridge per participating rank and sends chunks.
The example metadata structure mirrors tests/utils.py.
import numpy as np

from deisa.ray.bridge import Bridge

world_size = 4

sys_md = {
    "world_size": world_size,
    "master_address": "127.0.0.1",
    "master_port": 29500,
}

arrays_md = {
    "temperature": {
        "chunk_shape": (64, 64),       # shape of the chunk this rank sends
        "nb_chunks_per_dim": (4, 4),   # global grid of chunks
        "nb_chunks_of_node": 1,
        "dtype": np.float64,
        "chunk_position": (0, 0),      # this rank's position in the chunk grid
    }
}

bridge = Bridge(
    bridge_id=0,
    arrays_metadata=arrays_md,
    system_metadata=sys_md,
)

for t in range(10):
    chunk = np.ones((64, 64), dtype=np.float64) * t
    bridge.send(array_name="temperature", chunk=chunk, timestep=t)

# Close the bridge after the last timestep.
bridge.close(timestep=10)
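The snippet above shows only bridge_id=0; in a real run each of the world_size ranks creates its own Bridge with a distinct bridge_id and its own chunk_position. With chunk_shape=(64, 64) and nb_chunks_per_dim=(4, 4), the global array is 256x256. One possible rank-to-position mapping is sketched below; the row-major layout is an assumption for illustration, and your domain decomposition may differ:

```python
def chunk_position(rank, nb_chunks_per_dim):
    # Hypothetical row-major mapping from a flat rank id to a
    # 2-D position in the chunk grid.
    ny, nx = nb_chunks_per_dim
    return (rank // nx, rank % nx)

# Rank 5 on a 4x4 grid would own chunk (1, 1):
assert chunk_position(5, (4, 4)) == (1, 1)
```

Each rank would then set "chunk_position" in its arrays_metadata (and pass bridge_id=rank) before constructing its Bridge.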
Analytics quick snippet¶
Define the analytics callback using Dask operations. DeisaArray.dask gives
access to standard Dask array methods, and DeisaArray.t is the timestep.
from deisa.ray.window_handler import Deisa
from deisa.ray.types import WindowSpec

deisa = Deisa()

def summary_callback(temperature_window):
    # The window is time-ordered; [-1] is the most recent timestep.
    latest = temperature_window[-1]
    mean_value = latest.dask.mean().compute()
    print(f"t={latest.t} mean={mean_value}")

deisa.register_callback(
    summary_callback,
    [WindowSpec("temperature", window_size=3)],
)

deisa.execute_callbacks()
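Beyond inspecting the latest snapshot, callbacks often reduce over the whole window. The sketch below computes a temporal mean using plain NumPy arrays in place of the Dask-backed handles exposed by DeisaArray.dask; with real DeisaArray objects you would stack the .dask arrays and finish with .compute():

```python
import numpy as np

def temporal_mean(chunks):
    # Average the same field across all timesteps in the window.
    return np.mean(np.stack(chunks), axis=0)

# Stand-in for a window of three timesteps of a 64x64 field.
window = [np.full((64, 64), float(t)) for t in (3, 4, 5)]
mean_field = temporal_mean(window)  # every entry is (3 + 4 + 5) / 3 = 4.0
```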
Where to go next¶
- Analytics shows more callback patterns and window usage.
- The API reference under deisa.ray documents the full interface.