Caching logic¶
Caching Behavior¶
- class hamilton.caching.adapter.CachingBehavior(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
Behavior applied by the caching adapter
- DEFAULT:
Try to retrieve result from cache instead of executing the node. If the node is executed, store the result. Compute the result data version and store it too.
- RECOMPUTE:
Don’t try to retrieve result from cache and always execute the node. Otherwise, behaves as default. Useful when nodes are stochastic (e.g., model training) or interact with external components (e.g., read from database).
- DISABLE:
Node is executed as if the caching feature wasn’t enabled. It never tries to retrieve results. Results are never stored nor versioned. Behaves like IGNORE, but the node remains a dependency for downstream nodes. This means downstream cache lookup will likely fail systematically (i.e., if the cache is empty).
- IGNORE:
Node is executed as if the caching feature wasn’t enabled. It never tries to retrieve results. Results are never stored nor versioned. IGNORE means downstream nodes will ignore this node as a dependency for lookup. Ignoring clients and connections can be useful since they shouldn’t directly impact the downstream results.
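The practical difference between DISABLE and IGNORE shows up in downstream cache keys. Below is a minimal toy sketch (not Hamilton's actual implementation; all names are illustrative) of why dropping a dependency from the key (IGNORE) keeps downstream lookups stable, while keeping an unversioned dependency (DISABLE) makes them miss:

```python
import hashlib

def cache_key(code_version: str, dep_data_versions: dict) -> str:
    # Toy cache key: hash the node's code version together with its
    # dependencies' data versions (sorted for determinism).
    parts = [code_version] + [f"{name}={v}" for name, v in sorted(dep_data_versions.items())]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

# `summary` depends on `raw_data` and `db_client`.
# IGNORE on `db_client`: the client is simply absent from downstream keys,
# so `summary` can still be retrieved from cache across runs.
key_ignore = cache_key("summary-v1", {"raw_data": "abc123"})

# DISABLE on `db_client`: it remains a dependency but is never versioned,
# so the downstream key differs and the lookup systematically misses.
key_disable = cache_key("summary-v1", {"raw_data": "abc123", "db_client": "<unversioned>"})

assert key_ignore != key_disable
```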
- classmethod from_string(string: str) CachingBehavior ¶
Create a caching behavior from a string of the enum value. This is leveraged by the hamilton.lifecycle.caching.SmartCacheAdapter and the hamilton.function_modifiers.metadata.cache decorator.

```python
CachingBehavior.from_string("recompute")
```
@cache decorator¶
- class hamilton.function_modifiers.metadata.cache(*, behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, format: Literal['json', 'file', 'pickle', 'parquet', 'csv', 'feather', 'orc', 'excel'] | str | None = None, target_: str | Collection[str] | None | ellipsis = Ellipsis)¶
- BEHAVIOR_KEY = 'cache.behavior'¶
- FORMAT_KEY = 'cache.format'¶
- __init__(*, behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, format: Literal['json', 'file', 'pickle', 'parquet', 'csv', 'feather', 'orc', 'excel'] | str | None = None, target_: str | Collection[str] | None | ellipsis = Ellipsis)¶
The @cache decorator can define the behavior and format of a specific node. This feature is implemented via tags, but that could change. Thus you should not rely on these tags for other purposes.

```python
@cache(behavior="recompute", format="parquet")
def raw_data() -> pd.DataFrame: ...
```

If the function uses other function modifiers and defines multiple nodes, you can set target_ to specify which nodes to cache. The following only caches the performance node.

```python
@cache(format="json", target_="performance")
@extract_fields(trained_model=LinearRegression, performance=dict)
def model_training() -> dict:
    # ...
    performance = {"rmse": 0.1, "mae": 0.2}
    return {"trained_model": trained_model, "performance": performance}
```
- Parameters:
behavior – The behavior of the cache. This can be one of the following:
  - default: caching is enabled
  - recompute: always compute the node instead of retrieving
  - ignore: the data version won’t be part of downstream keys
  - disable: act as if caching wasn’t enabled
format – The format of the cache. This can be one of the following:
  - json: JSON format
  - file: file format
  - pickle: pickle format
  - parquet: parquet format
  - csv: csv format
  - feather: feather format
  - orc: orc format
  - excel: excel format
target_ – Target nodes to decorate. This can be one of the following:
  - None: tag all nodes outputted by this that are “final” (i.e., no other node outputted by this decorator depends on them)
  - Ellipsis (...): tag all nodes outputted by this
  - Collection[str]: tag only the nodes with the specified names
  - str: tag only the node with the specified name
- decorate_node(node_: Node) Node ¶
Decorates the nodes with the cache tags.
- Parameters:
node – Node to decorate
- Returns:
Copy of the node, with tags assigned
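Conceptually, decoration attaches the BEHAVIOR_KEY and FORMAT_KEY tags to a copy of the node. Here is a hypothetical, self-contained sketch (ToyNode is a stand-in; the real hamilton.node.Node class differs):

```python
from dataclasses import dataclass, field, replace

@dataclass(frozen=True)
class ToyNode:
    # Minimal stand-in for hamilton.node.Node: just a name and tags.
    name: str
    tags: dict = field(default_factory=dict)

def decorate_node(node: ToyNode, behavior: str, fmt: str) -> ToyNode:
    # Return a copy of the node with the cache tags assigned, mirroring
    # BEHAVIOR_KEY = 'cache.behavior' and FORMAT_KEY = 'cache.format'.
    new_tags = {**node.tags, "cache.behavior": behavior, "cache.format": fmt}
    return replace(node, tags=new_tags)

node = ToyNode(name="raw_data")
decorated = decorate_node(node, behavior="recompute", fmt="parquet")
assert decorated.tags == {"cache.behavior": "recompute", "cache.format": "parquet"}
assert node.tags == {}  # the original node is unchanged
```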
Logging¶
- class hamilton.caching.adapter.CachingEvent(run_id: str, actor: ~typing.Literal['adapter', 'metadata_store', 'result_store'], event_type: ~hamilton.caching.adapter.CachingEventType, node_name: str, task_id: str | None = None, msg: str | None = None, value: ~typing.Any | None = None, timestamp: float = <factory>)¶
Event logged by the caching adapter
- __init__(run_id: str, actor: ~typing.Literal['adapter', 'metadata_store', 'result_store'], event_type: ~hamilton.caching.adapter.CachingEventType, node_name: str, task_id: str | None = None, msg: str | None = None, value: ~typing.Any | None = None, timestamp: float = <factory>) None ¶
- class hamilton.caching.adapter.CachingEventType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
Event types logged by the caching adapter
Adapter¶
- class hamilton.caching.adapter.HamiltonCacheAdapter(path: str | Path = '.hamilton_cache', metadata_store: MetadataStore | None = None, result_store: ResultStore | None = None, default: Literal[True] | Collection[str] | None = None, recompute: Literal[True] | Collection[str] | None = None, ignore: Literal[True] | Collection[str] | None = None, disable: Literal[True] | Collection[str] | None = None, default_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_loader_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_saver_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, log_to_file: bool = False, **kwargs)¶
Adapter enabling Hamilton’s caching feature through Builder.with_cache().

```python
from hamilton import driver

import my_dataflow

dr = (
    driver.Builder()
    .with_modules(my_dataflow)
    .with_cache()
    .build()
)

# then, you can access the adapter via dr.cache
```
- __init__(path: str | Path = '.hamilton_cache', metadata_store: MetadataStore | None = None, result_store: ResultStore | None = None, default: Literal[True] | Collection[str] | None = None, recompute: Literal[True] | Collection[str] | None = None, ignore: Literal[True] | Collection[str] | None = None, disable: Literal[True] | Collection[str] | None = None, default_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_loader_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, default_saver_behavior: Literal['default', 'recompute', 'ignore', 'disable'] | None = None, log_to_file: bool = False, **kwargs)¶
Initialize the cache adapter.
- Parameters:
path – path where the cache metadata and results will be stored
metadata_store – BaseStore handling metadata for the cache adapter
result_store – BaseStore caching dataflow execution results
default – Set caching behavior to DEFAULT for specified node names. If True, apply to all nodes.
recompute – Set caching behavior to RECOMPUTE for specified node names. If True, apply to all nodes.
ignore – Set caching behavior to IGNORE for specified node names. If True, apply to all nodes.
disable – Set caching behavior to DISABLE for specified node names. If True, apply to all nodes.
default_behavior – Set the default caching behavior.
default_loader_behavior – Set the default caching behavior for DataLoader nodes.
default_saver_behavior – Set the default caching behavior for DataSaver nodes.
log_to_file – If True, append cache event logs as they happen in JSONL format.
- do_node_execute(*, run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None, **future_kwargs)¶
Try to retrieve stored result from previous executions or execute the node.
Use the previously created cache_key to retrieve the data_version from memory or the metadata_store. If a data_version is found, try to retrieve the result; if retrieval fails, execute the node. If no data_version is found, execute the node.
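The retrieve-or-execute flow can be sketched with dict-backed stores (a simplification; Hamilton's actual metadata and result store classes differ, and the data versioning below is a toy):

```python
metadata_store = {}   # cache_key -> data_version
result_store = {}     # data_version -> result

def execute_or_retrieve(cache_key: str, execute) -> object:
    # 1. Look up the data_version for this cache_key.
    data_version = metadata_store.get(cache_key)
    if data_version is not None and data_version in result_store:
        # 2. Cache hit: return the stored result instead of executing.
        return result_store[data_version]
    # 3. Cache miss (no data_version, or result missing): execute the node,
    #    then store the data_version and the result.
    result = execute()
    data_version = f"v-{hash(result)}"  # toy data versioning
    metadata_store[cache_key] = data_version
    result_store[data_version] = result
    return result

calls = []
def expensive():
    calls.append(1)
    return 42

assert execute_or_retrieve("key-1", expensive) == 42
assert execute_or_retrieve("key-1", expensive) == 42  # retrieved, not re-executed
assert len(calls) == 1
```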
- get_cache_key(run_id: str, node_name: str, task_id: str | None = None) str | S ¶
Get the cache_key stored in-memory for a specific run_id, node_name, and task_id. This method is public-facing and can be used directly to inspect the cache.
- Parameters:
run_id – Id of the Hamilton execution run.
node_name – Name of the node associated with the cache key. node_name is a unique identifier if task-based execution is not used.
task_id – Id of the task when task-based execution is used. Then, the tuple (node_name, task_id) is a unique identifier.
- Returns:
The cache key if it exists, otherwise return a sentinel value.

```python
from hamilton import driver

import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)
dr.cache.get_cache_key(run_id=dr.last_run_id, node_name="my_node", task_id=None)
```
- get_data_version(run_id: str, node_name: str, cache_key: str | None = None, task_id: str | None = None) str | S ¶
Get the data_version for a specific run_id, node_name, and task_id. This method is public-facing and can be used directly to inspect the cache. This will check data versions stored both in-memory and in the metadata store.
- Parameters:
run_id – Id of the Hamilton execution run.
node_name – Name of the node associated with the data version. node_name is a unique identifier if task-based execution is not used.
task_id – Id of the task when task-based execution is used. Then, the tuple (node_name, task_id) is a unique identifier.
- Returns:
The data version if it exists, otherwise return a sentinel value.

```python
from hamilton import driver

import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)
dr.cache.get_data_version(run_id=dr.last_run_id, node_name="my_node", task_id=None)
```
- property last_run_id¶
Run id of the last started run. Not necessarily the last to complete.
- logs(run_id: str | None = None, level: Literal['debug', 'info'] = 'info') dict ¶
Execution logs of the cache adapter.
- Parameters:
run_id – If None, return all logged runs. If a run_id is provided, group logs by node.
level – If "debug", log all events. If "info", only log events where a result is retrieved or executed.
- Returns:
a mapping between node/task and a list of logged events

```python
from hamilton import driver

import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()
dr.execute(...)
dr.execute(...)

all_logs = dr.cache.logs()
# all_logs is a dictionary with run_ids as keys and lists of CachingEvent as values.
# {
#     run_id_1: [CachingEvent(...), CachingEvent(...)],
#     run_id_2: [CachingEvent(...), CachingEvent(...)],
# }

run_logs = dr.cache.logs(run_id=dr.last_run_id)
# run_logs are keyed by ``node_name``
# {node_name: [CachingEvent(...), CachingEvent(...)], ...}
# or ``(node_name, task_id)`` if task-based execution is used.
# {(node_name_1, task_id_1): [CachingEvent(...), CachingEvent(...)], ...}
```
- post_node_execute(*, run_id: str, node_: Node, result: str | None, success: bool = True, error: Exception | None = None, task_id: str | None = None, **future_kwargs)¶
Get the cache_key and data_version stored in memory (respectively from pre_node_execute and do_node_execute) and store the result in result_store if it doesn’t exist.
- pre_graph_execute(*, run_id: str, graph: FunctionGraph, final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any])¶
Set up the state of the adapter for a new execution.
Most attributes need to be keyed by run_id to prevent potential conflicts, because the same adapter instance is shared across all Driver.execute() calls.
- pre_node_execute(*, run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None, **future_kwargs)¶
Before node execution or retrieval, create the cache_key and set it in memory. The cache_key is created based on the node’s code version and its dependencies’ data versions.
Collecting data_version for upstream dependencies requires handling special cases when task-based execution is used:
- If the current node is COLLECT, the dependency annotated with Collect[] needs to be versioned item by item instead of versioning the full container. This is because the collect order is inconsistent.
- If the current node is INSIDE and the dependency is EXPAND, the kwargs dictionary contains a single item. We need to version this individual item because it will not be available from “inside” the branch for some executors (multiprocessing, multithreading), because they lose access to the data_versions of OUTSIDE nodes stored in self.data_versions.
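The COLLECT special case above (versioning items individually because the collect order is inconsistent) can be illustrated with a toy, order-insensitive container version; this is a sketch of the idea, not Hamilton's actual versioning code:

```python
import hashlib

def version_item(item) -> str:
    # Toy data version for a single item.
    return hashlib.sha256(repr(item).encode()).hexdigest()

def version_collected(items) -> str:
    # Version each item, then combine the sorted item versions so the
    # result does not depend on the (inconsistent) collect order.
    item_versions = sorted(version_item(item) for item in items)
    return hashlib.sha256("|".join(item_versions).encode()).hexdigest()

# Same items collected in different orders produce the same version,
# while versioning the container directly would not.
assert version_collected([1, 2, 3]) == version_collected([3, 1, 2])
assert version_item([1, 2, 3]) != version_item([3, 1, 2])
```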
- resolve_behaviors(run_id: str) Dict[str, CachingBehavior] ¶
Resolve the caching behavior for each node based on the @cache decorator and the Builder.with_cache() parameters for a specific run_id. This is a user-facing method.
Behaviors specified via Builder.with_cache() take precedence. If no parameters are specified, CachingBehavior.DEFAULT is used. If a node is Parallelizable (i.e., @expand), the CachingBehavior is set to CachingBehavior.RECOMPUTE to ensure the yielded items are versioned individually. Internally, this uses the FunctionGraph stored for each run_id and logs the resolved caching behavior for each node.
- Parameters:
run_id – Id of the Hamilton execution run.
- Returns:
A dictionary of {node name: caching behavior}.
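The precedence rules can be sketched as a small resolution function. This is a hypothetical simplification (function name and argument shapes are assumptions, not Hamilton's API): with_cache() arguments win over @cache tags, and Parallelizable nodes are forced to RECOMPUTE.

```python
from typing import Optional

def resolve_behavior(
    node_name: str,
    tag_behavior: Optional[str],   # behavior from the @cache decorator, if any
    with_cache_args: dict,         # e.g. {"recompute": ["model"]} or {"ignore": True}
    is_parallelizable: bool = False,
) -> str:
    # Parallelizable (@expand) nodes are always recomputed so that
    # yielded items are versioned individually.
    if is_parallelizable:
        return "recompute"
    # Builder.with_cache() arguments take precedence over @cache tags.
    # True applies the behavior to all nodes; a collection to named nodes.
    for behavior in ("default", "recompute", "ignore", "disable"):
        nodes = with_cache_args.get(behavior)
        if nodes is True or (nodes and node_name in nodes):
            return behavior
    # Fall back to the decorator tag, then to DEFAULT.
    return tag_behavior or "default"

assert resolve_behavior("model", "default", {"recompute": ["model"]}) == "recompute"
assert resolve_behavior("model", "recompute", {}) == "recompute"
assert resolve_behavior("items", None, {}, is_parallelizable=True) == "recompute"
assert resolve_behavior("clean", None, {}) == "default"
```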
- resolve_code_versions(run_id: str, final_vars: List[str] | None = None, inputs: Dict[str, Any] | None = None, overrides: Dict[str, Any] | None = None) Dict[str, str] ¶
Resolve the code version for each node for a specific run_id. This is a user-facing method.
If final_vars is None, all nodes will be versioned. If final_vars is provided, the inputs and overrides are used to determine the execution path, and only the code for these nodes is versioned.
- Parameters:
run_id – Id of the Hamilton execution run.
final_vars – Nodes requested for execution.
inputs – Input node values.
overrides – Override node values.
- Returns:
A dictionary of {node name: code version}.
- version_code(node_name: str, run_id: str | None = None) str ¶
Create a unique code version for the source code defining the node
- version_data(result: Any, run_id: str = None) str ¶
Create a unique data version for the result
This is a user-facing method.
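The idea behind code versioning can be sketched as hashing a stable representation of the function. This toy hashes compiled bytecode and constants rather than source text (an assumption for illustration; Hamilton's actual implementation differs):

```python
import hashlib

def toy_version_code(fn) -> str:
    # Hash the function's compiled bytecode and constants: editing the
    # function body changes its code version, which in turn invalidates
    # downstream cache keys.
    payload = fn.__code__.co_code + repr(fn.__code__.co_consts).encode()
    return hashlib.sha256(payload).hexdigest()

def raw_data() -> list:
    return [1, 2, 3]

def raw_data_edited() -> list:
    return [1, 2, 3, 4]

assert toy_version_code(raw_data) == toy_version_code(raw_data)   # deterministic
assert toy_version_code(raw_data) != toy_version_code(raw_data_edited)
```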
- view_run(run_id: str | None = None, output_file_path: str | None = None)¶
View the dataflow execution, including cache hits/misses.
- Parameters:
run_id – If None, view the last run. If a run_id is provided, view that run.
output_file_path – If a path is provided, save the visualization to a file.

```python
from hamilton import driver

import my_dataflow

dr = driver.Builder().with_modules(my_dataflow).with_cache().build()

# execute 3 times
dr.execute(...)
dr.execute(...)
dr.execute(...)

# view the last run
dr.cache.view_run()
# this is equivalent to
dr.cache.view_run(run_id=dr.last_run_id)

# get a specific run id
run_id = dr.cache.run_ids[1]
dr.cache.view_run(run_id=run_id)
```
Quirks and limitations¶
Caching is a large and complex feature. This section attempts to list quirks and limitations, known and theoretical, to help with debugging and guide feature development.
- The standard library includes many types that are not primitives, so Hamilton might not support them explicitly. They should be simple to add, so ping us if you need one.
- The ResultStore could be architected better to support custom formats. Right now, we use a DataSaver to produce the .parquet file and we pickle the DataLoader for later retrieval. The metadata and result stores are completely unaware of the .parquet file, making it difficult to handle cache eviction.
- When a function with default parameter values passes through lifecycle hooks, the default values are not part of the node_kwargs. They need to be retrieved manually from the node.Node object.
- Supporting the Hamilton AsyncDriver would require making the adapter async, and the stores too. A potential challenge is ensuring that you can use the same cache (i.e., same SQLite db and filesystem) for both sync and async drivers.
- If @cache allows specifying the format (e.g., json, parquet), we probably want .with_cache() to support the same feature.
- Hamilton allows a single do_node_execute() hook. Since the caching feature uses it, it is currently incompatible with other adapters leveraging it: PDBDebugger, CacheAdapter (deprecated), GracefulErrorAdapter (somewhat redundant with caching), DiskCacheAdapter (deprecated), NarwhalsAdapter (could be refactored).
- The presence of MD5 hashing can be seen as a security risk and prevent adoption. Read more in DVC issues.
- When hitting the base case of fingerprinting.hash_value(), we return the constant UNHASHABLE_VALUE. If the adapter receives this value, it will append a random UUID to it. This is to prevent collisions between unhashable types. This data_version is no longer deterministic, but the value can still be retrieved or be part of another node's cache_key.
- Having @functools.singledispatch(object) allows overriding the base case of hash_value() because it will catch all types.
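The base-case and override mechanics described above can be illustrated with a self-contained singledispatch sketch. All names here are toy stand-ins; the real implementation lives in hamilton.caching.fingerprinting and differs in detail:

```python
import functools
import hashlib
import uuid

UNHASHABLE_VALUE = "<unhashable>"

@functools.singledispatch
def hash_value(obj) -> str:
    # Base case: catches all types without a registered override.
    return UNHASHABLE_VALUE

@hash_value.register(str)
def _(obj: str) -> str:
    return hashlib.sha256(obj.encode()).hexdigest()

@hash_value.register(int)
def _(obj: int) -> str:
    return hashlib.sha256(str(obj).encode()).hexdigest()

def data_version(obj) -> str:
    # Mirror the adapter behavior: append a random UUID to the sentinel
    # so two unhashable values don't collide on the same data_version.
    version = hash_value(obj)
    if version == UNHASHABLE_VALUE:
        return f"{version}-{uuid.uuid4()}"
    return version

assert hash_value("a") == hash_value("a")          # deterministic for registered types
assert hash_value(object()) == UNHASHABLE_VALUE    # base case
# Non-deterministic, but collision-free between unhashable values:
assert data_version(object()) != data_version(object())
```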