h_ray.RayWorkflowGraphAdapter
A graph adapter for delegating the execution of Hamilton nodes to Ray.
- class hamilton.plugins.h_ray.RayWorkflowGraphAdapter(result_builder: ResultMixin, workflow_id: str)
Class representing what’s required to make Hamilton run Ray Workflows.
Use pip install sf-hamilton[ray] to get the dependencies required to run this.
Ray Workflows offer a more robust way to scale computation for any type of Hamilton graph.
What’s the difference between this and RayGraphAdapter?
Ray Workflows offer durable computation: the result of each function is saved as a checkpoint.
This means that if something fails, you can resume the workflow instead of restarting it from scratch, assuming Ray Workflows are used correctly.
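To make the durability idea concrete, here is a minimal, stdlib-only sketch of per-step checkpointing keyed by a workflow ID. It is not how Ray stores workflows: the in-memory CHECKPOINTS dict and the run_step helper are hypothetical stand-ins for Ray's persistent workflow storage.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical stand-in for Ray's durable storage:
# maps (workflow_id, step_name) -> checkpointed result.
CHECKPOINTS: Dict[Tuple[str, str], Any] = {}


def run_step(workflow_id: str, step_name: str, fn: Callable[..., Any], *args: Any) -> Any:
    """Run fn once per (workflow_id, step_name); reuse the checkpoint afterwards."""
    key = (workflow_id, step_name)
    if key in CHECKPOINTS:
        return CHECKPOINTS[key]  # resuming: skip re-execution
    result = fn(*args)
    CHECKPOINTS[key] = result  # checkpoint only after the step succeeds
    return result


calls = []


def load() -> list:
    calls.append("load")
    return [1, 2, 3]


def total(xs: list) -> int:
    calls.append("total")
    return sum(xs)


def run_workflow(workflow_id: str, crash_before_total: bool = False) -> int:
    xs = run_step(workflow_id, "load", load)
    if crash_before_total:
        raise RuntimeError("simulated crash after 'load' was checkpointed")
    return run_step(workflow_id, "total", total, xs)


# First attempt fails part-way through.
try:
    run_workflow("wf-1", crash_before_total=True)
except RuntimeError:
    pass

# Second attempt with the same ID resumes: "load" is not executed again.
result = run_workflow("wf-1")
```

Rerunning with the same workflow ID only executes the steps that have no checkpoint yet; a different ID would start from scratch.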
Tips
See https://docs.ray.io/en/latest/workflows/basics.html for the source of the following:
Functions should be idempotent, because a step may run more than once if it is retried or resumed after a failure.
The workflow ID is what Ray uses to resume or restart a workflow when it is run a second time.
Nothing is run until the entire DAG has been walked and set up, and build_result is called.
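The last tip describes deferred execution: walking the DAG only records what to compute, and the work happens when the result is built. A minimal stdlib sketch of that idea follows, assuming a hypothetical Deferred class in place of Ray's lazy references; it is an illustration, not the adapter's code.

```python
from typing import Any, Callable, Dict, List


class Deferred:
    """Hypothetical stand-in for a lazy reference: stores a call, runs it on demand."""

    def __init__(self, fn: Callable[..., Any], kwargs: Dict[str, Any]):
        self.fn = fn
        self.kwargs = kwargs

    def resolve(self) -> Any:
        # Resolve upstream dependencies first, then call this node's function.
        # No memoization: a node reachable via two paths would run twice here.
        args = {k: v.resolve() if isinstance(v, Deferred) else v for k, v in self.kwargs.items()}
        return self.fn(**args)


log: List[str] = []


def a(x: int) -> int:
    log.append("a")
    return x + 1


def b(a: int) -> int:
    log.append("b")
    return a * 10


# "Walking the DAG": each node becomes a Deferred and nothing executes yet.
node_a = Deferred(a, {"x": 1})
node_b = Deferred(b, {"a": node_a})
assert log == []  # no node function has run so far

# "build_result": only now is the graph actually computed.
value = node_b.resolve()
```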
Notes on scaling:
Multi-core on single machine ✅
Distributed computation on a Ray cluster ✅
Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻.
Function return object types supported:
Works for any Python object that can be serialized by the Ray framework. ✅
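Ray's serializer builds on pickle (protocol 5) and cloudpickle. As a rough, conservative pre-check you can test whether a return value round-trips through the standard library's pickle. The pickles_cleanly helper below is hypothetical, and plain pickle is stricter than Ray (cloudpickle can also handle lambdas, for example), so False here does not guarantee that Ray will fail.

```python
import pickle
from typing import Any


def pickles_cleanly(obj: Any) -> bool:
    """Conservative proxy: True if obj survives a stdlib pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))
        return True
    except Exception:  # pickle raises different error types depending on the object
        return False


print(pickles_cleanly({"a": [1, 2, 3]}))       # plain containers pickle fine
print(pickles_cleanly(i for i in range(3)))    # generators cannot be pickled
```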
Pandas?
⛔️ Ray DOES NOT do anything special about Pandas.
CAVEATS
Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.
DISCLAIMER – this class is experimental, so signature changes are a possibility!
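For the serialization caveat, one quick benchmark is to compare the cost of moving a node's input against the cost of computing on it. This sketch uses stdlib pickle as a stand-in for Ray's serialization, which differs in practice (Ray can share some objects, such as NumPy arrays, zero-copy through its object store), so treat the numbers as a rough signal only. best_time and roundtrip are hypothetical helpers.

```python
import pickle
import time
from typing import Any, Callable


def best_time(fn: Callable[[], Any], repeats: int = 5) -> float:
    """Best-of-N wall-clock time in seconds for a zero-argument callable."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - start)
    return best


def roundtrip(obj: Any) -> Any:
    # Stdlib pickle as a rough proxy for shipping obj to and from a worker.
    return pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


data = list(range(200_000))
payload_bytes = len(pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL))
serialize_s = best_time(lambda: roundtrip(data))
compute_s = best_time(lambda: sum(data))  # a cheap "node function"

# If moving the data costs as much as (or more than) computing on it,
# sending this node to a Ray worker is unlikely to pay off.
print(f"{payload_bytes} bytes; serialize {serialize_s:.4f}s vs compute {compute_s:.4f}s")
```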
- __init__(result_builder: ResultMixin, workflow_id: str)
Constructor
- Parameters:
result_builder – Required. An implementation of base.ResultMixin.
workflow_id – Required. An ID to give the ray workflow to identify it for durability purposes.
max_retries – Optional. If an exception is raised, the function is retried up to this many times. Note that max_retries does not appear in the __init__ signature above, so passing it may not be supported.
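The retry semantics described for max_retries ("retry the given number of times if an exception is raised") can be sketched with the standard library. The call_with_retries helper is hypothetical and is not how Ray implements retries; it only shows that N retries means up to 1 + N attempts.

```python
from typing import Any, Callable, Optional


def call_with_retries(fn: Callable[[], Any], max_retries: int = 0) -> Any:
    """Call fn, retrying up to max_retries extra times if it raises (1 + max_retries attempts)."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    last_error: Optional[Exception] = None
    for _ in range(max_retries + 1):
        try:
            return fn()
        except Exception as err:  # retry on any exception
            last_error = err
    assert last_error is not None  # every attempt failed
    raise last_error


attempts = {"n": 0}


def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = call_with_retries(flaky, max_retries=2)  # succeeds on the 3rd attempt
```

Because a retried function runs again from the start, any side effects it has before failing are repeated, which is one reason Ray asks for idempotent functions.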
- build_result(**outputs: Dict[str, Any]) → Any
Builds the result and brings it back to this running process.
- Parameters:
outputs – the dictionary of key -> Union[ray object reference | value]
- Returns:
The type of object returned by self.result_builder.
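build_result receives a mix of references and plain values. The sketch below shows one general pattern, resolving every reference and then delegating to the result builder, assuming hypothetical ObjectRef, fetch (standing in for ray.get) and DictResultBuilder names. It is a simplification and not necessarily the adapter's actual code path, which may run the result builder as a Ray task instead.

```python
from typing import Any, Dict


class ObjectRef:
    """Hypothetical stand-in for a Ray object reference."""

    def __init__(self, value: Any):
        self._value = value


def fetch(ref: ObjectRef) -> Any:
    """Hypothetical stand-in for ray.get: materialize a reference locally."""
    return ref._value


class DictResultBuilder:
    """Hypothetical result builder that returns the outputs as a plain dict."""

    @staticmethod
    def build_result(**outputs: Any) -> Dict[str, Any]:
        return dict(outputs)


def build_result(result_builder: DictResultBuilder, **outputs: Any) -> Any:
    # Outputs may be references or plain values; resolve references first.
    resolved = {
        name: fetch(value) if isinstance(value, ObjectRef) else value
        for name, value in outputs.items()
    }
    return result_builder.build_result(**resolved)


out = build_result(DictResultBuilder(), a=ObjectRef(1), b=2)
```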
- static check_input_type(node_type: Type, input_value: Any) → bool
Used to check whether the user inputs match what the execution strategy & functions can handle.
- Parameters:
node_type – The type of the node.
input_value – The actual value to check against the node’s expected type.
- Returns:
True if input_value is compatible with node_type, False otherwise.
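One plausible shape for check_input_type in an adapter that passes deferred values around: a reference cannot be type-checked until it is computed, so it is accepted as-is, while concrete values get an isinstance check. ObjectRef is a hypothetical stand-in, and this sketch is not necessarily what the Ray adapter does.

```python
from typing import Any, Type


class ObjectRef:
    """Hypothetical stand-in for a not-yet-computed Ray reference."""


def check_input_type(node_type: Type, input_value: Any) -> bool:
    if node_type is Any:
        return True  # the node accepts anything
    if isinstance(input_value, ObjectRef):
        return True  # type unknown until computed, so the check is deferred
    # Plain classes only; generics such as List[int] would need extra handling.
    return isinstance(input_value, node_type)
```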
- static check_node_type_equivalence(node_type: Type, input_type: Type) → bool
Used to check whether two types are equivalent.
This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.
- Parameters:
node_type – The type of the node.
input_type – The type of the input that would flow into the node.
- Returns:
True if the two types are considered equivalent, False otherwise.
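How strict "equivalent" should be is a policy choice for each adapter. The two hypothetical functions below contrast a strict policy (identical types only) with a more lenient one that also accepts an input type that subclasses the node's type; neither is claimed to be Hamilton's implementation.

```python
from typing import Type


def check_node_type_equivalence(node_type: Type, input_type: Type) -> bool:
    """Strict policy: equivalent only if the types are identical."""
    return node_type == input_type


def check_node_type_equivalence_lenient(node_type: Type, input_type: Type) -> bool:
    """Lenient policy: also accept an input type that subclasses the node's type."""
    return node_type == input_type or (
        isinstance(node_type, type)
        and isinstance(input_type, type)
        and issubclass(input_type, node_type)
    )
```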
- execute_node(node: Node, kwargs: Dict[str, Any]) → Any
Given a node that represents a Hamilton function, execute it. Note: in some adapters this returns a “future” (or a similar deferred reference) rather than the computed value.
- Parameters:
node – the Hamilton Node
kwargs – the keyword arguments required to call the node’s function.
- Returns:
the result of calling the node’s function, or a future/reference to it, depending on the adapter.
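To illustrate an execute_node that returns a "future", here is a stdlib analogue built on concurrent.futures instead of Ray. The execute_node function and the thread-pool setup are hypothetical; the point is only that each call returns immediately and the value is fetched later.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict

executor = ThreadPoolExecutor(max_workers=4)


def execute_node(fn: Callable[..., Any], kwargs: Dict[str, Any]) -> Future:
    """Hypothetical execute_node: schedule fn and return a Future, not a value."""

    def run() -> Any:
        # Upstream results may still be Futures; block on them inside the worker.
        # Safe here because nodes are submitted in dependency order and the
        # executor's queue is FIFO, so an upstream task is always picked up first.
        resolved = {k: v.result() if isinstance(v, Future) else v for k, v in kwargs.items()}
        return fn(**resolved)

    return executor.submit(run)


# Nodes are submitted in dependency order; each call returns immediately.
fut_a = execute_node(lambda x: x + 1, {"x": 1})
fut_b = execute_node(lambda a: a * 10, {"a": fut_a})
value = fut_b.result()  # block only when the final value is needed
executor.shutdown()
```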
- input_types() → List[Type[Type]]
Returns the types this result builder applies to. Implementing it is optional, for backwards compatibility, but recommended.
- Returns:
A list of types that this can apply to.
- output_type() → Type
Returns the output type of this result builder.
- Returns:
The type that this creates.
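A result builder can declare what it accepts and what it produces through input_types and output_type. Below is a minimal sketch of a hypothetical dict-producing builder (DictResultSketch is an illustrative name, not Hamilton's own class).

```python
from typing import Any, Dict, List, Type


class DictResultSketch:
    """Hypothetical result builder that collects node outputs into a dict."""

    @staticmethod
    def build_result(**outputs: Any) -> Dict[str, Any]:
        return dict(outputs)

    def input_types(self) -> List[Type[Type]]:
        # Accepts node outputs of any type.
        return [Any]

    def output_type(self) -> Type:
        # Produces a mapping from output name to value.
        return Dict[str, Any]
```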