pipe#

Reference Documentation

class hamilton.function_modifiers.macros.pipe(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, collapse=False, _chain=False)#

Decorator to represent a chained set of transformations. This specifically solves the “node redefinition” problem, and is meant to represent a pipeline of chaining/redefinitions. This is similar (and can happily be used in conjunction with) pipe in pandas. In Pyspark this is akin to the common operation of redefining a dataframe with new columns. While it is generally reasonable to contain these constructs within a node’s function, you should consider pipe for any of the following reasons:

1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing the result 2. You want to pull in functions from an external repository, and build the DAG a little more procedurally 3. You want to use the same function multiple times, but with different parameters – while @does/@parameterize can do this, this presents an easier way to do this, especially in a chain.

To demonstrate the rules for chaining nodes, we’ll be using the following example. This is using primitives to demonstrate, but as hamilton is just functions of any python objects, this works perfectly with dataframes, series, etc…

from hamilton.function_modifiers import step, pipe, value, source

def _add_one(x: int) -> int:
    return x + 1

def _sum(x: int, y: int) -> int:
    return x + y

def _multiply(x: int, y: int, z: int=10) -> int:
    return x * y * z

@pipe(
    step(_add_one),
    step(_multiply, y=2),
    step(_sum, y=value(3)),
    step(_multiply, y=source("upstream_node_to_multiply")),
)
def final_result(upstream_int: int) -> int:
    return upstream_int
upstream_int = ... # result from upstream
upstream_node_to_multiply = ... # result from upstream

output = final_result(
    _multiply(
        _sum(
            _multiply(
                _add_one(upstream_int),
                y=2
            ),
            y=3
        ),
        y=upstream_node_to_multiply
    )
)
upstream_int = ... # result from upstream
upstream_node_to_multiply = ... # result from upstream

one_added = _add_one(upstream_int)
multiplied = _multiply(one_added, y=2)
summed = _sum(multiplied, y=3)
multiplied_again = _multiply(summed, y=upstream_node_to_multiply)
output = final_result(multiplied_again)

Note that functions must have no position-only arguments (this is rare in python, but hamilton does not handle these). This basically means that the functions must be defined similarly to def fn(x, y, z=10) and not def fn(x, y, /, z=10). In fact, all arguments must be named and “kwarg-friendly”, meaning that the function can happily be called with **kwargs, where kwargs are some set of resolved upstream values. So, no *args are allowed, and **kwargs (variable keyword-only) are not permitted. Note that this is not a design limitation, rather an implementation detail – if you feel like you need this, please reach out.

Furthermore, the function should be typed, as a Hamilton function would be.

One has two ways to tune the shape/implementation of the subsequent nodes:

  1. when/when_not/when_in/when_not_in – these are used to filter the application of the function. This is valuable to reflect

    if/else conditions in the structure of the DAG, pulling it out of functions, rather than buried within the logic itself. It is functionally equivalent to @config.when.

    For instance, if you want to include a function in the chain only when a config parameter is set to a certain value, you can do:

    @pipe(
        step(_add_one).when(foo="bar"),
        step(_add_two, y=source("other_node_to_add").when(foo="baz"),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
    

    This will only apply the first function when the config parameter foo is set to bar, and the second when it is set to baz.

  2. named – this is used to name the node. This is useful if you want to refer to intermediate results. If this is left out,

    hamilton will automatically name the functions in a globally unique manner. The names of these functions will not necessarily be stable/guaranteed by the API, so if you want to refer to them, you should use named. The default namespace will always be the name of the decorated function (which will be the last node in the chain).

    named takes in two parameters – required is the name – this will assign the nodes with a single name and no global namespace. For instance:

    @pipe(
        step(_add_one).named("a"),
        step(_add_two, y=source("upstream_node")).named("b"),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
    

    The above will create two nodes, a and b. a will be the result of _add_one, and b will be the result of _add_two. final_result will then be called with the output of b. Note that, if these are part of a namespaced operation (a subdag, in particular), they will get the same namespace as the subdag.

    The second parameter is namespace. This is used to specify a namespace for the node. This is useful if you want to either (a) ensure that the nodes are namespaced but share a common one to avoid name clashes (usual case), or (b) if you want a custom namespace (unusual case). To indicate a custom namespace, one need simply pass in a string.

    To indicate that a node should share a namespace with the rest of the step(…) operations in a pipe, one can pass in (the ellipsis).

      @pipe(
          step(_add_one).named("a", namespace="foo"), # foo.a
          step(_add_two, y=source("upstream_node")).named("b", namespace=...), # final_result.b
      )
      def final_result(upstream_int: int) -> int:
          return upstream_int
    

    Note that if you pass a namespace argument to the pipe function, it will set the namespace on each step operation. This is useful if you want to ensure that all the nodes in a pipe have a common namespace, but you want to rename them.

    @pipe(
        step(_add_one).named("a"), # a
        step(_add_two, y=source("upstream_node")).named("b"), # foo.b
        namespace=..., # default -- final_result.a and final_result.b, OR
        namespace=None, # no namespace -- a and b are exposed as that, OR
        namespace="foo", # foo.a and foo.b
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
    

    In all likelihood, you should not be using this, and this is only here in case you want to expose a node for consumption/output later. Setting the namespace in individual nodes as well as in pipe is not yet supported.

__init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, collapse=False, _chain=False)#

Instantiates a @pipe decorator.

Parameters:
  • transforms – step transformations to be applied, in order

  • namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe()…

  • collapse – Whether to collapse this into a single node. This is not currently supported.

  • _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.