Caching logic¶

Caching Behavior¶

class hamilton.caching.adapter.CachingBehavior(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶

Behavior applied by the caching adapter

DEFAULT:

Try to retrieve result from cache instead of executing the node. If the node is executed, store the result. Compute the result data version and store it too.

RECOMPUTE:

Don’t try to retrieve result from cache and always execute the node. Otherwise, behaves as default. Useful when nodes are stochastic (e.g., model training) or interact with external components (e.g., read from database).

DISABLE:

Node is executed as if the caching feature wasn’t enabled. It never tries to retrieve results. Results are never stored nor versioned. Behaves like IGNORE, except that the node remains a dependency for downstream nodes. This means downstream cache lookups will likely fail systematically (e.g., when the cache is empty).

IGNORE:

Node is executed as if the caching feature wasn’t enabled. It never tries to retrieve results. Results are never stored nor versioned. IGNORE means downstream nodes will ignore this node as a dependency for lookup. Ignoring clients and connections can be useful since they shouldn’t directly impact the downstream results.
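
For example, behaviors can be assigned per node when enabling caching. This sketch assumes Builder.with_cache() accepts the recompute/ignore arguments documented for the HamiltonCacheAdapter below, and that my_dataflow defines nodes named train_model and db_client:

from hamilton import driver
import my_dataflow  # hypothetical module defining train_model and db_client nodes

dr = (
    driver.Builder()
    .with_modules(my_dataflow)
    # train_model is stochastic: always re-execute it.
    # db_client shouldn't impact downstream results: ignore it for lookup.
    .with_cache(recompute=["train_model"], ignore=["db_client"])
    .build()
)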

classmethod from_string(string: str) → CachingBehavior¶

Create a caching behavior from a string of the enum value. This is leveraged by the hamilton.lifecycle.caching.SmartCacheAdapter and the hamilton.function_modifiers.metadata.cache decorator.

from hamilton.caching.adapter import CachingBehavior

CachingBehavior.from_string("recompute")  # CachingBehavior.RECOMPUTE

@cache decorator¶

class hamilton.function_modifiers.metadata.cache(*, behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, format: Literal['json', 'file', 'pickle', 'parquet', 'csv', 'feather', 'orc', 'excel'] | str | None = None, target_: str | Collection[str] | None | ellipsis = Ellipsis)¶
BEHAVIOR_KEY = 'cache.behavior'¶
FORMAT_KEY = 'cache.format'¶
__init__(*, behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, format: Literal['json', 'file', 'pickle', 'parquet', 'csv', 'feather', 'orc', 'excel'] | str | None = None, target_: str | Collection[str] | None | ellipsis = Ellipsis)¶

The @cache decorator can define the behavior and format of a specific node.

This feature is implemented via tags, but that could change. Thus you should not rely on these tags for other purposes.

import pandas as pd
from hamilton.function_modifiers import cache

@cache(behavior="recompute", format="parquet")
def raw_data() -> pd.DataFrame: ...

If the function uses other function modifiers and defines multiple nodes, you can set target_ to specify which nodes to cache. The following example only caches the performance node.

from sklearn.linear_model import LinearRegression
from hamilton.function_modifiers import cache, extract_fields

@cache(format="json", target_="performance")
@extract_fields({"trained_model": LinearRegression, "performance": dict})
def model_training() -> dict:
    trained_model = LinearRegression()  # ... train the model here ...
    performance = {"rmse": 0.1, "mae": 0.2}
    return {"trained_model": trained_model, "performance": performance}
Parameters:
  • behavior – The behavior of the cache. This can be one of the following:

    - default: caching is enabled
    - recompute: always compute the node instead of retrieving
    - ignore: the data version won’t be part of downstream keys
    - disable: act as if caching wasn’t enabled

  • format – The format of the cache. This can be one of the following:

    - json: JSON format
    - file: file format
    - pickle: pickle format
    - parquet: parquet format
    - csv: csv format
    - feather: feather format
    - orc: orc format
    - excel: excel format

  • target_ – Target nodes to decorate. This can be one of the following:

    - None: tag all nodes outputted by this that are “final” (i.e., no other node outputted by this depends on them)
    - Ellipsis (...): tag all nodes outputted by this
    - Collection[str]: tag only the nodes with the specified names
    - str: tag only the node with the specified name

decorate_node(node_: Node) → Node¶

Decorates the node with the cache tags.

Parameters:

node_ – Node to decorate

Returns:

Copy of the node, with tags assigned

Logging¶

class hamilton.caching.adapter.CachingEvent(run_id: str, actor: Literal['adapter', 'metadata_store', 'result_store'], event_type: CachingEventType, node_name: str, task_id: str | None = None, msg: str | None = None, value: Any | None = None, timestamp: float = <factory>)¶

Event logged by the caching adapter

__init__(run_id: str, actor: Literal['adapter', 'metadata_store', 'result_store'], event_type: CachingEventType, node_name: str, task_id: str | None = None, msg: str | None = None, value: Any | None = None, timestamp: float = <factory>) → None¶
class hamilton.caching.adapter.CachingEventType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶

Event types logged by the caching adapter

Adapter¶

class hamilton.caching.adapter.HamiltonCacheAdapter(path: str | Path = '.hamilton_cache', metadata_store: MetadataStore | None = None, result_store: ResultStore | None = None, default: Literal[True] | Collection[str] | None = None, recompute: Literal[True] | Collection[str] | None = None, ignore: Literal[True] | Collection[str] | None = None, disable: Literal[True] | Collection[str] | None = None, default_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_loader_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_saver_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, log_to_file: bool = False, **kwargs)¶

Adapter enabling Hamilton’s caching feature through Builder.with_cache()

from hamilton import driver
import my_dataflow

dr = (
    driver.Builder()
    .with_modules(my_dataflow)
    .with_cache()
    .build()
)

# then, you can access the adapter via
dr.cache
__init__(path: str | Path = '.hamilton_cache', metadata_store: MetadataStore | None = None, result_store: ResultStore | None = None, default: Literal[True] | Collection[str] | None = None, recompute: Literal[True] | Collection[str] | None = None, ignore: Literal[True] | Collection[str] | None = None, disable: Literal[True] | Collection[str] | None = None, default_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_loader_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_saver_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, log_to_file: bool = False, **kwargs)¶

Initialize the cache adapter.

Parameters:
  • path – path where the cache metadata and results will be stored

  • metadata_store – BaseStore handling metadata for the cache adapter

  • result_store – BaseStore caching dataflow execution results

  • default – Set caching behavior to DEFAULT for specified node names. If True, apply to all nodes.

  • recompute – Set caching behavior to RECOMPUTE for specified node names. If True, apply to all nodes.

  • ignore – Set caching behavior to IGNORE for specified node names. If True, apply to all nodes.

  • disable – Set caching behavior to DISABLE for specified node names. If True, apply to all nodes.

  • default_behavior – Set the default caching behavior.

  • default_loader_behavior – Set the default caching behavior for DataLoader nodes.

  • default_saver_behavior – Set the default caching behavior for DataSaver nodes.

  • log_to_file – If True, append cache event logs as they happen in JSONL format.
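
For example, assuming Builder.with_cache() forwards these arguments to the adapter’s constructor, the cache location and default behavior can be customized:

from hamilton import driver
import my_dataflow  # hypothetical user module

dr = (
    driver.Builder()
    .with_modules(my_dataflow)
    .with_cache(
        path="./.my_cache",            # where metadata and results are stored
        default_behavior="recompute",  # always execute instead of retrieving; results are still stored
        log_to_file=True,              # append caching events to a JSONL log
    )
    .build()
)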

do_node_execute(*, run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None, **future_kwargs)¶

Try to retrieve stored result from previous executions or execute the node.

Use the previously created cache_key to retrieve the data_version from memory or the metadata_store. If a data_version is found, try to retrieve the result from the result_store; if that fails, execute the node. If no data_version is found, execute the node.
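
In pseudocode, the retrieval logic resembles the following sketch (helper names are illustrative, not the adapter’s actual internals):

# Illustrative sketch only; not the actual HamiltonCacheAdapter internals.
def execute_or_retrieve(node_callable, node_kwargs, cache_key, metadata_store, result_store):
    data_version = metadata_store.get(cache_key)  # check memory, then the store
    if data_version is not None:
        try:
            return result_store.get(data_version)  # cache hit: reuse stored result
        except Exception:
            pass  # stored result missing or unreadable: fall back to execution
    # cache miss: execute the node
    # (storing the new result happens later, in post_node_execute)
    return node_callable(**node_kwargs)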

get_cache_key(run_id: str, node_name: str, task_id: str | None = None) → str | S¶

Get the cache_key stored in-memory for a specific run_id, node_name, and task_id.

This method is public-facing and can be used directly to inspect the cache.

Parameters:
  • run_id – Id of the Hamilton execution run.

  • node_name – Name of the node associated with the cache key. node_name is a unique identifier if task-based execution is not used.

  • task_id – Id of the task when task-based execution is used. Then, the tuple (node_name, task_id) is a unique identifier.

Returns:

The cache key if it exists, otherwise return a sentinel value.

from hamilton import driver
import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)

dr.cache.get_cache_key(run_id=dr.last_run_id, node_name="my_node", task_id=None)
get_data_version(run_id: str, node_name: str, cache_key: str | None = None, task_id: str | None = None) → str | S¶

Get the data_version for a specific run_id, node_name, and task_id.

This method is public-facing and can be used directly to inspect the cache. This will check data versions stored both in-memory and in the metadata store.

Parameters:
  • run_id – Id of the Hamilton execution run.

  • node_name – Name of the node associated with the data version. node_name is a unique identifier if task-based execution is not used.

  • task_id – Id of the task when task-based execution is used. Then, the tuple (node_name, task_id) is a unique identifier.

Returns:

The data version if it exists, otherwise return a sentinel value.

from hamilton import driver
import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)

dr.cache.get_data_version(run_id=dr.last_run_id, node_name="my_node", task_id=None)

property last_run_id¶

Run id of the last started run. Not necessarily the last to complete.

logs(run_id: str | None = None, level: Literal['debug', 'info'] = 'info') → dict¶

Execution logs of the cache adapter.

Parameters:
  • run_id – If None, return all logged runs. If provided a run_id, group logs by node.

  • level – If "debug", log all events. If "info", only log whether the result was retrieved or executed.

Returns:

a mapping between node/task and a list of logged events

from hamilton import driver
import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)
dr.execute(...)

all_logs = dr.cache.logs()
# all_logs is a dictionary with run_ids as keys and lists of CachingEvent as values.
# {
#    run_id_1: [CachingEvent(...), CachingEvent(...)],
#    run_id_2: [CachingEvent(...), CachingEvent(...)],
# }


run_logs = dr.cache.logs(run_id=dr.last_run_id)
# run_logs are keyed by ``node_name``
# {node_name: [CachingEvent(...), CachingEvent(...)], ...}
# or ``(node_name, task_id)`` if task-based execution is used.
# {(node_name_1, task_id_1): [CachingEvent(...), CachingEvent(...)], ...}
post_node_execute(*, run_id: str, node_: Node, result: str | None, success: bool = True, error: Exception | None = None, task_id: str | None = None, **future_kwargs)¶

Get the cache_key and data_version stored in memory (respectively from pre_node_execute and do_node_execute) and store the result in result_store if it doesn’t exist.

pre_graph_execute(*, run_id: str, graph: FunctionGraph, final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any])¶

Set up the state of the adapter for a new execution.

Most attributes need to be keyed by run_id to prevent potential conflicts, because the same adapter instance is shared across all Driver.execute() calls.

pre_node_execute(*, run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None, **future_kwargs)¶

Before node execution or retrieval, create the cache_key and set it in memory. The cache_key is created based on the node’s code version and its dependencies’ data versions.

Collecting data_version for upstream dependencies requires handling special cases when task-based execution is used:

  • If the current node is COLLECT, the dependency annotated with Collect[] needs to be versioned item by item instead of versioning the full container. This is because the collect order is inconsistent.

  • If the current node is INSIDE and the dependency is EXPAND, the kwargs dictionary contains a single item. We need to version this individual item because it will not be available from “inside” the branch for some executors (multiprocessing, multithreading), which lose access to the data_versions of OUTSIDE nodes stored in self.data_versions.
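
Conceptually, the cache_key deterministically combines the node’s code version with its dependencies’ data versions. A simplified, hypothetical sketch (the actual key format may differ):

import hashlib
import json

# Hypothetical illustration: derive a cache key from the node's code version
# and its dependencies' data versions (the quirks section below notes that
# MD5 is the hash used for fingerprinting).
def create_cache_key(code_version: str, dependencies_data_versions: dict) -> str:
    payload = json.dumps(
        {"code": code_version, "deps": dependencies_data_versions},
        sort_keys=True,  # deterministic ordering of dependencies
    )
    return hashlib.md5(payload.encode()).hexdigest()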

resolve_behaviors(run_id: str) → Dict[str, CachingBehavior]¶

Resolve the caching behavior for each node based on the @cache decorator and the Builder.with_cache() parameters for a specific run_id.

This is a user-facing method.

Behaviors specified via Builder.with_cache() take precedence. If no parameters are specified, CachingBehavior.DEFAULT is used. If a node is Parallelizable (i.e., uses @expand), the caching behavior is set to CachingBehavior.RECOMPUTE to ensure the yielded items are versioned individually. Internally, this uses the FunctionGraph stored for each run_id and logs the resolved caching behavior for each node.

Parameters:

run_id – Id of the Hamilton execution run.

Returns:

A dictionary of {node name: caching behavior}.
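
Following the pattern of the other examples, the resolved behaviors can be inspected after execution:

from hamilton import driver
import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)

behaviors = dr.cache.resolve_behaviors(run_id=dr.last_run_id)
# {node_name: CachingBehavior, ...}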

resolve_code_versions(run_id: str, final_vars: List[str] | None = None, inputs: Dict[str, Any] | None = None, overrides: Dict[str, Any] | None = None) → Dict[str, str]¶

Resolve the code version for each node for a specific run_id.

This is a user-facing method.

If final_vars is None, all nodes will be versioned. If final_vars is provided, the inputs and overrides are used to determine the execution path and only version the code for these nodes.

Parameters:
  • run_id – Id of the Hamilton execution run.

  • final_vars – Nodes requested for execution.

  • inputs – Input node values.

  • overrides – Override node values.

Returns:

A dictionary of {node name: code version}.
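
A minimal usage sketch, assuming a built driver dr with caching enabled (as in the examples above):

# version all nodes
code_versions = dr.cache.resolve_code_versions(run_id=dr.last_run_id)
# {node_name: code_version, ...}

# or only version the nodes on the execution path of specific outputs
code_versions = dr.cache.resolve_code_versions(
    run_id=dr.last_run_id, final_vars=["my_node"]
)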

version_code(node_name: str, run_id: str | None = None) → str¶

Create a unique code version for the source code defining the node

version_data(result: Any, run_id: str = None) → str¶

Create a unique data version for the result

This is a user-facing method.
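
For example, assuming a built driver dr with caching enabled (as in the examples above), it can be used to check which data version a value maps to:

result = {"rmse": 0.1, "mae": 0.2}
data_version = dr.cache.version_data(result)
# compare with dr.cache.get_data_version(...) for a node of interest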

view_run(run_id: str | None = None, output_file_path: str | None = None)¶

View the dataflow execution, including cache hits/misses.

Parameters:
  • run_id – If None, view the last run. If provided a run_id, view that run.

  • output_file_path – If provided a path, save the visualization to a file.

from hamilton import driver
import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()

# execute 3 times
dr.execute(...)
dr.execute(...)
dr.execute(...)

# view the last run
dr.cache.view_run()
# this is equivalent to
dr.cache.view_run(run_id=dr.last_run_id)

# get a specific run id
run_id = dr.cache.run_ids[1]
dr.cache.view_run(run_id=run_id)

Quirks and limitations¶

Caching is a large and complex feature. This section is an attempt to list quirks and limitations, known and theoretical, to help with debugging and to guide feature development.

  • The standard library includes many types that are not primitives, so Hamilton might not support them explicitly. Support should be simple to add, so ping us if you need it.

  • The ResultStore could be architected better to support custom formats. Right now, we use a DataSaver to produce the .parquet file and pickle the DataLoader for later retrieval. The metadata and result stores are then completely unaware of the .parquet file, which makes cache eviction difficult to handle.

  • When a function with default parameter values passes through lifecycle hooks, the default values are not part of the node_kwargs. They need to be retrieved manually from the node.Node object.

  • Supporting the Hamilton AsyncDriver would require making the adapter async, but also the stores. A potential challenge is ensuring that the same cache (i.e., the same SQLite db and filesystem) can be used by both sync and async drivers.

  • Since @cache allows specifying the format (e.g., json, parquet), we probably want .with_cache() to support the same feature.

  • Hamilton allows a single do_node_execute() hook. Since the caching feature uses it, it is currently incompatible with other adapters leveraging it (PDBDebugger, CacheAdapter (deprecated), GracefulErrorAdapter (somewhat redundant with caching), DiskCacheAdapter (deprecated), NarwhalsAdapter (could be refactored)).

  • The presence of MD5 hashing can be seen as a security risk and may prevent adoption; read more in the DVC issues.

  • When hitting the base case of fingerprinting.hash_value(), we return the constant UNHASHABLE_VALUE. If the adapter receives this value, it appends a random UUID to it to prevent collisions between unhashable types. This data_version is no longer deterministic, but the value can still be retrieved or be part of another node’s cache_key.

  • Since hash_value() is a @functools.singledispatch function whose base case handles object, the base case can be overridden by registering an implementation for any type, as sketched below.
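
For example, support for a custom type could be registered as follows. This is a sketch: it assumes hash_value is exposed in hamilton.caching.fingerprinting and follows the standard functools.singledispatch registration protocol; MyCustomType is hypothetical.

from hamilton.caching import fingerprinting  # assumed module path

class MyCustomType:
    def __init__(self, payload: bytes):
        self.payload = payload

# Register an implementation so MyCustomType doesn't hit the
# UNHASHABLE_VALUE base case.
@fingerprinting.hash_value.register(MyCustomType)
def _(value, *args, **kwargs) -> str:
    # delegate to the existing implementation for bytes
    return fingerprinting.hash_value(value.payload, *args, **kwargs)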