pipe family¶

We have a family of decorators that represent a chained set of transformations. This specifically solves the “node redefinition” problem, and is meant to represent a pipeline of chaining/redefinitions. This is similar to (and can happily be used in conjunction with) pipe in pandas. In PySpark this is akin to the common operation of redefining a dataframe with new columns.

For some examples have a look at: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling

While it is generally reasonable to contain constructs within a node’s function, you should consider the pipe family for any of the following reasons:

1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing the result.

2. You want to pull in functions from an external repository, and build the DAG a little more procedurally.

3. You want to use the same function multiple times, but with different parameters – while @does / @parameterize can do this, the pipe family offers an easier way, especially in a chain.


Reference Documentation

pipe¶

DeprecationWarning from 2.0.0: use pipe_input instead

class hamilton.function_modifiers.macros.pipe(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
__init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶

Instantiates a @pipe_input decorator.

Parameters:
  • transforms – step transformations to be applied, in order

  • namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe_input()…

  • on_input – setting the target parameter for all steps in the pipe. Leave empty to select only the first argument.

  • collapse – Whether to collapse this into a single node. This is not currently supported.

  • _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.

pipe_input¶

class hamilton.function_modifiers.macros.pipe_input(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶

Running a series of transformations on the input of the function.

To demonstrate the rules for chaining nodes, we’ll be using the following example. It uses primitives for demonstration, but since Hamilton is just functions over arbitrary Python objects, this works equally well with dataframes, series, etc…

from hamilton.function_modifiers import step, pipe_input, value, source


def _add_one(x: int) -> int:
    return x + 1


def _sum(x: int, y: int) -> int:
    return x + y


def _multiply(x: int, y: int, z: int = 10) -> int:
    return x * y * z


@pipe_input(
    step(_add_one),
    step(_multiply, y=2),
    step(_sum, y=value(3)),
    step(_multiply, y=source("upstream_node_to_multiply")),
)
def final_result(upstream_int: int) -> int:
    return upstream_int

The above decorator is roughly equivalent to the following nested calls:

upstream_int = ...  # result from upstream
upstream_node_to_multiply = ...  # result from upstream

output = final_result(
    _multiply(
        _sum(
            _multiply(
                _add_one(upstream_int),
                y=2
            ),
            y=3
        ),
        y=upstream_node_to_multiply
    )
)

Or, unrolled step by step:

upstream_int = ...  # result from upstream
upstream_node_to_multiply = ...  # result from upstream

one_added = _add_one(upstream_int)
multiplied = _multiply(one_added, y=2)
summed = _sum(multiplied, y=3)
multiplied_again = _multiply(summed, y=upstream_node_to_multiply)
output = final_result(multiplied_again)
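To make the evaluation order concrete, here is the same chain run with toy values (the numbers are illustrative assumptions, not part of the API):

```python
# Same helper functions as above; the inputs are hypothetical toy values.
def _add_one(x: int) -> int:
    return x + 1


def _sum(x: int, y: int) -> int:
    return x + y


def _multiply(x: int, y: int, z: int = 10) -> int:
    return x * y * z


upstream_int = 1               # pretend upstream value
upstream_node_to_multiply = 5  # pretend upstream value

one_added = _add_one(upstream_int)                                 # 1 + 1 = 2
multiplied = _multiply(one_added, y=2)                             # 2 * 2 * 10 = 40
summed = _sum(multiplied, y=3)                                     # 40 + 3 = 43
multiplied_again = _multiply(summed, y=upstream_node_to_multiply)  # 43 * 5 * 10 = 2150
output = multiplied_again  # final_result passes its input through
```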

Note that the functions must have no positional-only arguments (these are rare in Python, but Hamilton does not handle them). This means the functions must be defined like def fn(x, y, z=10) and not def fn(x, y, /, z=10). In fact, all arguments must be named and “kwarg-friendly”, meaning that the function can happily be called with **kwargs, where kwargs are some set of resolved upstream values. So, *args are not allowed, and variadic keyword arguments (a **kwargs parameter in the signature) are not permitted either. Note that this is not a design limitation, rather an implementation detail – if you feel like you need this, please reach out.

Furthermore, the function should be typed, as a Hamilton function would be.

One has three ways to tune the shape/implementation of the subsequent nodes:

  1. when/when_not/when_in/when_not_in – these are used to filter the application of the function.

    This is valuable to reflect if/else conditions in the structure of the DAG, pulling it out of functions, rather than buried within the logic itself. It is functionally equivalent to @config.when.

    For instance, if you want to include a function in the chain only when a config parameter is set to a certain value, you can do:

    @pipe_input(
        step(_add_one).when(foo="bar"),
        step(_add_two, y=source("other_node_to_add")).when(foo="baz"),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
    

    This will only apply the first function when the config parameter foo is set to bar, and the second when it is set to baz.
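The branching can be sketched in plain Python (this mirrors the semantics only, not Hamilton’s implementation; _add_two is a hypothetical helper like those above):

```python
def _add_one(x: int) -> int:
    return x + 1


def _add_two(x: int, y: int) -> int:
    return x + y


def resolve(upstream_int: int, other_node_to_add: int, config: dict) -> int:
    """What the DAG computes for final_result under a given config."""
    result = upstream_int
    if config.get("foo") == "bar":  # step(_add_one).when(foo="bar")
        result = _add_one(result)
    if config.get("foo") == "baz":  # step(_add_two, ...).when(foo="baz")
        result = _add_two(result, y=other_node_to_add)
    return result  # final_result just passes the value through
```

For example, `resolve(10, 5, {"foo": "bar"})` applies only _add_one, while `{"foo": "baz"}` applies only _add_two.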

  2. named – this is used to name the node. This is useful if you want to refer to intermediate results.

    If this is left out, hamilton will automatically name the functions in a globally unique manner. The names of these functions will not necessarily be stable/guaranteed by the API, so if you want to refer to them, you should use named. The default namespace will always be the name of the decorated function (which will be the last node in the chain).

    named takes two parameters. The first, which is required, is the name – this assigns the node a single name and no global namespace. For instance:

    @pipe_input(
        step(_add_one).named("a"),
        step(_add_two, y=source("upstream_node")).named("b"),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
    

    The above will create two nodes, a and b. a will be the result of _add_one, and b will be the result of _add_two. final_result will then be called with the output of b. Note that, if these are part of a namespaced operation (a subdag, in particular), they will get the same namespace as the subdag.

    The second parameter is namespace. This is used to specify a namespace for the node. This is useful if you want to either (a) ensure that the nodes are namespaced but share a common one to avoid name clashes (usual case), or (b) if you want a custom namespace (unusual case). To indicate a custom namespace, one need simply pass in a string.

    To indicate that a node should share a namespace with the rest of the step(…) operations in a pipe, one can pass in ... (the ellipsis).

      @pipe_input(
          step(_add_one).named("a", namespace="foo"),  # foo.a
          step(_add_two, y=source("upstream_node")).named("b", namespace=...),  # final_result.b
      )
      def final_result(upstream_int: int) -> int:
          return upstream_int
    

    Note that if you pass a namespace argument to the pipe_input function, it will set the namespace on each step operation. This is useful if you rename the nodes individually but want to ensure they all share a common namespace.

    @pipe_input(
        step(_add_one).named("a"),
        step(_add_two, y=source("upstream_node")).named("b"),
        namespace=...,  # default -- final_result.a and final_result.b, OR
        namespace=None,  # no namespace -- exposed as a and b, OR
        namespace="foo",  # foo.a and foo.b
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
    

    In all likelihood, you should not be using this, and this is only here in case you want to expose a node for consumption/output later. Setting the namespace in individual nodes as well as in pipe_input is not yet supported.

  3. on_input – this selects which input we will run the pipeline on.

    In case on_input is set to None (default), we apply pipe_input on the first parameter. Let us know if you wish to expand to other use-cases. You can track the progress on this topic via: https://github.com/DAGWorks-Inc/hamilton/issues/1177

    The following would apply the functions _add_one and _add_two to p2:

    @pipe_input(
        step(_add_one),
        step(_add_two, y=source("upstream_node")),
        on_input="p2",
    )
    def final_result(p1: int, p2: int, p3: int) -> int:
        return p1 + p2 + p3
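In plain Python, that wiring amounts to wrapping only p2 (toy values; the helper bodies are assumptions carried over from the earlier examples):

```python
def _add_one(x: int) -> int:
    return x + 1


def _add_two(x: int, y: int) -> int:
    return x + y


def final_result(p1: int, p2: int, p3: int) -> int:
    return p1 + p2 + p3


upstream_node = 100  # pretend upstream value
p1, p2, p3 = 1, 2, 3

# p2 flows through the pipe; p1 and p3 are passed in untouched.
output = final_result(p1, _add_two(_add_one(p2), y=upstream_node), p3)
```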
    
__init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶

Instantiates a @pipe_input decorator.

Parameters:
  • transforms – step transformations to be applied, in order

  • namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe_input()…

  • on_input – setting the target parameter for all steps in the pipe. Leave empty to select only the first argument.

  • collapse – Whether to collapse this into a single node. This is not currently supported.

  • _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.

pipe_output¶

class hamilton.function_modifiers.macros.pipe_output(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_output: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶

Running a series of transformations on the output of the function.

The decorated function declares the dependency, the body of the function gets executed, and then we run a series of transformations on the result of the function specified by pipe_output.

If we have nodes A –> B –> C in the DAG and decorate B with pipe_output like

@pipe_output(
    step(B1),
    step(B2)
)
def B(...):
    return ...

we obtain the new DAG A –> B.raw –> B1 –> B2 –> B –> C, where we can think of B.raw –> B1 –> B2 –> B as a “pipe”: it takes the raw output of B, applies B1 to it, applies B2 to that result, and then renames the final node to B so it re-connects to the rest of the DAG.
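A plain-Python sketch of that order (the function bodies are toy assumptions, not Hamilton’s implementation):

```python
def A() -> int:
    return 2


def B_raw(a: int) -> int:  # the body of the decorated function B
    return a + 1


def B1(b: int) -> int:
    return b * 10


def B2(b: int) -> int:
    return b - 5


def C(b: int) -> int:
    return b * 2


# B.raw -> B1 -> B2; the result is then exposed as "B" to downstream nodes.
b_final = B2(B1(B_raw(A())))
result = C(b_final)
```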

The rules for chaining nodes are the same as for pipe_input.

For extra control in the case of multiple output nodes, for example after extract_fields / extract_columns, we can also specify which output node we wish to transform. The following applies A to all fields, while B is applied only to field_1:

@extract_columns("col_1", "col_2")
def A(...):
    return ...

def B(...):
    return ...


@pipe_output(
    step(A),
    step(B).on_output("field_1"),
)
@extract_fields(
    {"field_1": int, "field_2": int, "field_3": int}
)
def foo(a: int) -> Dict[str, int]:
    return {"field_1": 1, "field_2": 2, "field_3": 3}

We can also set the target on the global level (but not on both levels at the same time). The following would apply both A and B only to field_1 and field_2:

@pipe_output(
    step(A),
    step(B),
    on_output=["field_1", "field_2"],
)
@extract_fields(
    {"field_1": int, "field_2": int, "field_3": int}
)
def foo(a: int) -> Dict[str, int]:
    return {"field_1": 1, "field_2": 2, "field_3": 3}
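What the global on_output selection means can be sketched in plain Python (toy step bodies; only the selected fields flow through the steps):

```python
def A_step(x: int) -> int:  # stand-in for step(A)
    return x + 1


def B_step(x: int) -> int:  # stand-in for step(B)
    return x * 2


raw = {"field_1": 1, "field_2": 2, "field_3": 3}

# field_1 and field_2 go through the pipe; field_3 passes through unchanged.
mutated = {
    k: (B_step(A_step(v)) if k in ("field_1", "field_2") else v)
    for k, v in raw.items()
}
```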
__init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_output: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶

Instantiates a @pipe_output decorator.

Warning: if there is a global pipe_output target, the individual step(...).target would only choose from the subset pre-selected from the global pipe_output target. We have disabled this for now to avoid confusion. Leave global pipe_output target empty if you want to choose between all the nodes on the individual step level.

Parameters:
  • transforms – step transformations to be applied, in order

  • namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe_output()…

  • on_output – setting the target node for all steps in the pipe. Leave empty to select all the output nodes.

  • collapse – Whether to collapse this into a single node. This is not currently supported.

  • _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.

mutate¶

class hamilton.function_modifiers.macros.mutate(*target_functions: Applicable | Callable, collapse: bool = False, _chain: bool = False, **mutating_function_kwargs: SingleDependency | Any)¶

Running a transformation on the outputs of a series of functions.

This is closely related to pipe_output as it effectively allows you to run transformations on the output of a node without touching that node. The decorator on the transformation function declares which target functions it mutates. For now, the target functions that will be mutated have to be in the same module (come speak to us if you need this capability across multiple modules).

We suggest defining the transformation functions with a leading underscore, so that they are only displayed within the transform pipeline of the target node.

If we wish to apply _transform1 to the output of A and B, and _transform2 only to the output of node B, we can do so as follows:

def A(...):
    return ...

def B(...):
    return ...

@mutate(A, B)
def _transform1(...):
    return ...

@mutate(B)
def _transform2(...):
    return ...

we obtain the new pipe-like subDAGs A.raw –> _transform1 –> A and B.raw –> _transform1 –> _transform2 –> B, where the behavior is the same as pipe_output.
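In plain Python, the resulting order of application looks like this (the bodies are toy assumptions chosen for illustration):

```python
def A() -> int:  # the "raw" body of node A
    return 1


def B() -> int:  # the "raw" body of node B
    return 2


def _transform1(x: int) -> int:
    return x + 10


def _transform2(x: int) -> int:
    return x * 3


# A.raw -> _transform1 -> A
a_final = _transform1(A())
# B.raw -> _transform1 -> _transform2 -> B
b_final = _transform2(_transform1(B()))
```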

While it is generally reasonable to use pipe_output, you should consider mutate in the following scenarios:

  1. Loading data and applying pre-cleaning step.

  2. Feature engineering via joining, filtering, sorting, etc.

  3. Experimenting with different transformations across nodes by selectively turning transformations on / off.

We assume the first argument of the decorated function to be the output of the function we are targeting. For transformations with multiple arguments, you can use keyword arguments coupled with source or value, the same as with the other pipe-family decorators:

@mutate(A, B, arg2=source('upstream_node'), arg3=value(some_literal), ...)
def _transform1(output_from_target: correct_type, arg2: arg2_type, arg3: arg3_type, ...):
    return ...

You can also select the arguments applied to each target node individually by using apply_to(...):

@mutate(
    apply_to(A, arg2=source('upstream_node_1'), arg3=value(some_literal_1)),
    apply_to(B, arg2=source('upstream_node_2'), arg3=value(some_literal_2)),
)
def _transform1(output_from_target: correct_type, arg2: arg2_type, arg3: arg3_type, ...):
    return ...

In the case of multiple output nodes, for example after extract_fields / extract_columns, we can also specify which output node we wish to mutate. The following would mutate all columns of A individually, while for function B only field_1:

@extract_columns("col_1", "col_2")
def A(...):
    return ...

@extract_fields(
        {"field_1":int, "field_2":int, "field_3":int}
)
def B(...):
    return ...

@mutate(
    apply_to(A),
    apply_to(B).on_output("field_1"),
)
def _transform(...):
    return ...
__init__(*target_functions: Applicable | Callable, collapse: bool = False, _chain: bool = False, **mutating_function_kwargs: SingleDependency | Any)¶

Instantiates a mutate decorator.

We assume the first argument of the decorated function to be the output of the function we are targeting.

Parameters:
  • target_functions – functions we wish to mutate the output of

  • collapse – Whether to collapse this into a single node. This is not currently supported.

  • _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.

  • **mutating_function_kwargs – other kwargs that the decorated function has. Must be validly called as f(**kwargs), and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all target_functions, unless apply_to already has the mutator function kwargs, in which case it takes those.