h_ray.RayWorkflowGraphAdapter

A Graph Adapter for delegating the execution of Hamilton nodes to Ray.

class hamilton.plugins.h_ray.RayWorkflowGraphAdapter(result_builder: ResultMixin, workflow_id: str)

Class representing what’s required to make Hamilton run Ray Workflows.

Use pip install sf-hamilton[ray] to get the dependencies required to run this.

Ray Workflows provide a more robust way to scale computation for any type of Hamilton graph.

What’s the difference between this and RayGraphAdapter?

  • Ray workflows offer durable computation. That is, they save and checkpoint each function.

  • This enables one to run a workflow, and not have to restart it if something fails, assuming correct Ray workflow usage.
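To make the checkpoint-and-resume idea concrete, here is a minimal stdlib-only sketch. It is not Ray's implementation or API: `run_durable` and the in-memory `store` dict are hypothetical stand-ins for Ray's durable storage. The sketch assumes each step's result is persisted under the workflow ID as soon as that step succeeds.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical in-memory checkpoint store: workflow_id -> {step_name: result}.
CheckpointStore = Dict[str, Dict[str, Any]]


def run_durable(
    workflow_id: str,
    steps: List[Tuple[str, Callable[[], Any]]],
    store: CheckpointStore,
) -> Dict[str, Any]:
    """Run steps in order, skipping any already checkpointed under workflow_id."""
    checkpoints = store.setdefault(workflow_id, {})
    for name, fn in steps:
        if name in checkpoints:
            continue  # finished in an earlier run: reuse the saved result
        checkpoints[name] = fn()  # persist the result as soon as the step succeeds
    return dict(checkpoints)
```

Re-running with the same `workflow_id` after a failure skips the steps that already have a checkpoint. A step that failed partway through runs again from the start, which is why the functions should be idempotent.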

Tips

See https://docs.ray.io/en/latest/workflows/basics.html for the source of the following:

  1. Functions should be idempotent.

  2. The workflow ID is what Ray uses to resume or restart the workflow if it is run a second time.

  3. Nothing runs until the entire DAG has been walked and set up, and build_result has been called.
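Tip 3 describes deferred execution. The toy class below is a hypothetical sketch, not Hamilton's or Ray's code: it records steps while the graph is walked and runs them only when `build_result` is called. It assumes steps are registered in topological order, so every dependency is computed before it is needed.

```python
from typing import Any, Callable, Dict, List, Tuple


class LazyDAG:
    """Toy lazy DAG: registering steps runs nothing until build_result()."""

    def __init__(self) -> None:
        self.steps: List[Tuple[str, Callable[..., Any], List[str]]] = []
        self.executed: List[str] = []

    def add(self, name: str, fn: Callable[..., Any], deps: List[str]) -> None:
        # Only record the step; no computation happens here.
        self.steps.append((name, fn, deps))

    def build_result(self) -> Dict[str, Any]:
        values: Dict[str, Any] = {}
        for name, fn, deps in self.steps:  # assumes topological order
            values[name] = fn(*(values[d] for d in deps))
            self.executed.append(name)
        return values
```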

Notes on scaling:

  • Multi-core on a single machine ✅

  • Distributed computation on a Ray cluster ✅

  • Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻.

Function return object types supported:

  • Works for any Python object that can be serialized by the Ray framework. ✅
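Before returning a value from a node, it can help to check that it serializes at all. The helper below is a hypothetical, stdlib-only sketch that uses `pickle` as a conservative proxy. Ray's serializer builds on pickle protocol 5 and cloudpickle and can handle some objects (such as lambdas) that stdlib `pickle` rejects, so a `False` here does not prove that Ray will fail.

```python
import pickle
from typing import Any


def is_picklable(obj: Any) -> bool:
    """Conservative proxy: True if stdlib pickle can round-trip obj."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True
```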

Pandas?

  • ⛔️ Ray DOES NOT do anything special about Pandas.

CAVEATS

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.
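One quick way to follow the benchmarking advice is to compare how long a step takes to compute against how long its result takes to serialize. The sketch below is hypothetical and uses stdlib `pickle` and `time.perf_counter` as a stand-in for Ray's own serializer, so the numbers are only a rough indication.

```python
import pickle
import time
from typing import Any, Callable, Tuple


def time_compute_vs_serialize(fn: Callable[[], Any]) -> Tuple[float, float]:
    """Return (compute_seconds, serialize_roundtrip_seconds) for fn's result."""
    start = time.perf_counter()
    result = fn()
    compute_s = time.perf_counter() - start

    start = time.perf_counter()
    pickle.loads(pickle.dumps(result))  # approximate the cost of moving the result
    serialize_s = time.perf_counter() - start
    return compute_s, serialize_s
```

If the serialization time is comparable to or larger than the compute time, distributing that step is unlikely to pay off.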

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(result_builder: ResultMixin, workflow_id: str)

Constructor

Parameters:
  • result_builder – Required. An implementation of base.ResultMixin.

  • workflow_id – Required. An ID to give the ray workflow to identify it for durability purposes.

  • max_retries – Optional. The function will be retried up to the given number of times if an exception is raised.
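The `max_retries` behavior can be pictured with the sketch below. `call_with_retries` is a hypothetical helper, not part of Hamilton or Ray. It assumes `max_retries` counts retries after the first attempt, so a function runs at most `max_retries + 1` times.

```python
from typing import Any, Callable


def call_with_retries(fn: Callable[[], Any], max_retries: int) -> Any:
    """Call fn; if it raises, retry up to max_retries more times."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: propagate the last exception
    raise AssertionError("unreachable")
```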

build_result(**outputs: Dict[str, Any]) → Any

Builds the result and brings it back to this running process.

Parameters:

outputs – a dictionary mapping each output name to either a Ray object reference or a plain value, i.e. key -> Union[ray object reference | value].

Returns:

The type of object returned by self.result_builder.
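A minimal sketch of what `build_result` has to do: materialize any object references among the outputs, then hand the plain values to the result builder. `ObjectRef`, `fake_get`, and `DictResultBuilder` are hypothetical stand-ins. In a Ray-backed adapter, resolving a reference would go through Ray rather than `fake_get`.

```python
from typing import Any, Callable, Dict


class ObjectRef:
    """Hypothetical stand-in for a Ray object reference."""

    def __init__(self, value: Any) -> None:
        self._value = value


def fake_get(ref: ObjectRef) -> Any:
    """Hypothetical stand-in for fetching a referenced value."""
    return ref._value


class DictResultBuilder:
    """Hypothetical result builder that returns the outputs as a plain dict."""

    @staticmethod
    def build_result(**outputs: Any) -> Dict[str, Any]:
        return dict(outputs)


def build_result(result_builder: Any, resolve: Callable[[ObjectRef], Any], **outputs: Any) -> Any:
    """Resolve object references to values, then delegate to the result builder."""
    resolved = {
        name: resolve(value) if isinstance(value, ObjectRef) else value
        for name, value in outputs.items()
    }
    return result_builder.build_result(**resolved)
```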

static check_input_type(node_type: Type, input_value: Any) → bool

Used to check whether the user inputs match what the execution strategy & functions can handle.

Static, purely for legacy reasons.

Parameters:
  • node_type – The type of the node.

  • input_value – The actual value to check against the expected node type.

Returns:

True if the input is valid, False otherwise.
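The sketch below approximates the kind of check `check_input_type` performs. It is a simplified, hypothetical version, not Hamilton's actual implementation. It accepts anything for `Any` and unpacks `Optional`/`Union`. For generics such as `List[int]`, it checks only the container type, not the elements.

```python
import inspect
import types
import typing
from typing import Any


def check_input_type(node_type: Any, input_value: Any) -> bool:
    """Simplified check that input_value is acceptable for node_type."""
    if node_type is Any:
        return True  # Any accepts everything
    origin = typing.get_origin(node_type)
    union_type = getattr(types, "UnionType", None)  # `int | str`, Python 3.10+
    if origin is typing.Union or (union_type is not None and origin is union_type):
        return any(check_input_type(arg, input_value) for arg in typing.get_args(node_type))
    if origin is not None and inspect.isclass(origin):
        return isinstance(input_value, origin)  # List[int] -> list; elements not checked
    if inspect.isclass(node_type):
        return isinstance(input_value, node_type)
    return False
```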

static check_node_type_equivalence(node_type: Type, input_type: Type) → bool

Used to check whether two types are equivalent.

Static, purely for legacy reasons.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are equivalent, False otherwise.
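A simplified, hypothetical version of the static equivalence check used while building the graph might look like this. It treats equal types and `Any` as compatible and otherwise falls back to a subclass test. Hamilton's real logic covers more cases (for example, generics) than this sketch.

```python
import inspect
from typing import Any


def check_node_type_equivalence(node_type: Any, input_type: Any) -> bool:
    """Simplified check: can a value of input_type flow into a node expecting node_type?"""
    if node_type is Any or node_type == input_type:
        return True
    if inspect.isclass(node_type) and inspect.isclass(input_type):
        return issubclass(input_type, node_type)  # e.g. bool flows into int
    return False
```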

do_build_result(outputs: Dict[str, Any]) → Any

Implements the do_build_result method from the BaseDoBuildResult class. It is hidden from the user because the public-facing API is build_result, which allows the API and implementation of the internal set of hooks to change.
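The split between the internal hook and the public method can be sketched as follows. The sketch assumes `do_build_result` simply forwards its dictionary to `build_result` as keyword arguments. `SketchAdapter` is hypothetical and stands in for a real adapter.

```python
from typing import Any, Dict


class SketchAdapter:
    """Hypothetical adapter illustrating the public/internal hook split."""

    def build_result(self, **outputs: Any) -> Any:
        # Public, user-facing API.
        return dict(outputs)

    def do_build_result(self, outputs: Dict[str, Any]) -> Any:
        # Internal hook called by the framework; it forwards to the public API,
        # so the hook can change without breaking user code.
        return self.build_result(**outputs)
```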

do_check_edge_types_match(type_from: type, type_to: type) → bool

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Returns:

Whether or not they are equivalent.

do_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None) → Any

Method that is called to implement node execution. It can replace the execution of a node with something else altogether, augment it, or delegate it.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • node_ – Node that is being executed.

  • kwargs – Keyword arguments that are being passed into the node

  • task_id – ID of the task, defaults to None if not in a task setting
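To illustrate the "augment" case, the hypothetical executor below records which node ran in which run and then delegates to the node's function. `FakeNode` is an assumed stand-in for Hamilton's `Node` that keeps only a name and a callable, so the example stays self-contained.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class FakeNode:
    """Hypothetical stand-in for a Hamilton Node: just a name and a callable."""
    name: str
    callable: Callable[..., Any]


class RecordingExecutor:
    def __init__(self) -> None:
        self.calls: List[str] = []

    def do_node_execute(
        self,
        run_id: str,
        node_: FakeNode,
        kwargs: Dict[str, Any],
        task_id: Optional[str] = None,
    ) -> Any:
        # Augment: record the run and node, then delegate to the node's function.
        self.calls.append(f"{run_id}:{node_.name}")
        return node_.callable(**kwargs)
```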

do_validate_input(node_type: type, input_value: Any) → bool

Method that checks whether an input value matches an expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

execute_node(node: Node, kwargs: Dict[str, Any]) → Any

Function that is called as we walk the graph to determine how to execute a Hamilton function.

Parameters:
  • node – the node from the graph.

  • kwargs – the arguments that should be passed to it.

Returns:

A Ray object reference.
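`execute_node` hands back a handle to a result that may not exist yet. The sketch below mimics that with `concurrent.futures`: a `Future` plays the role of the Ray object reference, and upstream futures passed in `kwargs` are resolved inside the task before the node's function runs. `FutureAdapter` is hypothetical and takes a plain callable rather than a Hamilton `Node`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict


def _resolve(value: Any) -> Any:
    # Block until an upstream result is ready; pass plain values through.
    return value.result() if isinstance(value, Future) else value


class FutureAdapter:
    """Hypothetical adapter: each node becomes a task whose handle is a Future."""

    def __init__(self, pool: ThreadPoolExecutor) -> None:
        self.pool = pool

    def execute_node(self, fn: Callable[..., Any], kwargs: Dict[str, Any]) -> Future:
        def run() -> Any:
            resolved = {name: _resolve(v) for name, v in kwargs.items()}
            return fn(**resolved)

        return self.pool.submit(run)  # returns immediately with a handle
```

Because tasks are submitted in graph-walk order and the pool dequeues them first-in, first-out, an upstream task always starts before any task that waits on it.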

input_types() → List[Type[Type]]

Gives the applicable types to this result builder. This is optional for backwards compatibility, but is recommended.

Returns:

A list of types that this can apply to.

output_type() → Type

Returns the output type of this result builder.

Returns:

The type that this creates.
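A hypothetical result builder that declares both hooks might look like this. `DictOutputBuilder` is an illustrative class, not one shipped with Hamilton. It accepts outputs of any type (`input_types` returns `[Any]`) and reports `dict` as the type it produces.

```python
from typing import Any, Dict, List


class DictOutputBuilder:
    """Hypothetical result builder that packs outputs into a dict."""

    @staticmethod
    def build_result(**outputs: Any) -> Dict[str, Any]:
        return dict(outputs)

    def input_types(self) -> List[Any]:
        # Accept outputs of any type from upstream nodes.
        return [Any]

    def output_type(self) -> type:
        return dict
```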