h_ray.RayWorkflowGraphAdapter#

A Graph Adapter for delegating the execution of Hamilton nodes to Ray.

class hamilton.plugins.h_ray.RayWorkflowGraphAdapter(result_builder: ResultMixin, workflow_id: str)#

Class representing what’s required to make Hamilton run Ray Workflows

Use pip install sf-hamilton[ray] to get the dependencies required to run this.

Ray Workflows is a more robust way to scale computation for any type of Hamilton graph.
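For example, here is a minimal sketch of wiring this adapter into a Hamilton driver. The module my_functions and the requested output names are hypothetical, and depending on your Ray version you may also need to configure workflow storage when initializing Ray.

```python
import ray
from hamilton import base, driver
from hamilton.plugins import h_ray

import my_functions  # hypothetical module containing your Hamilton functions

ray.init()  # connect to (or start) a Ray cluster

adapter = h_ray.RayWorkflowGraphAdapter(
    result_builder=base.DictResult(),  # how the final result is assembled
    workflow_id="my-workflow-run-1",   # reuse this ID to resume a failed run
)
dr = driver.Driver({}, my_functions, adapter=adapter)

# Nothing executes on Ray until the whole DAG is walked and build_result is
# invoked internally by the driver.
result = dr.execute(["output_a", "output_b"])

ray.shutdown()
```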

What’s the difference between this and RayGraphAdapter?#

  • Ray Workflows offers durable computation. That is, it saves and checkpoints the result of each function.

  • This means a failed workflow can be resumed without re-running the steps that already completed, assuming correct Ray Workflows usage.

Tips#

See https://docs.ray.io/en/latest/workflows/basics.html for the source of the following:

  1. Functions should be idempotent (see the sketch after this list).

  2. The workflow ID is what Ray uses to resume/restart a workflow if it is run a second time.

  3. Nothing is run until the entire DAG has been walked and set up and build_result is called.
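To make tip 1 concrete, here is a small, hypothetical sketch of two Hamilton functions: the first is idempotent, the second is not. Function and column names are illustrative only.

```python
import datetime

import pandas as pd


# Idempotent: the same inputs always produce the same output and there are no
# side effects, so re-running this step on resume or retry is safe.
def scaled_signups(signups: pd.Series, scale_factor: float) -> pd.Series:
    return signups * scale_factor


# NOT idempotent: the output depends on wall-clock time, so a retried or
# resumed step can produce a different result than the original run.
def signups_with_load_time(signups: pd.Series) -> pd.DataFrame:
    return pd.DataFrame({"signups": signups, "loaded_at": datetime.datetime.now()})
```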

Notes on scaling:#

  • Multi-core on single machine ✅

  • Distributed computation on a Ray cluster ✅

  • Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻.

Function return object types supported:#

  • Works for any Python object that can be serialized by the Ray framework. ✅

Pandas?#

  • ⛔️ Ray DOES NOT do anything special about Pandas; DataFrames and Series are serialized and passed around like any other Python object (see the sketch below).
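If you do want a DataFrame back, you can still pair this adapter with a DataFrame-building result builder; the frames are simply serialized and shipped between Ray tasks like any other object. A minimal sketch (the workflow ID is illustrative):

```python
from hamilton import base
from hamilton.plugins import h_ray

# Outputs are stitched into a single pandas DataFrame on the driver side once
# all Ray object references have been resolved.
adapter = h_ray.RayWorkflowGraphAdapter(
    result_builder=base.PandasDataFrameResult(),
    workflow_id="pandas-example-run",
)
```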

CAVEATS#

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it (see the sketch below).
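This caveat is easy to check empirically: time the same DAG with and without the adapter. A rough sketch, reusing the hypothetical my_functions module and output name from the first example:

```python
import time

import ray
from hamilton import base, driver
from hamilton.plugins import h_ray

import my_functions  # hypothetical module from the first sketch


def timed_execute(dr: driver.Driver, outputs: list) -> float:
    """Returns wall-clock seconds for a single execution of the requested outputs."""
    start = time.time()
    dr.execute(outputs)
    return time.time() - start


ray.init()

# Same DAG, same result builder: one local driver, one delegating to Ray Workflows.
plain_dr = driver.Driver(
    {}, my_functions, adapter=base.SimplePythonGraphAdapter(base.DictResult())
)
ray_dr = driver.Driver(
    {},
    my_functions,
    adapter=h_ray.RayWorkflowGraphAdapter(base.DictResult(), workflow_id="benchmark-run"),
)

print("local:       ", timed_execute(plain_dr, ["output_a"]))
print("ray workflow:", timed_execute(ray_dr, ["output_a"]))
```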

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(result_builder: ResultMixin, workflow_id: str)#

Constructor

Parameters:
  • result_builder – Required. An implementation of base.ResultMixin.

  • workflow_id – Required. An ID to give the ray workflow to identify it for durability purposes.

  • max_retries – Optional. The number of times the function will be retried if an exception is raised.

build_result(**outputs: Dict[str, Any]) Any#

Builds the result and brings it back to this running process.

Parameters:

outputs – a dictionary of output name -> Union[Ray object reference, value]

Returns:

The type of object returned by self.result_builder.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to inspect to see whether it matches our expectation.

Returns:

True if the input value matches the expected node type, False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent, False otherwise.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Given a node that represents a Hamilton function, execute it. Note, in some adapters this might just return some type of “future”.

Parameters:
  • node – the Hamilton Node

  • kwargs – the kwargs required to exercise the node function.

Returns:

the result of exercising the node.

input_types() List[Type[Type]]#

Gives the types that are applicable to this result builder. This is optional for backwards compatibility, but is recommended.

Returns:

A list of types that this can apply to.

output_type() Type#

Returns the output type of this result builder.

Returns:

The type that this creates.