Caching¶

In Hamilton, caching broadly refers to “reusing results from previous executions to skip redundant computation”. If you change code or pass new data, Hamilton will automatically determine which results can be reused and which nodes need to be re-executed. This improves execution speed and reduces resource usage (computation, API credits, etc.).

Note

Open the notebook in Google Colab for an interactive version and better syntax highlighting.

Throughout this tutorial, we’ll be using the Hamilton notebook extension to define dataflows directly in the notebook (see tutorial).

from hamilton import driver

# load the notebook extension
%reload_ext hamilton.plugins.jupyter_magic

We import the logging module and get the logger from hamilton.caching. With the level set to INFO, we’ll see GET_RESULT and EXECUTE_NODE cache events as they happen.

import logging

logger = logging.getLogger("hamilton.caching")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

The next cell deletes the cached data to ensure this notebook can be run from top to bottom without any issues.

import shutil

shutil.rmtree("./.hamilton_cache", ignore_errors=True)

Basics¶

Throughout this notebook, we’ll use the same simple dataflow that processes transactions in various locations and currencies.

We use the cell magic %%cell_to_module from the Hamilton notebook extension. It will convert the content of the cell into a Python module that can be loaded by Hamilton. The --display flag lets us visualize the dataflow.

%%cell_to_module basics_module --display
import pandas as pd

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56],
    "country": ["USA", "USA", "USA", "Canada", "Canada"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD"],
}

def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.73
    return df

Then, we build the Driver with caching enabled and execute the dataflow.

basics_dr = driver.Builder().with_modules(basics_module).with_cache().build()

basics_results_1 = basics_dr.execute(["processed_data"], inputs={"cutoff_date": "2024-09-01"})
print()
print(basics_results_1["processed_data"].head())
raw_data::adapter::execute_node
processed_data::adapter::execute_node
        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD       478.2300
1  Los Angeles  2024-09-12  251.67     USA      USD       251.6700
2      Chicago  2024-09-11  989.34     USA      USD       989.3400
3     Montréal  2024-09-11  742.14  Canada      CAD       541.7622
4    Vancouver  2024-09-09  584.56  Canada      CAD       426.7288

We can view what values were retrieved from the cache using dr.cache.view_run(). Since this was the first execution, nothing is retrieved.

basics_dr.cache.view_run()

On the second execution, raw_data and processed_data are retrieved from the cache, as reported in the logs and highlighted in the visualization.

basics_results_2 = basics_dr.execute(["processed_data"], inputs={"cutoff_date": "2024-09-01"})
print()
print(basics_results_2["processed_data"].head())
print()
basics_dr.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::result_store::get_result::hit
        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD       478.2300
1  Los Angeles  2024-09-12  251.67     USA      USD       251.6700
2      Chicago  2024-09-11  989.34     USA      USD       989.3400
3     Montréal  2024-09-11  742.14  Canada      CAD       541.7622
4    Vancouver  2024-09-09  584.56  Canada      CAD       426.7288

Understanding the cache_key¶

The Hamilton cache stores results using a cache_key. It is composed of the node’s name (node_name), the code that defines it (code_version), and its data inputs (data_version of its dependencies).

For example, the cache keys for the previous cells are:

{
    "node_name": "raw_data",
    "code_version": "9d727859b9fd883247c3379d4d25a35af4a56df9d9fde20c75c6375dde631c68",
    "dependencies_data_versions": {}  // it has no dependencies
}
{
    "node_name": "processed_data",
    "code_version": "c9e3377d6c5044944bd89eeb7073c730ee8707627c39906b4156c6411f056f00",
    "dependencies_data_versions": {
        "cutoff_date": "WkGjJythLWYAIj2Qr8T_ug==",  // input value
        "raw_data": "t-BDcMLikFSNdn4piUKy1mBcKPoEsnsYjUNzWg=="  // raw_data's result
    }
}

Results could be successfully retrieved because nodes in the first execution and second execution shared the same cache_key.

The cache_key objects are internal and you won’t have to interact with them directly. However, keep that concept in mind throughout this tutorial. Towards the end, we show how to manually handle the cache_key for debugging.

Adding a node¶

Let’s say you’re iteratively developing your dataflow and you add a new node. Here, we copy the previous module into a new module named adding_node_module and define the node amount_per_country.

In practice, you would edit the cell directly, but copying the module here makes the notebook easier to read and maintain.

%%cell_to_module adding_node_module --display
import pandas as pd

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56],
    "country": ["USA", "USA", "USA", "Canada", "Canada"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD"],
}

def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.73
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.DataFrame:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum().to_frame()

We build a new Driver with adding_node_module and execute the dataflow. You’ll notice that raw_data and processed_data are retrieved and only amount_per_country is executed.

adding_node_dr = driver.Builder().with_modules(adding_node_module).with_cache().build()

adding_node_results = adding_node_dr.execute(
    ["processed_data", "amount_per_country"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(adding_node_results["amount_per_country"].head())
print()
adding_node_dr.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::result_store::get_result::hit
amount_per_country::adapter::execute_node
         amound_in_usd
country               
Canada         968.491
USA           1719.240

Even though this is the first execution of adding_node_dr and the module adding_node_module, the cache contains results for raw_data and processed_data. We’re able to retrieve values because they have the same cache keys (code version and dependencies data versions).

This means you can reuse cached results across dataflows. This is particularly useful with training and inference machine learning pipelines.
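
As a minimal sketch of this pattern (train_module and inference_module are hypothetical modules that share the same upstream feature nodes, such as raw_data and processed_data; trained_model and predictions are hypothetical downstream nodes), two Drivers pointed at the same cache location will reuse each other’s results:

import train_module, inference_module  # hypothetical modules sharing raw_data and processed_data

train_dr = driver.Builder().with_modules(train_module).with_cache(path="./.hamilton_cache").build()
train_dr.execute(["trained_model"], inputs={"cutoff_date": "2024-09-01"})

# shared nodes with the same code version and dependency data versions are retrieved, not recomputed
inference_dr = driver.Builder().with_modules(inference_module).with_cache(path="./.hamilton_cache").build()
inference_dr.execute(["predictions"], inputs={"cutoff_date": "2024-09-01"})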

Changing inputs¶

We reuse the same dataflow adding_node_module, but change the input cutoff_date from "2024-09-01" to "2024-09-11".

This new input forces processed_data to be re-executed. This produces a new result for processed_data, which cascades and also forces amount_per_country to be re-executed.

changing_inputs_dr = driver.Builder().with_modules(adding_node_module).with_cache().build()

changing_inputs_results_1 = changing_inputs_dr.execute(
    ["processed_data", "amount_per_country"],
    inputs={"cutoff_date": "2024-09-11"}
)
print()
print(changing_inputs_results_1["amount_per_country"].head())
print()
changing_inputs_dr.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
         amound_in_usd
country               
USA              729.9

Now, we execute with the cutoff_date value "2024-09-05", which forces processed_data to be executed.

changing_inputs_results_2 = changing_inputs_dr.execute(
    ["processed_data", "amount_per_country"],
    inputs={"cutoff_date": "2024-09-05"}
)
print()
print(changing_inputs_results_2["amount_per_country"].head())
print()
changing_inputs_dr.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::result_store::get_result::hit
         amound_in_usd
country               
Canada         968.491
USA           1719.240

Notice that the cache could still retrieve amount_per_country. This is because processed_data returned a value that had been cached previously (in the Adding a node section).

In concrete terms, filtering rows by the date "2024-09-05" or "2024-09-01" includes the same rows and produces the same dataframe.

print(adding_node_results["processed_data"])
print()
print(changing_inputs_results_2["processed_data"])
        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD       478.2300
1  Los Angeles  2024-09-12  251.67     USA      USD       251.6700
2      Chicago  2024-09-11  989.34     USA      USD       989.3400
3     Montréal  2024-09-11  742.14  Canada      CAD       541.7622
4    Vancouver  2024-09-09  584.56  Canada      CAD       426.7288

        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD       478.2300
1  Los Angeles  2024-09-12  251.67     USA      USD       251.6700
2      Chicago  2024-09-11  989.34     USA      USD       989.3400
3     Montréal  2024-09-11  742.14  Canada      CAD       541.7622
4    Vancouver  2024-09-09  584.56  Canada      CAD       426.7288

Changing code¶

As you develop your dataflow, you will need to edit upstream nodes. Caching will automatically detect code changes and determine which nodes need to be re-executed. In processed_data(), we’ll change the conversion rate from 0.73 to 0.71.

NOTE. Changes to docstrings and comments (#) are ignored when versioning a node.

%%cell_to_module changing_code_module
import pandas as pd

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56],
    "country": ["USA", "USA", "USA", "Canada", "Canada"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD"],
}

def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.71  # <- VALUE CHANGED FROM 0.73
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.DataFrame:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum().to_frame()

We need to re-execute processed_data because the code change created a new cache_key and led to a cache miss. Then, processed_data returns a previously unseen value, forcing amount_per_country to also be re-executed.

changing_code_dr_1 = driver.Builder().with_modules(changing_code_module).with_cache().build()

changing_code_results_1 = changing_code_dr_1.execute(
    ["processed_data", "amount_per_country"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(changing_code_results_1["amount_per_country"].head())
print()
changing_code_dr_1.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
         amound_in_usd
country               
Canada         941.957
USA           1719.240

We make another code change to processed_data to accommodate currency conversion for Brazil and Mexico.

%%cell_to_module changing_code_module_2
import pandas as pd

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56],
    "country": ["USA", "USA", "USA", "Canada", "Canada"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD"],
}

def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.71  
    df.loc[df.country == "Brazil", "amound_in_usd"] *= 0.18  # <- LINE ADDED
    df.loc[df.country == "Mexico", "amound_in_usd"] *= 0.05  # <- LINE ADDED
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.DataFrame:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum().to_frame()

Again, the code change forces processed_data to be executed.

changing_code_dr_2 = driver.Builder().with_modules(changing_code_module_2).with_cache().build()

changing_code_results_2 = changing_code_dr_2.execute(["processed_data","amount_per_country"], inputs={"cutoff_date": "2024-09-01"})
print()
print(changing_code_results_2["amount_per_country"].head())
print()
changing_code_dr_2.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::result_store::get_result::hit
         amound_in_usd
country               
Canada         941.957
USA           1719.240

However, amount_per_country can be retrieved because processed_data returned a previously seen value.

In concrete terms, adding code to process currency from Brazil and Mexico didn’t change the processed_data result because it only includes data from the USA and Canada.

NOTE. This is similar to what happened at the end of the section Changing inputs.

print(changing_code_results_1["processed_data"])
print()
print(changing_code_results_2["processed_data"])
        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD       478.2300
1  Los Angeles  2024-09-12  251.67     USA      USD       251.6700
2      Chicago  2024-09-11  989.34     USA      USD       989.3400
3     Montréal  2024-09-11  742.14  Canada      CAD       526.9194
4    Vancouver  2024-09-09  584.56  Canada      CAD       415.0376

        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD       478.2300
1  Los Angeles  2024-09-12  251.67     USA      USD       251.6700
2      Chicago  2024-09-11  989.34     USA      USD       989.3400
3     Montréal  2024-09-11  742.14  Canada      CAD       526.9194
4    Vancouver  2024-09-09  584.56  Canada      CAD       415.0376

Changing external data¶

Hamilton’s caching mechanism uses the node’s code_version and its dependencies’ data_version to determine whether the node needs to be executed or its result can be retrieved from the cache. By default, it assumes idempotency of operations.

This section covers how to handle nodes with external effects, such as reading or writing external data.

Idempotency¶

To illustrate idempotency, let’s use this minimal dataflow which has a single node that returns the current date and time:

import datetime

def current_datetime() -> datetime.datetime:
    return datetime.datetime.now()

The first execution will execute the node and store the resulting date and time. On the second execution, the cache will read the stored result instead of re-executing. Why? Because the code_version is the same and the dependencies’ data_version (it has no dependencies) hasn’t changed.
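
As a minimal sketch (assuming current_datetime() is defined in a hypothetical module named idempotency_module), the second execution returns the timestamp stored by the first one:

idempotency_dr = driver.Builder().with_modules(idempotency_module).with_cache().build()

first = idempotency_dr.execute(["current_datetime"])   # executes the node and stores the result
second = idempotency_dr.execute(["current_datetime"])  # cache hit: reads the stored result
assert first["current_datetime"] == second["current_datetime"]  # same, now stale, timestamp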

A similar situation occurs when reading from external data, as shown here:

import pandas as pd

def dataset(file_path: str) -> pd.DataFrame:
    return pd.read_csv(file_path)

Here, the code of dataset() and the value for file_path can stay the same, but the file itself could be updated (e.g., new rows added).

The next sections show how to always re-execute a node and ensure the latest data is used. The DATA constant is modified with transactions in Brazil and Mexico to simulate raw_data loading a new dataset.

%%cell_to_module changing_external_module
import pandas as pd

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver", "Houston", "Phoenix", "Mexico City", "Chihuahua City", "Rio de Janeiro"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09", "2024-09-08", "2024-09-07", "2024-09-06", "2024-09-05", "2024-09-04"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56, 321.85, 918.67, 135.22, 789.12, 432.78],
    "country": ["USA", "USA", "USA", "Canada", "Canada", "USA", "USA", "Mexico", "Mexico", "Brazil"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD", "USD", "USD", "MXN", "MXN", "BRL"],
}

def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.71  
    df.loc[df.country == "Brazil", "amound_in_usd"] *= 0.18
    df.loc[df.country == "Mexico", "amound_in_usd"] *= 0.05
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.DataFrame:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum().to_frame()

At execution, we see raw_data being retrieved along with all downstream nodes. Also, we note that the printed results don’t include Brazil or Mexico.

changing_external_dr = driver.Builder().with_modules(changing_external_module).with_cache().build()

changing_external_results = changing_external_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-01"})
print()
print(changing_external_results["amount_per_country"].head())
print()
changing_external_dr.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::result_store::get_result::hit
amount_per_country::result_store::get_result::hit
         amound_in_usd
country               
Canada         941.957
USA           1719.240

.with_cache() to specify caching behavior¶

Here, we build a new Driver with the same changing_external_module, but we specify in .with_cache() to always recompute raw_data.

The visualization shows that raw_data was executed, and because of the new data, all downstream nodes also need to be executed. The results now include Brazil and Mexico.

changing_external_with_cache_dr = driver.Builder().with_modules(changing_external_module).with_cache(recompute=["raw_data"]).build()

changing_external_with_cache_results = changing_external_with_cache_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-01"})
print()
print(changing_external_with_cache_results["amount_per_country"].head())
print()
changing_external_with_cache_dr.cache.view_run()
raw_data::adapter::execute_node
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600

@cache to specify caching behavior¶

Another way to specify the RECOMPUTE behavior is to use the @cache decorator.

%%cell_to_module changing_external_decorator_module
import pandas as pd
from hamilton.function_modifiers import cache

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver", "Houston", "Phoenix", "Mexico City", "Chihuahua City", "Rio de Janeiro"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09", "2024-09-08", "2024-09-07", "2024-09-06", "2024-09-05", "2024-09-04"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56, 321.85, 918.67, 135.22, 789.12, 432.78],
    "country": ["USA", "USA", "USA", "Canada", "Canada", "USA", "USA", "Mexico", "Mexico", "Brazil"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD", "USD", "USD", "MXN", "MXN", "BRL"],
}

@cache(behavior="recompute")
def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.71  
    df.loc[df.country == "Brazil", "amound_in_usd"] *= 0.18
    df.loc[df.country == "Mexico", "amound_in_usd"] *= 0.05
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.DataFrame:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum().to_frame()

We build a new Driver with changing_external_decorator_module, which includes the @cache decorator. Note that we don’t specify anything in .with_cache().

changing_external_decorator_dr = (
    driver.Builder()
    .with_modules(changing_external_decorator_module)
    .with_cache()
    .build()
)

changing_external_decorator_results = changing_external_decorator_dr.execute(
    ["amount_per_country"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(changing_external_decorator_results["amount_per_country"].head())
print()
changing_external_decorator_dr.cache.view_run()
raw_data::adapter::execute_node
processed_data::result_store::get_result::hit
amount_per_country::result_store::get_result::hit
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600

We see that raw_data was re-executed. Then, processed_data and amount_per_country can be retrieved because they were produced just before by changing_external_with_cache_dr.

When to use @cache vs. .with_cache()?¶

Specifying the caching behavior via .with_cache() or @cache is entirely equivalent. There are benefits to either approach:

  • @cache: specifies the behavior at the dataflow level. The behavior is tied to the node and will be picked up by every Driver loading the module. This can prevent errors or unexpected behaviors for users of that dataflow.

  • .with_cache(): specifies the behavior at the Driver level. It gives you the flexibility to change the behavior without modifying the dataflow code and committing changes. You might be okay with DEFAULT during development, but want to ensure RECOMPUTE in production.

Importantly, the behavior specified in .with_cache(...) overrides whatever is in @cache because it is closer to execution. For example, combining .with_cache(default=["raw_data"]) with @cache(behavior="recompute") would force the DEFAULT behavior.

⛔ Important: Using the @cache decorator alone doesn’t enable caching; adding .with_cache() to the Builder does. The decorator is only a means to specify special behaviors for a node.
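
As a minimal sketch of this precedence rule, reusing changing_external_decorator_module from above (where raw_data carries @cache(behavior="recompute")):

override_dr = (
    driver.Builder()
    .with_modules(changing_external_decorator_module)
    .with_cache(default=["raw_data"])  # overrides @cache(behavior="recompute") set on raw_data
    .build()
)
# raw_data now follows the DEFAULT behavior: it is retrieved whenever its cache_key matches
override_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-01"})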

Force recompute all¶

By specifying .with_cache(recompute=True), you are setting the behavior RECOMPUTE for all nodes. This forces recomputation, which is useful for producing a “cache refresh” with up-to-date values.

recompute_all_dr = (
    driver.Builder()
    .with_modules(changing_external_decorator_module)
    .with_cache(recompute=True)
    .build()
)

recompute_all_results = recompute_all_dr.execute(
    ["amount_per_country"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(recompute_all_results["amount_per_country"].head())
print()
recompute_all_dr.cache.view_run()
raw_data::adapter::execute_node
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600

We see that all nodes were recomputed.

Setting default behavior¶

Once you enable caching using .with_cache(), it is an “opt-out” feature by default. This means all nodes are cached unless you set the DISABLE behavior via @cache or .with_cache(disable=[...]). This can become difficult to manage as the number of nodes increases.

You can make it an “opt-in” feature by setting default_behavior="disable" in .with_cache(). This way, you’re using caching, but only for nodes explicitly specified in @cache or .with_cache().

Here, we build a Driver with the changing_external_decorator_module, where raw_data was set to have behavior RECOMPUTE, and set the default behavior to DISABLE.

default_behavior_dr = (
    driver.Builder()
    .with_modules(changing_external_decorator_module)
    .with_cache(default_behavior="disable")
    .build()
)

default_behavior_results = default_behavior_dr.execute(
    ["amount_per_country"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(default_behavior_results["amount_per_country"].head())
print()
default_behavior_dr.cache.view_run()
raw_data::adapter::execute_node
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600
default_behavior_dr.cache.behaviors[default_behavior_dr.cache.last_run_id]
{'amount_per_country': <CachingBehavior.DISABLE: 3>,
 'processed_data': <CachingBehavior.DISABLE: 3>,
 'raw_data': <CachingBehavior.RECOMPUTE: 2>,
 'cutoff_date': <CachingBehavior.DISABLE: 3>}

Materializers¶

NOTE. You can skip this section if you’re not using materializers.

DataLoader and DataSaver (collectively “materializers”) are special Hamilton nodes that connect your dataflow to external data (files, databases, etc.). These constructs are safe to use with caching and are complementary.

Caching

  • writing and reading shorter-term data to be used with the dataflow

  • strong connection between the code and the data

  • automatically handle multiple versions of the same dataset

Materializers

  • robust mechanism to read/write data from many sources

  • data isn’t necessarily meant to be used with Hamilton (e.g., loading from a warehouse, outputting a report).

  • typically outputs to a static destination; each write overwrites the previous stored dataset.

The next cell uses @dataloader and @datasaver decorators. In the visualization, we see the added raw_data.loader and saved_data nodes.

%%cell_to_module materializers_module -d
import pandas as pd
from hamilton.function_modifiers import dataloader, datasaver

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver", "Houston", "Phoenix", "Mexico City", "Chihuahua City", "Rio de Janeiro"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09", "2024-09-08", "2024-09-07", "2024-09-06", "2024-09-05", "2024-09-04"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56, 321.85, 918.67, 135.22, 789.12, 432.78],
    "country": ["USA", "USA", "USA", "Canada", "Canada", "USA", "USA", "Mexico", "Mexico", "Brazil"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD", "USD", "USD", "MXN", "MXN", "BRL"],
}

@dataloader()
def raw_data() -> tuple[pd.DataFrame, dict]:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    data = pd.DataFrame(DATA)
    metadata = {"source": "notebook", "format": "json"}
    return data, metadata

def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.71  
    df.loc[df.country == "Brazil", "amound_in_usd"] *= 0.18
    df.loc[df.country == "Mexico", "amound_in_usd"] *= 0.05
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.DataFrame:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum().to_frame()

@datasaver()
def saved_data(amount_per_country: pd.DataFrame) -> dict:
    amount_per_country.to_parquet("./saved_data.parquet")
    metadata = {"source": "notebook", "format": "parquet"}
    return metadata

Next, we build a Driver as usual.

materializers_dr = (
    driver.Builder()
    .with_modules(materializers_module)
    .with_cache()
    .build()
)

materializers_results = materializers_dr.execute(
    ["amount_per_country", "saved_data"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(materializers_results["amount_per_country"].head())
print()
materializers_dr.cache.view_run()
raw_data.loader::adapter::execute_node
raw_data::adapter::execute_node
processed_data::result_store::get_result::hit
amount_per_country::result_store::get_result::hit
saved_data::adapter::execute_node
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600

We execute the dataflow a second time to show that loaders and savers are just like any other node; they can be cached and retrieved.

materializers_results = materializers_dr.execute(
    ["amount_per_country", "saved_data"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(materializers_results["amount_per_country"].head())
print()
materializers_dr.cache.view_run()
raw_data.loader::result_store::get_result::hit
raw_data::result_store::get_result::hit
processed_data::result_store::get_result::hit
amount_per_country::result_store::get_result::hit
saved_data::result_store::get_result::hit
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600

Usage patterns¶

Here are a few common scenarios:

Loading data is expensive: Your dataflow uses a DataLoader to get data from Snowflake. You want to load it once and cache it. When executing your dataflow, you want to use your cached copy to save query time, egress costs, etc.

  • Use the DEFAULT caching behavior for loaders.

Only save new data: You run the dataflow multiple times (maybe with different parameters or on a schedule) and only want to write to destination when the data changes.

  • Use the DEFAULT caching behavior for savers.

Always read the latest data: You want to use caching, but also ensure the dataflow always uses the latest data. This involves executing the DataLoader every time, getting the data in memory, versioning it, and then determining what needs to be executed (see Changing external data).

  • Use the RECOMPUTE caching behavior for loaders.

Use the parameters default_loader_behavior or default_saver_behavior of the .with_cache() clause to specify the behavior for all loaders or savers.

NOTE. The Caching + materializers tutorial notebook details how to achieve granular control over loader and saver behaviors.

materializers_dr_2 = (
    driver.Builder()
    .with_modules(materializers_module)
    .with_cache(
        default_loader_behavior="recompute",
        default_saver_behavior="disable"
    )
    .build()
)

materializers_results_2 = materializers_dr_2.execute(
    ["amount_per_country", "saved_data"],
    inputs={"cutoff_date": "2024-09-01"}
)
print()
print(materializers_results_2["amount_per_country"].head())
print()
materializers_dr_2.cache.view_run()
raw_data.loader::adapter::execute_node
raw_data::adapter::execute_node
processed_data::result_store::get_result::hit
amount_per_country::result_store::get_result::hit
saved_data::adapter::execute_node
         amound_in_usd
country               
Brazil         77.9004
Canada        941.9570
Mexico         46.2170
USA          2959.7600
materializers_dr_2.cache.behaviors[materializers_dr_2.cache.last_run_id]
{'amount_per_country': <CachingBehavior.DEFAULT: 1>,
 'processed_data': <CachingBehavior.DEFAULT: 1>,
 'raw_data.loader': <CachingBehavior.RECOMPUTE: 2>,
 'raw_data': <CachingBehavior.RECOMPUTE: 2>,
 'saved_data': <CachingBehavior.DISABLE: 3>,
 'cutoff_date': <CachingBehavior.DEFAULT: 1>}

Changing the cache format¶

By default, results are stored in pickle format. It’s a convenient default but comes with caveats. You can use the @cache decorator to specify another file format for storing results.

Out of the box, the supported formats include:

  • json

  • parquet

  • csv

  • excel

  • file

  • feather

  • orc

This feature uses DataLoader and DataSaver under the hood and supports all of the same formats (including your custom ones, as long as they take a path attribute).

This is an area of active development. Feel free to share suggestions and feedback!

The next cell sets processed_data to be cached using the parquet format.

%%cell_to_module cache_format_module
import pandas as pd
from hamilton.function_modifiers import cache

DATA = {
    "cities": ["New York", "Los Angeles", "Chicago", "Montréal", "Vancouver", "Houston", "Phoenix", "Mexico City", "Chihuahua City", "Rio de Janeiro"],
    "date": ["2024-09-13", "2024-09-12", "2024-09-11", "2024-09-11", "2024-09-09", "2024-09-08", "2024-09-07", "2024-09-06", "2024-09-05", "2024-09-04"],
    "amount": [478.23, 251.67, 989.34, 742.14, 584.56, 321.85, 918.67, 135.22, 789.12, 432.78],
    "country": ["USA", "USA", "USA", "Canada", "Canada", "USA", "USA", "Mexico", "Mexico", "Brazil"],
    "currency": ["USD", "USD", "USD", "CAD", "CAD", "USD", "USD", "MXN", "MXN", "BRL"],
}

def raw_data() -> pd.DataFrame:
    """Loading raw data. This simulates loading from a file, database, or external service."""
    return pd.DataFrame(DATA)

@cache(format="parquet")
def processed_data(raw_data: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    """Filter out rows before cutoff date and convert currency to USD."""
    df = raw_data.loc[raw_data.date > cutoff_date].copy()
    df["amound_in_usd"] = df["amount"]
    df.loc[df.country == "Canada", "amound_in_usd"] *= 0.71  
    df.loc[df.country == "Brazil", "amound_in_usd"] *= 0.18
    df.loc[df.country == "Mexico", "amound_in_usd"] *= 0.05
    return df

def amount_per_country(processed_data: pd.DataFrame) -> pd.Series:
    """Sum the amount in USD per country"""
    return processed_data.groupby("country")["amound_in_usd"].sum()

When executing the dataflow, raw_data is retrieved from the cache, while processed_data and amount_per_country are executed. The result of processed_data is now also saved as .parquet.

cache_format_dr = driver.Builder().with_modules(cache_format_module).with_cache().build()

cache_format_results = cache_format_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-01"})
print()
print(cache_format_results["amount_per_country"].head())
print()
cache_format_dr.cache.view_run()
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
country
Canada     941.957
USA       1719.240
Name: amound_in_usd, dtype: float64

Now, under ./.hamilton_cache, there will be two results with the same name: one with the .parquet extension and one without. The one without is actually a pickled DataLoader used to retrieve the .parquet file.

You can access the path programmatically via the result_store._path_from_data_version(...) method.

data_version = cache_format_dr.cache.data_versions[cache_format_dr.cache.last_run_id]["processed_data"]
parquet_path = cache_format_dr.cache.result_store._path_from_data_version(data_version).with_suffix(".parquet")
parquet_path.exists()
True

Introspecting the cache¶

The Driver.cache stores information about all executions over its lifetime. Previous run_id values are available through Driver.cache.run_ids and can be used in tandem with other utility functions to:

  • Resolve the node caching behavior (e.g., “recompute”)

  • Access structured logs

  • Visualize the cache execution

Also, Driver.cache.last_run_id is a shortcut to the most recent execution.

cache_format_dr.cache.resolve_behaviors(cache_format_dr.cache.last_run_id)
{'amount_per_country': <CachingBehavior.DEFAULT: 1>,
 'processed_data': <CachingBehavior.DEFAULT: 1>,
 'raw_data': <CachingBehavior.DEFAULT: 1>,
 'cutoff_date': <CachingBehavior.DEFAULT: 1>}
run_logs = cache_format_dr.cache.logs(cache_format_dr.cache.last_run_id, level="debug")
for event in run_logs["processed_data"]:
    print(event)
processed_data::adapter::resolve_behavior
processed_data::adapter::set_cache_key
processed_data::adapter::get_cache_key::hit
processed_data::adapter::get_data_version::miss
processed_data::metadata_store::get_data_version::miss
processed_data::adapter::execute_node
processed_data::adapter::set_data_version
processed_data::metadata_store::set_data_version
processed_data::adapter::get_cache_key::hit
processed_data::adapter::get_data_version::hit
processed_data::result_store::set_result
processed_data::adapter::get_data_version::hit
processed_data::adapter::resolve_behavior
# for `.view_run()` passing no parameter is equivalent to the last `run_id`
cache_format_dr.cache.view_run(cache_format_dr.cache.last_run_id)

Interactively explore runs¶

By using ipywidgets we can easily build a widget to iterate over run_id values and display cache information. Below, we create a Driver and execute it a few times to generate data, then inspect it with a widget.

interactive_dr = driver.Builder().with_modules(cache_format_module).with_cache().build()

interactive_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-01"})
interactive_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-05"})
interactive_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-10"})
interactive_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-11"})
interactive_dr.execute(["amount_per_country"], inputs={"cutoff_date": "2024-09-13"})
raw_data::result_store::get_result::hit
processed_data::result_store::get_result::hit
amount_per_country::result_store::get_result::hit
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::result_store::get_result::hit
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
raw_data::result_store::get_result::hit
processed_data::adapter::execute_node
amount_per_country::adapter::execute_node
{'amount_per_country': Series([], Name: amound_in_usd, dtype: float64)}

The following cell allows you to click and drag or use the arrow keys to navigate between runs.

from IPython.display import display
from ipywidgets import SelectionSlider, interact


@interact(run_id=SelectionSlider(options=interactive_dr.cache.run_ids))
def iterate_over_runs(run_id):
    display(interactive_dr.cache.data_versions[run_id])
    display(interactive_dr.cache.view_run(run_id=run_id))

Managing storage¶

Setting the cache path¶

By default, metadata and results are stored under ./.hamilton_cache, relative to the current directory at execution time. You can also manually set the directory via .with_cache(path=...) to isolate or centralize cache storage between dataflows or projects.

Running the next cell will create the directory ./my_other_cache.

manual_path_dr = driver.Builder().with_modules(cache_format_module).with_cache(path="./my_other_cache").build()

Instantiating the result_store and metadata_store¶

If you need to store metadata and results in separate locations, you can do so by instantiating the result_store and metadata_store manually with their own configuration. In this case, setting .with_cache(path=...) would be ignored.

from hamilton.caching.stores.file import FileResultStore
from hamilton.caching.stores.sqlite import SQLiteMetadataStore

result_store = FileResultStore(path="./results")
metadata_store = SQLiteMetadataStore(path="./metadata")

manual_stores_dr = (
    driver.Builder()
    .with_modules(cache_format_module)
    .with_cache(
        result_store=result_store,
        metadata_store=metadata_store,
    )
    .build()
)

Deleting data and recovering storage¶

As you use caching, you might be generating a lot of data that you don’t need anymore. One straightforward solution is to delete the entire directory where metadata and results are stored.

You can also programmatically call .delete_all() on the result_store and metadata_store, which should reclaim most storage. If you delete results, make sure to also delete metadata. The caching mechanism should figure it out, but it’s safer to keep them in sync.

manual_stores_dr.cache.metadata_store.delete_all()
manual_stores_dr.cache.result_store.delete_all()

Usage patterns¶

As demonstrated here, caching works great in a notebook environment.

  • In addition to iteration speed, caching allows you to restart your kernel or shut down your computer for the day without worry. When you come back, you will still be able to retrieve results from the cache.

  • A similar benefit is the ability to resume execution between environments. For example, you might be running Hamilton in a script, but when a bug happens you can reload the cached values in a notebook and investigate (see the sketch after this list).

  • Caching works great with other adapters like the HamiltonTracker that powers the Hamilton UI and the MLFlowTracker for experiment tracking.
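
As a minimal sketch of resuming work in a notebook after a scripted run, assuming both point at the same cache path (my_dataflow is a hypothetical module):

import my_dataflow  # hypothetical module also used by the script

# point at the same cache location the script used
dr = driver.Builder().with_modules(my_dataflow).with_cache(path="./.hamilton_cache").build()

# nodes whose code and inputs match the script’s run are retrieved rather than recomputed,
# so intermediate results are available for inspection right away
results = dr.execute(["processed_data"], inputs={"cutoff_date": "2024-09-01"})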

🚧 INTERNALS¶

If you’re curious, the following sections provide details about the caching internals. These APIs are not public and may change without notice.

Manually retrieve results¶

Using the Driver.cache you can directly retrieve results from previous executions. The cache stores “data versions” which are keys for the result_store.

Here, we get the run_id for the 4th execution (index 3) and the data version for processed_data before retrieving its value.

run_id = interactive_dr.cache.run_ids[3]
data_version = interactive_dr.cache.data_versions[run_id]["processed_data"]
result = interactive_dr.cache.result_store.get(data_version)
print(result)
        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD         478.23
1  Los Angeles  2024-09-12  251.67     USA      USD         251.67

Decoding the cache_key¶

By now, you should have a better grasp on how Hamilton’s caching determines when to execute a node. Internally, it creates a cache_key from the code_version of the node and the data_version of each dependency. The cache keys are stored on the Driver.cache and can be decoded for introspection and debugging.

Here, we get the run_id for the 3rd execution (index 2) and the cache key for amount_per_country. We then use decode_key() to retrieve the node_name, code_version, and dependencies_data_versions.

from hamilton.caching.cache_key import decode_key

run_id = interactive_dr.cache.run_ids[2]
cache_key = interactive_dr.cache.cache_keys[run_id]["amount_per_country"]
decode_key(cache_key)
{'node_name': 'amount_per_country',
 'code_version': 'c2ccafa54280fbc969870b6baa445211277d7e8cfa98a0821836c175603ffda2',
 'dependencies_data_versions': {'processed_data': 'WgV5-4SfdKTfUY66x-msj_xXsKNPNTP2guRhfw=='}}

Indeed, this matches the data version of processed_data for the 3rd execution.

interactive_dr.cache.data_versions[run_id]["processed_data"]
'WgV5-4SfdKTfUY66x-msj_xXsKNPNTP2guRhfw=='

Manually retrieve metadata¶

In addition to the result_store, there is a metadata_store that contains mappings between cache_key and data_version (cache keys are unique, but many can point to the same data).

Using the knowledge from the previous section, we can use the cache key for amount_per_country to retrieve its data_version and result. It’s also possible to decode its cache_key, and get the data_version for its dependencies, making the node execution reproducible.

run_id = interactive_dr.cache.run_ids[2]
cache_key = interactive_dr.cache.cache_keys[run_id]["amount_per_country"]
amount_data_version = interactive_dr.cache.metadata_store.get(cache_key)
amount_result = interactive_dr.cache.result_store.get(amount_data_version)
print(amount_result)
country
Canada     526.9194
USA       1719.2400
Name: amound_in_usd, dtype: float64
for dep_name, dependency_data_version in decode_key(cache_key)["dependencies_data_versions"].items():
    dep_result = interactive_dr.cache.result_store.get(dependency_data_version)
    print(dep_name)
    print(dep_result)
    print()
processed_data
        cities        date  amount country currency  amound_in_usd
0     New York  2024-09-13  478.23     USA      USD         478.23
1  Los Angeles  2024-09-12  251.67     USA      USD         251.67
2      Chicago  2024-09-11  989.34     USA      USD         989.34
3     Montréal  2024-09-11  742.14  Canada      CAD       526.9194