Builder#

Use this to instantiate a driver.

class hamilton.driver.Builder#
__init__()#

Constructs a driver builder. It takes no parameters; call the builder's methods to set fields.

build() Driver#

Builds the driver – note that this can return a different class, so you’ll likely want to have a sense of what it returns.

Note: this defaults to a dictionary adapter if no adapter is set.

Returns:

The driver you specified.

enable_dynamic_execution(*, allow_experimental_mode: bool = False) Builder#

Enables the Parallelizable[] type, which in turn enables:

  1. Grouped execution into tasks

  2. Parallel execution

Returns:

self

with_adapter(adapter: HamiltonGraphAdapter) Builder#

Sets the adapter to use.

Parameters:

adapter – Adapter to use.

Returns:

self

with_adapters(*adapters: BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult) Builder#

Sets the adapters to use.

Parameters:

adapters – Adapters to use.

Returns:

self

with_config(config: Dict[str, Any]) Builder#

Adds the specified configuration to the config. This can be called multiple times – later calls will take precedence.

Parameters:

config – Config to use.

Returns:

self
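The precedence rule for repeated with_config calls can be pictured as successive dictionary updates. The sketch below is a stand-alone illustration rather than Hamilton's implementation: `merge_configs` is a hypothetical helper, and it assumes a shallow, key-by-key merge.

```python
from typing import Any, Dict


def merge_configs(*configs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: merge configs in call order; later keys win."""
    merged: Dict[str, Any] = {}
    for config in configs:
        merged.update(config)  # assumption: shallow, key-by-key override
    return merged


first_call = {"region": "US", "debug": False}
second_call = {"debug": True}
merged = merge_configs(first_call, second_call)
print(merged)
```

Keys that appear only in an earlier call survive; keys repeated in a later call are replaced.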

with_execution_manager(execution_manager: ExecutionManager) Builder#

Sets the execution manager to use. Note that this cannot be used if local_executor or remote_executor are also set

Parameters:

execution_manager – Execution manager to use.

Returns:

self

with_grouping_strategy(grouping_strategy: GroupingStrategy) Builder#

Sets a node grouper, which tells the driver how to group nodes into tasks for execution.

Parameters:

grouping_strategy – Node grouper to use.

Returns:

self

with_local_executor(local_executor: TaskExecutor) Builder#

Sets the local executor to use. Note that this cannot be used if an execution manager is also set.

Parameters:

local_executor – Local executor to use

Returns:

self

with_modules(*modules: module) Builder#

Adds the specified modules to the modules list. This can be called multiple times – later calls will take precedence.

Parameters:

modules – Modules to use.

Returns:

self

with_remote_executor(remote_executor: TaskExecutor) Builder#

Sets the remote executor to use. Note that this cannot be used if an execution manager is also set.

Parameters:

remote_executor – Remote executor to use

Returns:

self
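Two conventions run through the builder methods above: each returns self so calls can be chained, and an execution manager cannot be combined with a local or remote executor. The following toy class sketches both ideas. `SketchBuilder` is hypothetical and is not hamilton.driver.Builder; the error type and messages are assumptions made for illustration.

```python
from typing import Any, Dict, Optional


class SketchBuilder:
    """Toy stand-in for a fluent builder; NOT hamilton.driver.Builder."""

    def __init__(self) -> None:
        self.config: Dict[str, Any] = {}
        self.execution_manager: Optional[object] = None
        self.local_executor: Optional[object] = None
        self.remote_executor: Optional[object] = None

    def with_config(self, config: Dict[str, Any]) -> "SketchBuilder":
        self.config.update(config)
        return self  # returning self is what makes chaining possible

    def with_execution_manager(self, manager: object) -> "SketchBuilder":
        # Assumption: a conflict is reported with ValueError.
        if self.local_executor is not None or self.remote_executor is not None:
            raise ValueError("Cannot set an execution manager when an executor is set.")
        self.execution_manager = manager
        return self

    def with_local_executor(self, executor: object) -> "SketchBuilder":
        if self.execution_manager is not None:
            raise ValueError("Cannot set a local executor when an execution manager is set.")
        self.local_executor = executor
        return self


builder = SketchBuilder().with_config({"a": 1}).with_local_executor("local")
try:
    builder.with_execution_manager("manager")
    conflict_detected = False
except ValueError:
    conflict_detected = True
print(conflict_detected)
```

Checking for the conflict inside each setter means the mistake surfaces at the offending call rather than later at build time.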

Driver#

Use this driver in a general python context. E.g. batch, jupyter notebook, etc.

class hamilton.driver.Driver(config: Dict[str, Any], *modules: module, adapter: BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult | List[BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult] | None = None, _graph_executor: GraphExecutor = None, _use_legacy_adapter: bool = True)#

This class orchestrates creating and executing the DAG to create a dataframe.

import importlib

from hamilton import driver
from hamilton import base

# 1. Set up config or invariant input.
config = {}

# 2. Tell Hamilton where to load function definitions from.
import my_functions
# Or programmatically (e.g. you can script module loading):
module_name = 'my_functions'
my_functions = importlib.import_module(module_name)

# 3. Determine the return type -- default is a pandas.DataFrame.
adapter = base.SimplePythonDataFrameGraphAdapter()  # See GraphAdapter docs for more details.

# These all feed into creating the driver & thus DAG.
dr = driver.Driver(config, my_functions, adapter=adapter)

__init__(config: Dict[str, Any], *modules: module, adapter: BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult | List[BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult] | None = None, _graph_executor: GraphExecutor = None, _use_legacy_adapter: bool = True)#

Constructor: creates a DAG given the configuration & modules to crawl.

Parameters:
  • config – This is a dictionary of initial data & configuration. The contents are used to help create the DAG.

  • modules – Python module objects you want to inspect for Hamilton Functions.

  • adapter – Optional. A way to wire in another way of “executing” a hamilton graph. Defaults to using original Hamilton adapter which is single threaded in memory python.

  • _graph_executor – Not public facing, do not use this parameter. This is injected by the builder. If you need to tune execution, use the builder to do so.

  • _use_legacy_adapter – Not public facing, do not use this parameter. This represents whether or not to use the legacy adapter. Defaults to True, as this should be backwards compatible. In Hamilton 2.0.0, this will be removed.

capture_constructor_telemetry(error: str | None, modules: Tuple[module], config: Dict[str, Any], adapter: LifecycleAdapterSet)#

Captures constructor telemetry. Notes: (1) we want to do this in a way that does not break. (2) we need to account for all possible states, e.g. someone passing in None, or assuming that the entire constructor code ran without issue, e.g. adapter was assigned to self.

Parameters:
  • error – the sanitized error string to send.

  • modules – the list of modules, could be None.

  • config – the config dict passed, could be None.

  • adapter – the adapter passed in, might not be attached to self yet.

capture_execute_telemetry(error: str | None, final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any], run_successful: bool, duration: float)#

Captures telemetry after execute has run.

Notes: (1) we want to be quite defensive in not breaking anyone’s code with things we do here. (2) thus we want to double-check that values exist before doing something with them.

Parameters:
  • error – the sanitized error string to capture, if any.

  • final_vars – the list of final variables to get.

  • inputs – the inputs to the execute function.

  • overrides – any overrides to the execute function.

  • run_successful – whether this run was successful.

  • duration – time it took to run execute.

display_all_functions(output_file_path: str = None, render_kwargs: dict = None, graphviz_kwargs: dict = None, show_legend: bool = True, orient: str = 'LR', hide_inputs: bool = False, deduplicate_inputs: bool = False, show_schema: bool = True, custom_style_function: Callable = None) graphviz.Digraph | None#

Displays the graph of all functions loaded!

Parameters:
  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph-all.dot’. Optional. No need to pass it in if you’re in a Jupyter Notebook.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}. See https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render for other options.

  • graphviz_kwargs – Optional. Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio to be equal of the produced image. See https://graphviz.org/doc/info/attrs.html for options.

  • show_legend – If True, add a legend to the visualization based on the DAG’s nodes.

  • orient – LR stands for “left to right”. Accepted values are TB, LR, BT, RL. orient will be overridden by the value of graphviz_kwargs[‘graph_attr’][‘rankdir’]; see https://graphviz.org/docs/attr-types/rankdir/

  • hide_inputs – If True, no input nodes are displayed.

  • deduplicate_inputs – If True, remove duplicate input nodes. Can improve readability depending on the specifics of the DAG.

  • show_schema – If True, display the schema of the DAG if the nodes have schema data provided

  • custom_style_function – Optional. Custom style function. See example in repository for example use.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.
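The interaction between orient and graphviz_kwargs described in the parameter list can be sketched as a small lookup. This is an illustration under stated assumptions, not Hamilton's code: `resolve_rankdir` is a hypothetical helper, and rejecting unknown values with ValueError is an assumption.

```python
from typing import Any, Dict, Optional

VALID_ORIENTATIONS = {"TB", "LR", "BT", "RL"}


def resolve_rankdir(orient: str = "LR", graphviz_kwargs: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical helper: pick the effective layout direction."""
    graph_attr = (graphviz_kwargs or {}).get("graph_attr", {})
    rankdir = graph_attr.get("rankdir", orient)  # an explicit rankdir wins over orient
    if rankdir not in VALID_ORIENTATIONS:
        raise ValueError(f"Unsupported orientation: {rankdir!r}")
    return rankdir


default_direction = resolve_rankdir()
overridden_direction = resolve_rankdir("LR", {"graph_attr": {"rankdir": "TB"}})
print(default_direction, overridden_direction)
```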

display_downstream_of(*node_names: str, output_file_path: str = None, render_kwargs: dict = None, graphviz_kwargs: dict = None, show_legend: bool = True, orient: str = 'LR', hide_inputs: bool = False, deduplicate_inputs: bool = False, show_schema: bool = True, custom_style_function: Callable = None) graphviz.Digraph | None#

Creates a visualization of the DAG starting from the passed in function name(s).

Note: for any “node” visualized, we will also add its parents to the visualization as well, so there could be more nodes visualized than strictly what is downstream of the passed in function name(s).

Parameters:
  • node_names – names of function(s) that are starting points for traversing the graph.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Optional. No need to pass it in if you’re in a Jupyter Notebook.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}.

  • graphviz_kwargs – Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio to be equal of the produced image.

  • show_legend – If True, add a legend to the visualization based on the DAG’s nodes.

  • orient – LR stands for “left to right”. Accepted values are TB, LR, BT, RL. orient will be overridden by the value of graphviz_kwargs[‘graph_attr’][‘rankdir’]; see https://graphviz.org/docs/attr-types/rankdir/

  • hide_inputs – If True, no input nodes are displayed.

  • deduplicate_inputs – If True, remove duplicate input nodes. Can improve readability depending on the specifics of the DAG.

  • show_schema – If True, display the schema of the DAG if nodes have schema data provided

  • custom_style_function – Optional. Custom style function.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.
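The note in the display_downstream_of description, that parents of visualized nodes are also drawn, can be sketched as a plain graph traversal. The code below is a toy over a dictionary of child lists, not Hamilton's traversal; `downstream_with_parents` and the example graph are hypothetical.

```python
from collections import deque
from typing import Dict, List, Set


def downstream_with_parents(children: Dict[str, List[str]], start: str) -> Set[str]:
    """Toy traversal: nodes downstream of `start`, plus each one's direct parents."""
    downstream: Set[str] = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in downstream:
                downstream.add(child)
                queue.append(child)
    # A parent is any node that lists a visited node as its child.
    parents = {p for p, kids in children.items() if any(k in downstream for k in kids)}
    return downstream | parents


# Edges: a -> c, b -> c, c -> d
children = {"a": ["c"], "b": ["c"], "c": ["d"], "d": []}
visualized = downstream_with_parents(children, "a")
print(sorted(visualized))
```

Here b is not downstream of a, yet it appears in the result because it is a parent of c.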

display_upstream_of(*node_names: str, output_file_path: str = None, render_kwargs: dict = None, graphviz_kwargs: dict = None, show_legend: bool = True, orient: str = 'LR', hide_inputs: bool = False, deduplicate_inputs: bool = False, show_schema: bool = True, custom_style_function: Callable = None) graphviz.Digraph | None#

Creates a visualization of the DAG going backwards from the passed in function name(s).

Note: for any “node” visualized, we will also add its parents to the visualization as well, so there could be more nodes visualized than strictly what is upstream of the passed in function name(s).

Parameters:
  • node_names – names of function(s) that are starting points for traversing the graph.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Optional. No need to pass it in if you’re in a Jupyter Notebook.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}. Optional.

  • graphviz_kwargs – Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio to be equal of the produced image. Optional.

  • show_legend – If True, add a legend to the visualization based on the DAG’s nodes.

  • orient – LR stands for “left to right”. Accepted values are TB, LR, BT, RL. orient will be overridden by the value of graphviz_kwargs[‘graph_attr’][‘rankdir’]; see https://graphviz.org/docs/attr-types/rankdir/

  • hide_inputs – If True, no input nodes are displayed.

  • deduplicate_inputs – If True, remove duplicate input nodes. Can improve readability depending on the specifics of the DAG.

  • show_schema – If True, display the schema of the DAG if nodes have schema data provided

  • custom_style_function – Optional. Custom style function.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.

execute(final_vars: List[str | Callable | HamiltonNode], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None) Any#

Executes computation.

Parameters:
  • final_vars – the final list of outputs we want to compute.

  • overrides – values that will override “nodes” in the DAG.

  • display_graph – DEPRECATED. Whether we want to display the graph being computed.

  • inputs – Runtime inputs to the DAG.

Returns:

an object consisting of the variables requested, matching the type returned by the GraphAdapter. See constructor for how the GraphAdapter is initialized. The default one right now returns a pandas dataframe.
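To make the roles of final_vars, inputs, and overrides concrete, here is a toy evaluator. It is a sketch, not Hamilton's execution engine: `toy_execute` is hypothetical, and in this toy a function's parameter names identify the nodes it depends on.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


def toy_execute(
    functions: Dict[str, Callable[..., Any]],
    final_vars: List[str],
    inputs: Optional[Dict[str, Any]] = None,
    overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Toy evaluator: resolves each function's parameters by name."""
    cache: Dict[str, Any] = dict(inputs or {})
    cache.update(overrides or {})  # overridden nodes are never computed

    def compute(name: str) -> Any:
        if name not in cache:
            fn = functions[name]
            kwargs = {p: compute(p) for p in inspect.signature(fn).parameters}
            cache[name] = fn(**kwargs)
        return cache[name]

    return {name: compute(name) for name in final_vars}


def doubled(raw: int) -> int:
    return raw * 2


def total(doubled: int, offset: int) -> int:
    return doubled + offset


functions = {"doubled": doubled, "total": total}
normal = toy_execute(functions, ["total"], inputs={"raw": 5, "offset": 1})
overridden = toy_execute(functions, ["total"], inputs={"offset": 1}, overrides={"doubled": 100})
print(normal, overridden)
```

In the second call the override replaces the doubled node, so raw is not needed at all.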

export_execution(final_vars: List[str], inputs: Dict[str, Any] = None, overrides: Dict[str, Any] = None) str#

Method to create JSON representation of the Graph.

Parameters:
  • final_vars – The final variables to compute.

  • inputs – Optional. The inputs to the DAG.

  • overrides – Optional. Overrides to the DAG.

Returns:

JSON string representation of the graph.

has_cycles(final_vars: List[str | Callable | HamiltonNode], _fn_graph: FunctionGraph = None) bool#

Checks that the created graph does not have cycles.

Parameters:
  • final_vars – the outputs we want to compute.

  • _fn_graph – the function graph to check for cycles, used internally

Returns:

boolean True for cycles, False for no cycles.
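For readers unfamiliar with cycle checks, the idea can be sketched with a depth-first search over a dependency mapping. This is a generic illustration and makes no claim about how has_cycles is implemented; `has_cycle` and the example graphs are hypothetical.

```python
from typing import Dict, List, Set


def has_cycle(dependencies: Dict[str, List[str]]) -> bool:
    """Return True if the dependency mapping contains a cycle."""
    visiting: Set[str] = set()  # nodes on the current DFS path
    done: Set[str] = set()      # nodes fully explored

    def visit(node: str) -> bool:
        if node in done:
            return False
        if node in visiting:
            return True  # reached a node already on the current path
        visiting.add(node)
        found = any(visit(dep) for dep in dependencies.get(node, []))
        visiting.discard(node)
        done.add(node)
        return found

    return any(visit(node) for node in dependencies)


acyclic = {"c": ["b"], "b": ["a"], "a": []}
cyclic = {"a": ["b"], "b": ["a"]}
print(has_cycle(acyclic), has_cycle(cyclic))
```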

list_available_variables(*, tag_filter: Dict[str, str | None | List[str]] = None) List[HamiltonNode]#

Returns available variables, i.e. outputs.

These variables correspond 1:1 with nodes in the DAG, and contain the following information:

  1. name: the name of the node

  2. tags: the tags associated with this node

  3. type: The type of data this node returns

  4. is_external_input: Whether this node represents an external input (required from outside), or not (has a function specifying its behavior).

# gets all
dr.list_available_variables()
# gets exact matching tag name and tag value
dr.list_available_variables(tag_filter={"TAG_NAME": "TAG_VALUE"})
# gets all matching tag name and at least one of the values in the list
dr.list_available_variables(tag_filter={"TAG_NAME": ["TAG_VALUE1", "TAG_VALUE2"]})
# gets all with matching tag name, irrespective of value
dr.list_available_variables(tag_filter={"TAG_NAME": None})
# AND query between the two tags (i.e. both need to match)
dr.list_available_variables(tag_filter={"TAG_NAME": "TAG_VALUE", "TAG_NAME2": "TAG_VALUE2"})

Parameters:

tag_filter – A dictionary of tags to filter by. Only nodes matching the tags and their values will be returned. If the value for a tag is None, then we will return all nodes with that tag. If the value is non-empty we will return all nodes with that tag and that value.

Returns:

list of available variables (i.e. outputs).
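The tag_filter rules (exact value, any value in a list, None for any value, and AND across tags) can be summarized in a few lines of plain Python. This is a sketch of the semantics as documented here, not the library's matching code; `matches_tag_filter` and the sample tags are hypothetical.

```python
from typing import Dict, List, Union

TagFilter = Dict[str, Union[str, None, List[str]]]


def matches_tag_filter(tags: Dict[str, str], tag_filter: TagFilter) -> bool:
    """Hypothetical helper: True if a node's tags satisfy every filter entry."""
    for name, wanted in tag_filter.items():
        if name not in tags:
            return False  # every filtered tag must be present (AND semantics)
        if wanted is None:
            continue  # None accepts any value for this tag
        accepted = wanted if isinstance(wanted, list) else [wanted]
        if tags[name] not in accepted:
            return False
    return True


node_tags = {"owner": "data-team", "stage": "prod"}
exact = matches_tag_filter(node_tags, {"owner": "data-team"})
any_of = matches_tag_filter(node_tags, {"stage": ["dev", "prod"]})
any_value = matches_tag_filter(node_tags, {"owner": None})
both = matches_tag_filter(node_tags, {"owner": "data-team", "stage": "dev"})
print(exact, any_of, any_value, both)
```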

materialize(*materializers: MaterializerFactory | ExtractorFactory, additional_vars: List[str | Callable | HamiltonNode] = None, overrides: Dict[str, Any] = None, inputs: Dict[str, Any] = None) Tuple[Any, Dict[str, Any]]#

Executes and materializes with ad-hoc materializers (to) and extractors (from_). This does the following:

  1. Creates a new graph, appending the desired materialization nodes and prepending the desired extraction nodes

  2. Runs the portion of the DAG upstream of the materialization nodes outputted, as well as any additional nodes requested (which can be empty)

  3. Returns a Tuple[Materialization metadata, additional vars result]

For instance, say you want to load data, process it, then materialize the output of a node to CSV:

from hamilton import driver, base
from hamilton.io.materialization import from_, to
dr = driver.Driver({}, my_module)
# foo, bar are pd.Series
metadata, result = dr.materialize(
    from_.csv(
        target="input_data",
        path="./input.csv"
    ),
    to.csv(
        path="./output.csv",
        id="foo_bar_csv",
        dependencies=["foo", "bar"],
        combine=base.PandasDataFrameResult()
    ),
    additional_vars=["foo", "bar"]
)

The code above will do the following:

  1. Load the CSV at “./input.csv” and inject it into the DAG as input_data

  2. Run the nodes in the DAG on which “foo” and “bar” depend

  3. Materialize the dataframe with “foo” and “bar” as columns, saving it as a CSV file at “./output.csv”. The metadata will contain any additional relevant information, and result will be a dictionary with the keys “foo” and “bar” containing the original data.

Note that we pass in a ResultBuilder as the combine argument to to, as we may be materializing several nodes. This is not relevant in from_ as we are only loading one dataset.

additional_vars is used for debugging – e.g. if you want to both realize side-effects and return an output for inspection. If left out, it will return an empty dictionary.

You can bypass the combine keyword for to if only one output is required. In this circumstance “combining/joining” isn’t required, e.g. you do that yourself in a function and/or the output of the function can be directly used. In the case below the output can be turned into a CSV.

from hamilton import driver, base
from hamilton.io.materialization import from_, to
dr = driver.Driver({}, my_module)
# foo_bar_already_joined can be written to CSV directly
metadata, _ = dr.materialize(
    from_.csv(
        target="input_data",
        path="./input.csv"
    ),
    to.csv(
        path="./output.csv",
        id="foo_bar_csv",
        dependencies=["foo_bar_already_joined"],
    ),
)

This will just save it to a csv.

Note that materializers can be any valid DataSaver – these have an isomorphic relationship with the @save_to decorator, which means that any key utilizable in save_to can be used in a materializer. The constructor arguments for a materializer are the same as the arguments for @save_to, with an additional trick – instead of requiring everything to be a source or value, you can pass in a literal, and it will be interpreted as a value.

That said, if you want to parameterize your materializer based on input or some node in the DAG, you can easily do that as well:

from hamilton import driver, base
from hamilton.function_modifiers import source
from hamilton.io.materialization import from_, to
dr = driver.Driver({}, my_module)
# foo, bar are pd.Series
metadata, result = dr.materialize(
    from_.csv(
        target="input_data",
        path=source("load_path")
    ),
    to.csv(
        path=source("save_path"),
        id="foo_bar_csv",
        dependencies=["foo", "bar"],
        combine=base.PandasDataFrameResult()
    ),
    additional_vars=["foo", "bar"],
    inputs={"save_path" : "./output.csv"}
)

While this is a contrived example, you could imagine something more powerful. Say, for instance, you have created and registered a custom model_registry materializer that applies to an argument of your model class, and requires training_data to infer the signature. You could call it like this:

from hamilton import driver, base
from hamilton.function_modifiers import source
from hamilton.io.materialization import to
dr = driver.Driver({}, my_module)
metadata, _ = dr.materialize(
    to.model_registry(
        training_data=source("training_data"),
        id="foo_model_registry",
        tags={"run_id" : ..., "training_date" : ...},  # plus any other tags
        dependencies=["foo_model"]
    ),
)

In this case, we bypass a result builder (as there’s only one model), the single node we depend on gets saved, and we pass in the training data as an input so the materializer can infer the signature.

You could also imagine a driver that loads up a model, runs inference, then saves the result:

from hamilton import driver, base
from hamilton.function_modifiers import source
from hamilton.io.materialization import from_, to
dr = driver.Driver({}, my_module)
metadata, _ = dr.materialize(
    from_.model_registry(
        target="input_model",
        query_tags={"training_date" : ..., "model_version": ...}, # query based on training_date, model_version
    ),
    to.csv(
        path=source("save_path"),
        id="save_inference_data",
        dependencies=["inference_data"],
    )
)

Note that the “from” extractor has an interesting property – it effectively functions as overrides. This means that it can replace nodes within a DAG, short-circuiting their behavior. This is similar to passing overrides, except that the values are dynamically computed with the DAG rather than statically included from the beginning.

This is customizable through a few APIs:
  1. Custom data savers (Function modifiers)

  2. Custom result builders

  3. Custom data loaders (Function modifiers)

If you find yourself writing these, please consider contributing back! We would love to round out the set of available materialization tools.

Parameters:
  • materializers – Materializers/extractors to use, created with to.xyz or from_.xyz

  • additional_vars – Additional variables to return from the graph

  • overrides – Overrides to pass to execution

  • inputs – Inputs to pass to execution

Returns:

Tuple[Materialization metadata|data, additional_vars result]

static normalize_adapter_input(adapter: BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult | List[BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult] | LifecycleAdapterSet | None, use_legacy_adapter: bool = True) LifecycleAdapterSet#

Normalizes the adapter argument in the driver to a list of adapters. Adds back the legacy adapter if needed.

Note that, in the past, hamilton required a graph adapter. Now it is only required in the legacy case; default behavior has been modified to handle anything a result builder did.

Parameters:
  • adapter – Adapter to include

  • use_legacy_adapter – Whether to use the legacy adapter. Defaults to True.

Returns:

A lifecycle adapter set.

raw_execute(final_vars: List[str], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None, _fn_graph: FunctionGraph = None) Dict[str, Any]#

Raw execute function that does the meat of execute.

Don’t use this entry point for execution directly. Always go through .execute(). In case you are using .raw_execute() directly, please switch to .execute() using a base.DictResult(). Note: base.DictResult() is the default return of execute if you are using the driver.Builder() class to create a Driver() object.

Parameters:
  • final_vars – Final variables to compute

  • overrides – Overrides to run.

  • display_graph – DEPRECATED. DO NOT USE. Whether or not to display the graph when running it

  • inputs – Runtime inputs to the DAG

Returns:

A dictionary of the computed results, keyed by variable name.

validate_execution(final_vars: List[str | Callable | HamiltonNode], overrides: Dict[str, Any] = None, inputs: Dict[str, Any] = None)#

Validates execution of the graph. One can call this to validate execution, independently of actually executing. Note this has no return – it will raise a ValueError if there is an issue.

Parameters:
  • final_vars – Final variables to compute

  • overrides – Overrides to pass to execution.

  • inputs – Inputs to pass to execution.

Raises:

ValueError – if any issues with execution can be detected.

static validate_inputs(fn_graph: FunctionGraph, adapter: BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult | List[BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult] | LifecycleAdapterSet, user_nodes: Collection[Node], inputs: Dict[str, Any] | None = None, nodes_set: Collection[Node] = None)#

Validates that inputs meet our expectations. This means that:

  1. The runtime inputs don’t clash with the graph’s config

  2. All expected graph inputs are provided, either in config or at runtime

Parameters:
  • fn_graph – The function graph to validate.

  • adapter – The adapter to use for validation.

  • user_nodes – The required nodes we need for computation.

  • inputs – the user inputs provided.

  • nodes_set – the set of nodes to use for validation; Optional.
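The two expectations listed for validate_inputs can be sketched as set operations. The helper below is illustrative only and is not the library's validation code: `check_inputs` is hypothetical, it collects messages instead of raising, and it assumes that any key shared by config and runtime inputs counts as a clash.

```python
from typing import Any, Dict, Iterable, List


def check_inputs(config: Dict[str, Any], inputs: Dict[str, Any], required: Iterable[str]) -> List[str]:
    """Hypothetical helper: collect problems instead of raising."""
    errors: List[str] = []
    # Check 1: runtime inputs must not clash with config keys.
    for name in sorted(set(config) & set(inputs)):
        errors.append(f"'{name}' is defined in both config and runtime inputs")
    # Check 2: every required input must come from config or runtime inputs.
    provided = set(config) | set(inputs)
    for name in sorted(set(required) - provided):
        errors.append(f"required input '{name}' was not provided")
    return errors


ok = check_inputs({"region": "US"}, {"date": "2024-01-01"}, ["region", "date"])
bad = check_inputs({"region": "US"}, {"region": "EU"}, ["region", "date"])
print(ok)
print(bad)
```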

validate_materialization(*materializers: MaterializerFactory, additional_vars: List[str | Callable | HamiltonNode] = None, overrides: Dict[str, Any] = None, inputs: Dict[str, Any] = None)#

Validates materialization of the graph. Effectively .materialize() with a dry-run. Note this has no return – it will raise a ValueError if there is an issue.

Parameters:
  • materializers – Materializers to use, see the materialize() function

  • additional_vars – Additional variables to compute (in addition to materializers)

  • overrides – Overrides to pass to execution. Optional.

  • inputs – Inputs to pass to execution. Optional.

Raises:

ValueError – if any issues with materialization can be detected.

visualize_execution(final_vars: List[str | Callable | HamiltonNode], output_file_path: str = None, render_kwargs: dict = None, inputs: Dict[str, Any] = None, graphviz_kwargs: dict = None, overrides: Dict[str, Any] = None, show_legend: bool = True, orient: str = 'LR', hide_inputs: bool = False, deduplicate_inputs: bool = False, show_schema: bool = True, custom_style_function: Callable = None, bypass_validation: bool = False) graphviz.Digraph | None#

Visualizes Execution.

Note: overrides are not handled at this time.

Shapes:

  • ovals are nodes/functions

  • rectangles are nodes/functions that are requested as output

  • shapes with dotted lines are inputs required to run the DAG.

Parameters:
  • final_vars – the outputs we want to compute. They will become rectangles in the graph.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Optional. No need to pass it in if you’re in a Jupyter Notebook.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}. See https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render for other options.

  • inputs – Optional. Runtime inputs to the DAG.

  • graphviz_kwargs – Optional. Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio to be equal of the produced image. See https://graphviz.org/doc/info/attrs.html for options.

  • overrides – Optional. Overrides to the DAG.

  • show_legend – If True, add a legend to the visualization based on the DAG’s nodes.

  • orient – LR stands for “left to right”. Accepted values are TB, LR, BT, RL. orient will be overridden by the value of graphviz_kwargs[‘graph_attr’][‘rankdir’]; see https://graphviz.org/docs/attr-types/rankdir/

  • hide_inputs – If True, no input nodes are displayed.

  • deduplicate_inputs – If True, remove duplicate input nodes. Can improve readability depending on the specifics of the DAG.

  • show_schema – If True, display the schema of the DAG if nodes have schema data provided

  • custom_style_function – Optional. Custom style function.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.

visualize_materialization(*materializers: MaterializerFactory | ExtractorFactory, output_file_path: str = None, render_kwargs: dict = None, additional_vars: List[str | Callable | HamiltonNode] = None, inputs: Dict[str, Any] = None, graphviz_kwargs: dict = None, overrides: Dict[str, Any] = None, show_legend: bool = True, orient: str = 'LR', hide_inputs: bool = False, deduplicate_inputs: bool = False, show_schema: bool = True, custom_style_function: Callable = None, bypass_validation: bool = False) graphviz.Digraph | None#

Visualizes materialization. This helps give you a sense of how materialization will impact the DAG.

Parameters:
  • materializers – Materializers/Extractors to use, see the materialize() function

  • additional_vars – Additional variables to compute (in addition to materializers)

  • output_file_path – Path to output file. Optional. Skip if in a Jupyter Notebook.

  • render_kwargs – Arguments to pass to render. Optional.

  • inputs – Inputs to pass to execution. Optional.

  • graphviz_kwargs – Arguments to pass to graphviz. Optional.

  • overrides – Overrides to pass to execution. Optional.

  • show_legend – If True, add a legend to the visualization based on the DAG’s nodes.

  • orient – LR stands for “left to right”. Accepted values are TB, LR, BT, RL. orient will be overridden by the value of graphviz_kwargs[‘graph_attr’][‘rankdir’]; see https://graphviz.org/docs/attr-types/rankdir/

  • hide_inputs – If True, no input nodes are displayed.

  • deduplicate_inputs – If True, remove duplicate input nodes. Can improve readability depending on the specifics of the DAG.

  • show_schema – If True, show the schema of the materialized nodes if nodes have schema metadata attached.

  • custom_style_function – Optional. Custom style function.

  • bypass_validation – If True, bypass validation. Optional.

Returns:

The graphviz graph, if you want to do something with it.
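The precedence between orient and graphviz_kwargs described in the parameter list can be sketched as follows. This only illustrates the documented rule and is not Hamilton's code; resolve_rankdir is a hypothetical helper name introduced for the example.

```python
from typing import Optional


def resolve_rankdir(orient: str = "LR", graphviz_kwargs: Optional[dict] = None) -> str:
    """Hypothetical helper: pick the rank direction the rendered graph would use."""
    graph_attr = (graphviz_kwargs or {}).get("graph_attr", {})
    # An explicit rankdir in graphviz_kwargs takes precedence over orient.
    return graph_attr.get("rankdir", orient)


print(resolve_rankdir("TB"))
print(resolve_rankdir("TB", {"graph_attr": {"rankdir": "RL"}}))
```

In other words, orient acts as a default that only applies when graphviz_kwargs does not set rankdir itself.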

visualize_path_between(upstream_node_name: str, downstream_node_name: str, output_file_path: str | None = None, render_kwargs: dict = None, graphviz_kwargs: dict = None, strict_path_visualization: bool = False, show_legend: bool = True, orient: str = 'LR', hide_inputs: bool = False, deduplicate_inputs: bool = False, show_schema: bool = True, custom_style_function: Callable = None) graphviz.Digraph | None#

Visualizes the path between two nodes.

This is useful for debugging and understanding the path between two nodes.

Parameters:
  • upstream_node_name – the name of the node that we want to start from.

  • downstream_node_name – the name of the node that we want to end at.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Pass in None to skip saving any file.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}.

  • graphviz_kwargs – Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio of the produced image to 1.

  • strict_path_visualization – If True, only the nodes in the path will be visualized. If False, the nodes in the path and their dependencies, i.e. parents, will be visualized.

  • show_legend – If True, add a legend to the visualization based on the DAG’s nodes.

  • orient – LR stands for “left to right”. Accepted values are TB, LR, BT, RL. orient will be overridden by the value of graphviz_kwargs[‘graph_attr’][‘rankdir’]; see https://graphviz.org/docs/attr-types/rankdir/

  • hide_inputs – If True, no input nodes are displayed.

  • deduplicate_inputs – If True, remove duplicate input nodes. Can improve readability depending on the specifics of the DAG.

  • show_schema – If True, display the schema of the DAG if nodes have schema data provided.

  • custom_style_function – Optional. Custom style function.

Returns:

graphviz object.

Raises:

ValueError – if the upstream or downstream node names are not found in the graph, or there is no path between them.
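The effect of strict_path_visualization can be pictured with a small stand-alone sketch. It assumes a toy DAG stored as a plain dict of parent names; nodes_to_draw is a hypothetical helper, not part of the Hamilton API.

```python
from typing import Dict, List, Set


def nodes_to_draw(path: List[str], parents: Dict[str, List[str]], strict: bool = False) -> Set[str]:
    """Hypothetical helper: choose which nodes appear in the path visualization."""
    selected = set(path)
    if not strict:
        # Non-strict mode also shows the parents of every node on the path.
        for name in path:
            selected.update(parents.get(name, []))
    return selected


# Toy DAG: a -> b -> c, with a side input x that also feeds b.
parents = {"a": [], "b": ["a", "x"], "c": ["b"], "x": []}
path = ["a", "b", "c"]

print(sorted(nodes_to_draw(path, parents, strict=True)))
print(sorted(nodes_to_draw(path, parents, strict=False)))
```

With strict=True the side input x is left out; with strict=False it is drawn because it is a parent of b.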

what_is_downstream_of(*node_names: str) List[HamiltonNode]#

Tells you what is downstream of the given function(s), i.e. node(s).

Parameters:

node_names – names of function(s) that are starting points for traversing the graph.

Returns:

list of “variables” (i.e. nodes), inclusive of the function names, that are downstream of the passed in function names.
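As a rough mental model, an inclusive downstream lookup is a graph traversal along child edges. The sketch below assumes a toy DAG stored as a dict of child names and returns plain strings rather than HamiltonNode objects; downstream_of is a hypothetical stand-in, not the driver's implementation.

```python
from collections import deque
from typing import Dict, List, Set


def downstream_of(children: Dict[str, List[str]], *node_names: str) -> Set[str]:
    """Return the starting nodes plus everything reachable by following child edges."""
    seen = set(node_names)
    queue = deque(node_names)
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Toy DAG: a -> b -> d and a -> c.
children = {"a": ["b", "c"], "b": ["d"], "c": [], "d": []}

print(sorted(downstream_of(children, "b")))
```

Note that the starting names are part of the result, matching the "inclusive" wording above.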

what_is_the_path_between(upstream_node_name: str, downstream_node_name: str) List[HamiltonNode]#

Tells you what nodes are on the path between two nodes.

Note: this is inclusive of the two nodes, and returns an unsorted list of nodes.

Parameters:
  • upstream_node_name – the name of the node that we want to start from.

  • downstream_node_name – the name of the node that we want to end at.

Returns:

Nodes representing the path between the two nodes, inclusive of the two nodes, unsorted. Returns empty list if no path exists.

Raises:

ValueError – if the upstream or downstream node name is not in the graph.
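One way to reason about the documented behavior is that a node lies on a path exactly when it is downstream of the upstream node and upstream of the downstream node. The sketch below follows that idea on a toy dict-based DAG. It is an assumption about how such a lookup could work, not a description of Hamilton's internals, and path_between is a hypothetical name.

```python
from typing import Dict, List, Set


def _reachable(edges: Dict[str, List[str]], start: str) -> Set[str]:
    """All nodes reachable from start (inclusive) by following the given edges."""
    seen = {start}
    stack = [start]
    while stack:
        for nxt in edges.get(stack.pop(), []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def path_between(children: Dict[str, List[str]], upstream: str, downstream: str) -> List[str]:
    """Unsorted, inclusive list of nodes on any path from upstream to downstream."""
    for name in (upstream, downstream):
        if name not in children:
            raise ValueError(f"Node {name!r} is not in the graph")
    # Invert the child edges so we can also walk the graph backwards.
    parents: Dict[str, List[str]] = {name: [] for name in children}
    for parent, kids in children.items():
        for kid in kids:
            parents.setdefault(kid, []).append(parent)
    # Empty when no path exists, because then no node satisfies both conditions.
    return list(_reachable(children, upstream) & _reachable(parents, downstream))


# Toy DAG: a -> b -> d and a -> c (c is a dead end).
children = {"a": ["b", "c"], "b": ["d"], "c": [], "d": []}

print(sorted(path_between(children, "a", "d")))
```

Sorting is applied only for display; the function itself makes no ordering promise, in line with the note above.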

what_is_upstream_of(*node_names: str) List[HamiltonNode]#

Tells you what is upstream of the given function(s), i.e. node(s).

Parameters:

node_names – names of function(s) that are starting points for traversing the graph backwards.

Returns:

list of “variables” (i.e. nodes), inclusive of the function names, that are upstream of the passed in function names.
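Because functions are the nodes, an upstream lookup can be pictured as following parameter names backwards. The sketch below assumes that each parameter name refers to another node, and that names without a matching function are external inputs (which it treats as nodes too). It returns plain strings, and upstream_of is a hypothetical helper rather than the driver method.

```python
import inspect
from typing import Callable, Dict, Set


def b(a: int) -> int:
    return a + 1


def c(a: int) -> int:
    return a * 2


def d(b: int, c: int) -> int:
    return b + c


def upstream_of(functions: Dict[str, Callable], *node_names: str) -> Set[str]:
    """Return the named nodes plus everything they depend on, directly or indirectly."""
    seen: Set[str] = set()
    stack = list(node_names)
    while stack:
        name = stack.pop()
        if name in seen:
            continue
        seen.add(name)
        fn = functions.get(name)
        if fn is not None:  # names without a function are treated as external inputs
            stack.extend(inspect.signature(fn).parameters)
    return seen


functions = {"b": b, "c": c, "d": d}
print(sorted(upstream_of(functions, "d")))
```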

DefaultGraphExecutor#

This is the default graph executor. It can handle limited parallelism through graph adapters, and conducts execution using a simple recursive depth-first traversal. It cannot handle parallelism with Parallelizable[]/Collect[]. Note that this is only exposed through the Builder (and it is the default on Driver instantiation) – it is here purely for documentation, and you should never need to instantiate it directly.

class hamilton.driver.DefaultGraphExecutor(adapter: LifecycleAdapterSet | None = None)#
__init__(adapter: LifecycleAdapterSet | None = None)#

Constructor for the default graph executor.

Parameters:

adapter – Adapter to use for execution (optional).

execute(fg: FunctionGraph, final_vars: List[str], overrides: Dict[str, Any], inputs: Dict[str, Any], run_id: str) Dict[str, Any]#

Basic executor for a function graph. Does no task-based execution, just does a DFS and executes the graph in order, in memory.
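The depth-first, in-memory execution described here can be illustrated with a toy executor. This is a simplified sketch and not Hamilton's implementation: it assumes nodes are plain functions keyed by name, that parameter names identify dependencies, and it omits adapters and run_id.

```python
import inspect
from typing import Any, Callable, Dict, List


def b(a: int) -> int:
    return a + 1


def c(a: int) -> int:
    return a * 2


def d(b: int, c: int) -> int:
    return b + c


def execute(
    functions: Dict[str, Callable],
    final_vars: List[str],
    overrides: Dict[str, Any],
    inputs: Dict[str, Any],
) -> Dict[str, Any]:
    """Compute final_vars by recursing into dependencies first (DFS), memoizing in memory."""
    # Inputs and overrides are treated as already computed; overrides win on conflict.
    computed: Dict[str, Any] = {**inputs, **overrides}

    def compute(name: str) -> Any:
        if name in computed:
            return computed[name]
        if name not in functions:
            raise KeyError(f"Missing required input: {name}")
        fn = functions[name]
        kwargs = {param: compute(param) for param in inspect.signature(fn).parameters}
        computed[name] = fn(**kwargs)
        return computed[name]

    return {name: compute(name) for name in final_vars}


functions = {"b": b, "c": c, "d": d}
print(execute(functions, ["d"], overrides={}, inputs={"a": 1}))
print(execute(functions, ["d"], overrides={"b": 10}, inputs={"a": 1}))
```

In the second call the override for b short-circuits its computation, so b's function is never invoked.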

validate(nodes_to_execute: List[Node])#

The default graph executor cannot handle Parallelizable[]/Collect[] nodes.

Parameters:

nodes_to_execute

Raises:

InvalidExecutorException – if the graph contains Parallelizable[]/Collect[] nodes.
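The check amounts to rejecting any graph that contains dynamic nodes. The sketch below is a stand-alone illustration under stated assumptions: ToyNode and its expands/collects flags are hypothetical stand-ins for nodes typed with Parallelizable[] or Collect[], and InvalidExecutorError stands in for the documented InvalidExecutorException.

```python
from dataclasses import dataclass
from typing import List


class InvalidExecutorError(Exception):
    """Hypothetical stand-in for the InvalidExecutorException named above."""


@dataclass
class ToyNode:
    name: str
    expands: bool = False   # stands in for a Parallelizable[] node
    collects: bool = False  # stands in for a Collect[] node


def validate(nodes_to_execute: List[ToyNode]) -> None:
    """Raise if any node needs dynamic (task-based) execution."""
    offending = [n.name for n in nodes_to_execute if n.expands or n.collects]
    if offending:
        raise InvalidExecutorError(
            f"Nodes {offending} require task-based execution; enable dynamic execution instead."
        )


validate([ToyNode("a"), ToyNode("b")])  # passes silently
```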

TaskBasedGraphExecutor#

This is a task based graph executor. It can handle parallelism with the Parallelizable/Collect constructs, allowing it to spawn dynamic tasks and execute them as a group. Note that this is only exposed through the Builder when called with enable_dynamic_execution(allow_experimental_mode: bool) – it is here purely for documentation, and you should never need to instantiate it directly.

class hamilton.driver.TaskBasedGraphExecutor(execution_manager: ExecutionManager, grouping_strategy: GroupingStrategy, adapter: LifecycleAdapterSet)#
__init__(execution_manager: ExecutionManager, grouping_strategy: GroupingStrategy, adapter: LifecycleAdapterSet)#

Executor for task-based execution. This enables grouping of nodes into tasks, as well as parallel execution/dynamic spawning of nodes.

Parameters:
  • execution_manager – Utility to assign task executors to node groups

  • grouping_strategy – Utility to group nodes into tasks

  • adapter – Lifecycle adapter set to use during execution, including building the final result

execute(fg: FunctionGraph, final_vars: List[str], overrides: Dict[str, Any], inputs: Dict[str, Any], run_id: str) Dict[str, Any]#

Executes a graph, task by task. This blocks until completion.

This does the following:

  1. Groups the nodes into tasks
  2. Creates an execution state and a results cache
  3. Runs it to completion, populating the results cache
  4. Returns the results from the results cache
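The four steps can be sketched with a toy, sequential version. This is a simplified illustration and not the real executor: it assumes plain functions keyed by name, uses a trivial one-node-per-task grouping, runs tasks one after another instead of dispatching them to executors, and omits adapters and run_id. Both function names are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict, List, Set


def b(a: int) -> int:
    return a + 1


def c(a: int) -> int:
    return a * 2


def d(b: int, c: int) -> int:
    return b + c


def group_into_tasks(functions: Dict[str, Callable], final_vars: List[str], known: Set[str]) -> List[List[str]]:
    """Toy grouping strategy: one node per task, listed in dependency order."""
    order: List[str] = []
    visited = set(known)

    def visit(name: str) -> None:
        if name in visited:
            return
        visited.add(name)
        for dep in inspect.signature(functions[name]).parameters:
            visit(dep)
        order.append(name)

    for name in final_vars:
        visit(name)
    return [[name] for name in order]


def execute_tasks(functions, final_vars, overrides, inputs) -> Dict[str, Any]:
    # 1. Group the nodes into tasks.
    tasks = group_into_tasks(functions, final_vars, set(inputs) | set(overrides))
    # 2. Create an execution state (queue of pending tasks) and a results cache.
    pending = list(tasks)
    results_cache: Dict[str, Any] = {**inputs, **overrides}
    # 3. Run to completion, populating the results cache.
    while pending:
        for name in pending.pop(0):
            fn = functions[name]
            kwargs = {p: results_cache[p] for p in inspect.signature(fn).parameters}
            results_cache[name] = fn(**kwargs)
    # 4. Return the requested results from the cache.
    return {name: results_cache[name] for name in final_vars}


functions = {"b": b, "c": c, "d": d}
print(execute_tasks(functions, ["d"], overrides={}, inputs={"a": 1}))
```

Keeping results in a shared cache is what lets later tasks read the outputs of earlier ones without recomputing them.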

validate(nodes_to_execute: List[Node])#

Currently this can run every valid graph.