subdag

Reference Documentation

class hamilton.function_modifiers.subdag(*load_from: ModuleType | Callable, inputs: Dict[str, ParametrizedDependency] = None, config: Dict[str, Any] = None, namespace: str = None, final_node_name: str = None, external_inputs: List[str] = None)

The @subdag decorator enables you to rerun components of your DAG with varying parameters. That is, it enables you to β€œchain” what you could express with a driver into a single DAG.

That is, instead of using Hamilton within itself:

def feature_engineering(source_path: str) -> pd.DataFrame:
    '''You could recursively use Hamilton within itself.'''
    dr = driver.Driver({}, feature_modules)
    df = dr.execute(["feature_df"], inputs={"path": source_path})
    return df

You instead can use the @subdag decorator to do the same thing, with the added benefit of visibility into the whole DAG:

@subdag(
    feature_modules,
    inputs={"path": source("source_path")},
    config={}
)
def feature_engineering(feature_df: pd.DataFrame) -> pd.DataFrame:
    return feature_df

Note that this is immensely powerful – if we draw analogies from Hamilton to standard procedural programming paradigms, we might have the following correspondence:

  • config.when + friends – if/else statements

  • parameterize/extract_columns – for loop

  • does – effectively macros

And so on. @subdag takes this one step further:

  • @subdag – subroutine definition

E.G. take a certain set of nodes, and run them with specified parameters.

@subdag declares parameters that are outputs of its subdags. Note that, if you want to use outputs of other components of the DAG, you can use the external_inputs parameter to declare the parameters that do not come from the subDAG.

Why might you want to use this? Let’s take a look at some examples:

  1. You have a feature engineering pipeline that you want to run on multiple datasets. If its exactly the same, this is perfect. If not, this works perfectly as well, you just have to utilize different functions in each or the config.when + config parameter to rerun it.

  2. You want to train multiple models in the same DAG that share some logic (in features or training) – this allows you to reuse and continually add more.

  3. You want to combine multiple similar DAGs (e.g. one for each business line) into one so you can build a cross-business line model.

This basically bridges the gap between the flexibility of non-declarative pipelining frameworks with the readability/maintainability of declarative ones.

__init__(*load_from: ModuleType | Callable, inputs: Dict[str, ParametrizedDependency] = None, config: Dict[str, Any] = None, namespace: str = None, final_node_name: str = None, external_inputs: List[str] = None)

Adds a subDAG to the main DAG.

Parameters:
  • load_from – The functions that will be used to generate this subDAG.

  • inputs – Parameterized dependencies to inject into all sources of this subDAG. This should not be an intermediate node in the subDAG.

  • config – A configuration dictionary for just this subDAG. Note that this passed in value takes precedence over the DAG’s config.

  • namespace – Namespace with which to prefix nodes. This is optional – if not included, this will default to the function name.

  • final_node_name – Name of the final node in the subDAG. This is optional – if not included, this will default to the function name.

  • external_inputs – Parameters in the function that are not produced by the functions passed to the subdag. This is useful if you want to perform some logic with other inputs in the subdag’s processing function. Note that this is currently required to differentiate and clarify the inputs to the subdag.