Source code for deisa.ray.utils
from typing import Dict
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
import time
import ray
import random
[docs]
def get_ready_actor_with_retry(name, namespace, deadline_s=180):
"""
Get a Ray actor by name with retry logic and readiness check.
This function attempts to retrieve a Ray actor by name and namespace,
checking that it is ready before returning. It implements exponential
backoff retry logic with a deadline.
Parameters
----------
name : str
The name of the actor to retrieve.
namespace : str
The namespace of the actor.
deadline_s : float, optional
Maximum time in seconds to wait for the actor to become available.
Default is 180.
Returns
-------
RayActorHandle
The handle to the ready actor.
Raises
------
TimeoutError
If the actor is not found or not ready within the deadline.
Notes
-----
The function uses exponential backoff with jitter for retries. The delay
starts at 0.2 seconds and increases by a factor of 1.5 up to a maximum
of 5.0 seconds. A small random jitter (0-0.1 seconds) is added to avoid
thundering herd problems.
"""
start, delay = time.time(), 0.2
while True:
try:
actor = ray.get_actor(name=name, namespace=namespace)
# ready gate
# TODO for even more reliability, in the future we should handle
# actor exists, but unavailable
# actor exists, crashed, need to recreate
ray.get(actor.ready.remote())
return actor
except ValueError:
if time.time() - start > deadline_s:
raise TimeoutError(f"{namespace}/{name} not found in {deadline_s}s")
time.sleep(delay + random.random() * 0.1)
delay = min(delay * 1.5, 5.0)
[docs]
def get_head_node_id() -> str:
"""
Get the node ID of the Ray cluster head node.
Returns
-------
str
The node ID of the head node.
Raises
------
AssertionError
If there is not exactly one head node in the cluster.
Notes
-----
This function queries Ray's state API to find the head node. It assumes
there is exactly one head node in the cluster.
"""
from ray.util import state
nodes = state.list_nodes(filters=[("is_head_node", "=", True)])
assert len(nodes) == 1, "There should be exactly one head node"
return nodes[0].node_id
[docs]
def get_head_actor_options() -> dict:
"""
Return the options that should be used to start the head actor.
Returns
-------
dict
Dictionary of Ray actor options including:
- name: "simulation_head"
- namespace: "deisa_ray"
- scheduling_strategy: NodeAffinitySchedulingStrategy for the head node
- max_concurrency: Very high value to prevent blocking
- lifetime: "detached" to persist beyond function scope
- enable_task_events: False for performance
Notes
-----
The head actor is scheduled on the head node with a detached lifetime
to ensure it persists. High concurrency is set to prevent the actor
from being blocked when gathering many references.
"""
return dict(
# The workers will be able to access to this actor using its name
name="simulation_head",
namespace="deisa_ray",
# Schedule the actor on this node
scheduling_strategy=NodeAffinitySchedulingStrategy(
node_id=get_head_node_id(),
soft=False,
),
# Prevents the actor from being stuck when it needs to gather many refs
max_concurrency=1000_000_000,
# Prevents the actor from being deleted when the function ends
lifetime="detached",
# Disabled for performance reasons
enable_task_events=False,
)