This is an experimental GraphAdapter; its API may change. That said, the code is stable, and you should feel comfortable giving it a spin - let us know how it goes, and what the rough edges are if you find any. We'd love feedback from anyone using these on how to improve them or graduate them.
- class hamilton.experimental.h_cache.CachingGraphAdapter(cache_path: str, *args, force_compute: Set[str] | None = None, writers: Dict[str, Callable[[Any, str, str], None]] | None = None, readers: Dict[str, Callable[[Any, str], Any]] | None = None, **kwargs)#
Any node with tag “cache” will be cached (or loaded from cache) in the format defined by the tag’s value. There are a handful of formats supported, and other formats’ readers and writers can be provided to the constructor.
- Values are loaded from cache if the node's file exists, unless one of these is true:
  - the node is explicitly forced to be computed with a constructor argument,
  - any of its (potentially transitive) dependencies that is configured to be cached was nevertheless computed (either forced or because its cached file was missing).
One can provide custom readers and writers for any format by passing them to the constructor. These readers and writers will override the default ones. If you don’t want to override, but rather extend the default ones, you can do so by registering them with the register method on the appropriate function.
Writer functions need to have the following signature:

```python
def write_<format>(data: Any, filepath: str, name: str) -> None: ...
```

where data is the data to be written, filepath is the path to the file to be written to, and name is the name of the node being written.

Reader functions need to have the following signature:

```python
def read_<format>(data: Any, filepath: str) -> Any: ...
```

where data is an empty object of the type you wish to instantiate, and filepath is the path to the file to be read from.
For example, if you want to extend JSON reader/writer to work with your custom type T, you can do the following:
```python
@write_json.register(T)
def write_json_pd1(data: T, filepath: str, name: str) -> None:
    ...


@read_json.register(T)
def read_json_dict(data: T, filepath: str) -> T:
    ...
```
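The register-based extension above follows Python's functools.singledispatch pattern. Here is a minimal, self-contained sketch of that mechanism (illustrative only; this is not Hamilton's actual write_json/read_json implementation, and the dict specialization is a made-up example):

```python
import json
from functools import singledispatch
from typing import Any


@singledispatch
def write_json(data: Any, filepath: str, name: str) -> None:
    # Default writer: assume `data` is directly JSON-serializable.
    with open(filepath, "w") as f:
        json.dump(data, f)


@write_json.register(dict)
def write_json_dict(data: dict, filepath: str, name: str) -> None:
    # Specialized writer for dicts: sort keys for deterministic files.
    with open(filepath, "w") as f:
        json.dump(data, f, sort_keys=True)


@singledispatch
def read_json(data: Any, filepath: str) -> Any:
    # `data` is an empty object of the target type, used only for
    # dispatch; here we just return whatever json.load produces.
    with open(filepath) as f:
        return json.load(f)
```

Calling `write_json` with a dict dispatches to `write_json_dict`; any other type falls back to the default.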
This is a simple example of the usage of CachingGraphAdapter.
First, let’s define some nodes in nodes.py:
```python
import pandas as pd

from hamilton.function_modifiers import tag


def data_a() -> pd.DataFrame:
    ...


@tag(cache="parquet")
def data_b() -> pd.DataFrame:
    ...


def transformed(data_a: pd.DataFrame, data_b: pd.DataFrame) -> pd.DataFrame:
    ...
```
Notice that data_b is configured to be cached in a parquet file.
We then simply initialize the driver with a caching adapter:
```python
from hamilton import base
from hamilton.driver import Driver
from hamilton.experimental import h_cache

import nodes

adapter = h_cache.CachingGraphAdapter(cache_path, base.PandasDataFrameResult())
dr = Driver(config, nodes, adapter=adapter)
result = dr.execute(["transformed"])
# Because `data_b` has been cached now, only `data_a` and `transformed` nodes
# will actually run.
result = dr.execute(["transformed"])
```
- __init__(cache_path: str, *args, force_compute: Set[str] | None = None, writers: Dict[str, Callable[[Any, str, str], None]] | None = None, readers: Dict[str, Callable[[Any, str], Any]] | None = None, **kwargs)#
Constructs the adapter.
cache_path – Path to the directory where cached files are stored.
force_compute – Set of nodes that should be forced to compute even if cache exists.
writers – A dictionary of writers for custom formats.
readers – A dictionary of readers for custom formats.
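For example, custom readers and writers for a hypothetical "csv" format could be defined as plain functions matching the signatures above (the function names here are illustrative, not part of Hamilton's API); the two dicts would then be passed as the writers and readers constructor arguments:

```python
from typing import Any

import pandas as pd


def write_csv(data: pd.DataFrame, filepath: str, name: str) -> None:
    # Persist the node's output; `name` is available for logging if needed.
    data.to_csv(filepath, index=False)


def read_csv(data: Any, filepath: str) -> pd.DataFrame:
    # `data` would be an empty object of the target type; ignored here.
    return pd.read_csv(filepath)


writers = {"csv": write_csv}
readers = {"csv": read_csv}
# These would be passed as:
# CachingGraphAdapter(cache_path, ..., writers=writers, readers=readers)
```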
- static build_dataframe_with_dataframes(outputs: Dict[str, Any]) → DataFrame #
Builds a dataframe from the outputs in an “outer join” manner based on index.
The behavior of pd.DataFrame(outputs) is that it does an outer join based on the indexes of the Series passed in. To handle dataframes, we unpack each dataframe into a dict of Series, checking that no columns are redefined, in a rolling fashion going in order of the outputs requested. This results in an "enlarged" outputs dict that is then passed to pd.DataFrame(outputs) to get the final dataframe.
outputs – The outputs to build the dataframe from.
A dataframe with the outputs.
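To see the outer-join behavior this builds on, here is the underlying pandas mechanism in isolation (plain pandas, independent of Hamilton):

```python
import pandas as pd

# Two Series with partially overlapping indexes.
s1 = pd.Series([1.0, 2.0], index=[0, 1])
s2 = pd.Series([10.0, 20.0], index=[1, 2])

# pd.DataFrame on a dict of Series outer-joins on the index:
# labels missing from one Series become NaN in that column.
df = pd.DataFrame({"a": s1, "b": s2})
```

The resulting frame has index `[0, 1, 2]`, with NaN at `("a", 2)` and `("b", 0)`.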
- build_result(**outputs: Dict[str, Any]) → Any #
Clears the computed nodes information and delegates to the super class.
- static check_input_type(node_type: Type, input_value: Any) → bool #
Used to check whether the user inputs match what the execution strategy & functions can handle.
node_type – The type of the node.
input_value – An actual value that we want to inspect matches our expectation.
- static check_node_type_equivalence(node_type: Type, input_type: Type) → bool #
Used to check whether two types are equivalent.
This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.
node_type – The type of the node.
input_type – The type of the input that would flow into the node.
- static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) → bool #
Checks that pandas index types match.
This only logs warnings; if debug logging is enabled, it also logs a debug statement listing the index types.
- execute_node(node: Node, kwargs: Dict[str, Any]) → Any #
Executes nodes conditionally according to caching rules.
The node is executed if at least one of these is true:
- no cache is present,
- it is explicitly forced by passing it to the adapter in force_compute,
- at least one of its upstream nodes that is tagged with "cache" was computed, either due to a missing cache file or being explicitly forced.
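These rules can be sketched as a standalone predicate (helper and parameter names here are hypothetical; this is not Hamilton's implementation, just an illustration of the decision logic):

```python
import os
from typing import Set


def should_compute(
    node_name: str,
    cache_path: str,
    force_compute: Set[str],
    recomputed_upstream: Set[str],
) -> bool:
    # recomputed_upstream holds names of cache-tagged ancestors that
    # were themselves computed this run (forced or cache miss).
    cache_file = os.path.join(cache_path, node_name)
    if not os.path.exists(cache_file):   # no cache present
        return True
    if node_name in force_compute:       # explicitly forced
        return True
    if recomputed_upstream:              # an upstream cached node recomputed
        return True
    return False
```

If none of the three conditions holds, the cached value is loaded instead of recomputing the node.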
- input_types() → List[Type[Type]] #
Currently this just shoves anything into a dataframe. We should probably tighten this up.
- output_type() → Type #
Returns the output type of this result builder: the type that this creates.
- static pandas_index_types(outputs: Dict[str, Any]) → Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]] #
This function creates three dictionaries according to whether there is an index type or not.
The three dicts we create are:
1. Dict of index type to list of outputs that match it.
2. Dict of time series / categorical index types to list of outputs that match it.
3. Dict of no-index key to list of outputs with no index type.
outputs – the dict we’re trying to create a result from.
dict of all index types, dict of time series/categorical index types, dict of outputs with no index
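A rough sketch of this kind of grouping in plain pandas (not Hamilton's actual implementation; the time-series/categorical split and the no-index bucket's exact key are simplified here):

```python
from collections import defaultdict
from typing import Any, Dict, List

import pandas as pd


def group_by_index_type(outputs: Dict[str, Any]) -> Dict[str, List[str]]:
    # Map the type name of each output's index (or "no-index" for
    # scalars and other index-less values) to the output names.
    groups: Dict[str, List[str]] = defaultdict(list)
    for name, value in outputs.items():
        index = getattr(value, "index", None)
        key = type(index).__name__ if index is not None else "no-index"
        groups[key].append(name)
    return dict(groups)
```

A Series built on `pd.date_range` lands under `DatetimeIndex`, a default Series under `RangeIndex`, and a scalar under `no-index`.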