CachingGraphAdapter

This is an experimental GraphAdapter; there is a possibility of their API changing. That said, the code is stable, and you should feel comfortable giving the code for a spin - let us know how it goes, and what the rough edges are if you find any. We’d love feedback if you are using these to know how to improve them or graduate them.

class hamilton.experimental.h_cache.CachingGraphAdapter(cache_path: str, *args, force_compute: Set[str] | None = None, writers: Dict[str, Callable[[Any, str, str], None]] | None = None, readers: Dict[str, Callable[[Any, str], Any]] | None = None, **kwargs)

Caching adapter.

Any node with tag “cache” will be cached (or loaded from cache) in the format defined by the tag’s value. There are a handful of formats supported, and other formats’ readers and writers can be provided to the constructor.

Values are loaded from cache if the node’s file exists, unless one of these is true:
  • node is explicitly forced to be computed with a constructor argument,

  • any of its (potentially transitive) dependencies that are configured to be cached was nevertheless computed (either forced or missing cached file).

Custom Serializers

One can provide custom readers and writers for any format by passing them to the constructor. These readers and writers will override the default ones. If you don’t want to override, but rather extend the default ones, you can do so by registering them with the register method on the appropriate function.

Writer functions need to have the following signature: def write_<format>(data: Any, filepath: str, name: str) -> None: … where data is the data to be written, filepath is the path to the file to be written to, and name is the name of the node that is being written.

Reader functions need to have the following signature: def read_<format>(data: Any, filepath: str) -> Any: … where data is an EMPTY OBJECT of the type you wish to instantiate, and filepath is the path to the file to be read from.

For example, if you want to extend JSON reader/writer to work with your custom type T, you can do the following:

@write_json.register(T)
def write_json_pd1(data: T, filepath: str, name: str) -> None:
    ...

@read_json.register(T)
def read_json_dict(data: T, filepath: str) -> T:
    ...

Usage

This is a simple example of the usage of CachingGraphAdapter.

First, let’s define some nodes in nodes.py:

import pandas as pd
from hamilton.function_modifiers import tag

def data_a() -> pd.DataFrame:
    ...

@tag(cache="parquet")
def data_b() -> pd.DataFrame:
    ...

def transformed(data_a: pd.DataFrame, data_b: pd.DataFrame) -> pd.DataFrame:
    ...

Notice that data_b is configured to be cached in a parquet file.

We then simply initialize the driver with a caching adapter:

from hamilton import base
from hamilton.driver import Driver
from hamilton.experimental import h_cache

import nodes

adapter = h_cache.CachingGraphAdapter(cache_path, base.PandasDataFrameResult())
dr = Driver(config, nodes, adapter=adapter)
result = dr.execute(["transformed"])

# Because `data_b` has been cached now, only `data_a` and `transformed` nodes
# will actually run.
result = dr.execute(["transformed"])
__init__(cache_path: str, *args, force_compute: Set[str] | None = None, writers: Dict[str, Callable[[Any, str, str], None]] | None = None, readers: Dict[str, Callable[[Any, str], Any]] | None = None, **kwargs)

Constructs the adapter.

Parameters:
  • cache_path – Path to the directory where cached files are stored.

  • force_compute – Set of nodes that should be forced to compute even if cache exists.

  • writers – A dictionary of writers for custom formats.

  • readers – A dictionary of readers for custom formats.

static build_dataframe_with_dataframes(outputs: Dict[str, Any]) DataFrame

Builds a dataframe from the outputs in an “outer join” manner based on index.

The behavior of pd.Dataframe(outputs) is that it will do an outer join based on indexes of the Series passed in. To handle dataframes, we unpack the dataframe into a dict of series, check to ensure that no columns are redefined in a rolling fashion going in order of the outputs requested. This then results in an “enlarged” outputs dict that is then passed to pd.Dataframe(outputs) to get the final dataframe.

Parameters:

outputs – The outputs to build the dataframe from.

Returns:

A dataframe with the outputs.

build_result(**outputs: Dict[str, Any]) Any

Clears the computed nodes information and delegates to the super class.

static check_input_type(node_type: Type, input_value: Any) bool

Used to check whether the user inputs match what the execution strategy & functions can handle.

Static purely for legacy reasons.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to inspect matches our expectation.

Returns:

True if the input is valid, False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool

Used to check whether two types are equivalent.

Static, purely for legacy reasons.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are equivalent, False otherwise.

static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) bool

Checks that pandas index types match.

This only logs warning errors, and if debug is enabled, a debug statement to list index types.

do_build_result(outputs: Dict[str, Any]) Any

Implements the do_build_result method from the BaseDoBuildResult class. This is kept from the user as the public-facing API is build_result, allowing us to change the API/implementation of the internal set of hooks

do_check_edge_types_match(type_from: type, type_to: type) bool

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Return bool:

Whether or not they are equivalent

do_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None) Any

Method that is called to implement node execution. This can replace the execution of a node with something all together, augment it, or delegate it.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • node – Node that is being executed

  • kwargs – Keyword arguments that are being passed into the node

  • task_id – ID of the task, defaults to None if not in a task setting

do_validate_input(node_type: type, input_value: Any) bool

Method that an input value maches an expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

execute_node(node: Node, kwargs: Dict[str, Any]) Any

Executes nodes conditionally according to caching rules.

This node is executed if at least one of these is true:

  • no cache is present,

  • it is explicitly forced by passing it to the adapter in force_compute,

  • at least one of its upstream nodes that had a @cache annotation was computed, either due to lack of cache or being explicitly forced.

input_types() List[Type[Type]]

Currently this just shoves anything into a dataframe. We should probably tighten this up.

output_type() Type

Returns the output type of this result builder :return: the type that this creates

static pandas_index_types(outputs: Dict[str, Any]) Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]

This function creates three dictionaries according to whether there is an index type or not.

The three dicts we create are: 1. Dict of index type to list of outputs that match it. 2. Dict of time series / categorical index types to list of outputs that match it. 3. Dict of no-index key to list of outputs with no index type.

Parameters:

outputs – the dict we’re trying to create a result from.

Returns:

dict of all index types, dict of time series/categorical index types, dict if there is no index