pipe family¶
We have a family of decorators that represent a chained set of transformations. These specifically solve the “node redefinition”
problem, and are meant to represent a pipeline of chaining/redefinitions. This is similar to (and can happily be
used in conjunction with) pipe
in pandas. In PySpark this is akin to the common operation of redefining a dataframe
with new columns.
For some examples have a look at: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling
While it is generally reasonable to contain constructs within a node’s function, you should consider the pipe family for any of the following reasons:
1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing the result.
2. You want to pull in functions from an external repository, and build the DAG a little more procedurally.
3. You want to use the same function multiple times, but with different parameters – while @does / @parameterize can do this, this presents an easier way to do so, especially in a chain.
Reference Documentation
pipe¶
Deprecated since version 2.0.0: use pipe_input instead.
- class hamilton.function_modifiers.macros.pipe(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
- __init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
Instantiates a @pipe_input decorator.
- Parameters:
  transforms – step transformations to be applied, in order
  namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe_input()…
  on_input – sets the target parameter for all steps in the pipe. Leave empty to select only the first argument.
  collapse – Whether to collapse this into a single node. This is not currently supported.
  _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
pipe_input¶
- class hamilton.function_modifiers.macros.pipe_input(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
Running a series of transformations on the input of the function.
To demonstrate the rules for chaining nodes, we’ll be using the following example. It uses primitives to demonstrate, but since Hamilton is just Python functions over arbitrary objects, this works equally well with dataframes, series, etc.
    from hamilton.function_modifiers import step, pipe_input, value, source

    def _add_one(x: int) -> int:
        return x + 1

    def _sum(x: int, y: int) -> int:
        return x + y

    def _multiply(x: int, y: int, z: int = 10) -> int:
        return x * y * z

    @pipe_input(
        step(_add_one),
        step(_multiply, y=2),
        step(_sum, y=value(3)),
        step(_multiply, y=source("upstream_node_to_multiply")),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
This is equivalent to running:

    upstream_int = ...  # result from upstream
    upstream_node_to_multiply = ...  # result from upstream

    output = final_result(
        _multiply(
            _sum(
                _multiply(
                    _add_one(upstream_int),
                    y=2,
                ),
                y=3,
            ),
            y=upstream_node_to_multiply,
        )
    )

or, unravelled into intermediate steps:

    upstream_int = ...  # result from upstream
    upstream_node_to_multiply = ...  # result from upstream

    one_added = _add_one(upstream_int)
    multiplied = _multiply(one_added, y=2)
    summed = _sum(multiplied, y=3)
    multiplied_again = _multiply(summed, y=upstream_node_to_multiply)
    output = final_result(multiplied_again)
Note that functions must have no positional-only arguments (these are rare in Python, but Hamilton does not handle them). This means that the functions must be defined like def fn(x, y, z=10) and not def fn(x, y, /, z=10). In fact, all arguments must be named and “kwarg-friendly”, meaning that the function can happily be called with **kwargs, where kwargs are some set of resolved upstream values. So no *args are allowed, and **kwargs (variable keyword arguments) are not permitted in the signature. Note that this is not a design limitation, rather an implementation detail – if you feel you need this, please reach out. Furthermore, the function should be typed, as a Hamilton function would be.
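To illustrate the signature rules, here is a quick sketch (the function names are hypothetical):

    # OK: all arguments are named and can be supplied as keywords
    def _usable_step(x: int, y: int, z: int = 10) -> int:
        return x * y * z

    # Not OK: x and y are positional-only (note the "/"), so they
    # cannot be passed via **kwargs and Hamilton cannot resolve them
    def _unusable_step(x: int, y: int, /, z: int = 10) -> int:
        return x * y * z

    # Also not OK: *args / **kwargs in the signature are not permitted
    def _also_unusable(*args: int, **kwargs: int) -> int:
        return sum(args) + sum(kwargs.values())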
One has three ways to tune the shape/implementation of the subsequent nodes:
when / when_not / when_in / when_not_in – these are used to filter the application of the function. This is valuable to reflect if/else conditions in the structure of the DAG, pulling them out of the functions rather than burying them within the logic itself. It is functionally equivalent to @config.when.

For instance, if you want to include a function in the chain only when a config parameter is set to a certain value, you can do:

    @pipe_input(
        step(_add_one).when(foo="bar"),
        step(_add_two, y=source("other_node_to_add")).when(foo="baz"),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int

This will only apply the first function when the config parameter foo is set to bar, and the second when it is set to baz.
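To make the condition concrete, the config value is supplied when building the driver. A minimal sketch, assuming the code above lives in a hypothetical module my_module:

    from hamilton import driver
    import my_module  # hypothetical module containing final_result and the _add_* helpers

    dr = (
        driver.Builder()
        .with_modules(my_module)
        .with_config({"foo": "bar"})  # only step(_add_one) is applied
        .build()
    )
    out = dr.execute(["final_result"], inputs={"upstream_int": 1})
    print(out["final_result"])  # 2 -- only _add_one ran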
named – this is used to name the node. This is useful if you want to refer to intermediate results. If this is left out, Hamilton will automatically name the functions in a globally unique manner. The names of these functions will not necessarily be stable/guaranteed by the API, so if you want to refer to them, you should use named. The default namespace will always be the name of the decorated function (which will be the last node in the chain).

named takes in two parameters. The first, name, is required – this will assign the nodes a single name and no global namespace. For instance:

    @pipe_input(
        step(_add_one).named("a"),
        step(_add_two, y=source("upstream_node")).named("b"),
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
The above will create two nodes, a and b. a will be the result of _add_one, and b will be the result of _add_two. final_result will then be called with the output of b. Note that, if these are part of a namespaced operation (a subdag, in particular), they will get the same namespace as the subdag.

The second parameter is namespace. This is used to specify a namespace for the node. This is useful if you want to either (a) ensure that the nodes are namespaced but share a common one to avoid name clashes (the usual case), or (b) give them a custom namespace (the unusual case). To indicate a custom namespace, simply pass in a string. To indicate that a node should share a namespace with the rest of the step(…) operations in a pipe, pass in ... (the ellipsis).

    @pipe_input(
        step(_add_one).named("a", namespace="foo"),  # foo.a
        step(_add_two, y=source("upstream_node")).named("b", namespace=...),  # final_result.b
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int
Note that if you pass a namespace argument to the pipe_input function itself, it will set the namespace on each step operation. This is useful if you want to ensure that all the nodes in a pipe have a common namespace, but you want to rename them.

    @pipe_input(
        step(_add_one).named("a"),
        step(_add_two, y=source("upstream_node")).named("b"),
        namespace=...,   # default -- final_result.a and final_result.b, OR
        namespace=None,  # no namespace -- a and b are exposed as-is, OR
        namespace="foo", # foo.a and foo.b
    )
    def final_result(upstream_int: int) -> int:
        return upstream_int

In all likelihood, you should not be using this, and it is only here in case you want to expose a node for consumption/output later. Setting the namespace on individual nodes as well as in pipe_input is not yet supported.
on_input – this selects which input we will run the pipeline on. In case on_input is set to None (the default), we apply pipe_input on the first parameter. Let us know if you wish to expand to other use-cases. You can track the progress on this topic via: https://github.com/DAGWorks-Inc/hamilton/issues/1177

The following would apply the functions _add_one and _add_two to p2:

    @pipe_input(
        step(_add_one),
        step(_add_two, y=source("upstream_node")),
        on_input="p2",
    )
    def final_result(p1: int, p2: int, p3: int) -> int:
        return p1 + p2 + p3
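Conceptually, only p2 flows through the pipe; p1 and p3 reach final_result untouched. A hand-written equivalent (a sketch, reusing the helper functions from above):

    p1, p2, p3 = ...     # results from upstream
    upstream_node = ...  # result from upstream

    p2_piped = _add_two(_add_one(p2), y=upstream_node)
    output = final_result(p1, p2_piped, p3)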
- __init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_input: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
Instantiates a @pipe_input decorator.
- Parameters:
  transforms – step transformations to be applied, in order
  namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe_input()…
  on_input – sets the target parameter for all steps in the pipe. Leave empty to select only the first argument.
  collapse – Whether to collapse this into a single node. This is not currently supported.
  _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
pipe_output¶
- class hamilton.function_modifiers.macros.pipe_output(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_output: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
Running a series of transformations on the output of the function.
The decorated function declares the dependency, the body of the function gets executed, and then we run the series of transformations specified by pipe_output on the result of the function.

If we have nodes A –> B –> C in the DAG and decorate B with pipe_output like:

    @pipe_output(
        step(B1),
        step(B2),
    )
    def B(...):
        return ...

we obtain the new DAG A –> B.raw –> B1 –> B2 –> B –> C, where we can think of B.raw –> B1 –> B2 –> B as a “pipe” that takes the raw output of B, applies B1 to it, applies B2 to the output of B1, and is then renamed to B to re-connect to the rest of the DAG.
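Spelled out, execution proceeds as if one had written the following (a sketch; B.raw stands for the original body of B):

    raw = B(...)  # original body of B, now exposed as the node "B.raw"
    b1 = B1(raw)  # first transformation
    b = B2(b1)    # second transformation, renamed to "B" -- this is what C receives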
The rules for chaining nodes are the same as for pipe_input.

For extra control in case of multiple output nodes, for example after extract_fields / extract_columns, we can also specify the output node that we wish to mutate. The following applies A to all fields, while B applies only to field_1:

    @extract_columns("col_1", "col_2")
    def A(...):
        return ...

    def B(...):
        return ...

    @pipe_output(
        step(A),
        step(B).on_output("field_1"),
    )
    @extract_fields(
        {"field_1": int, "field_2": int, "field_3": int}
    )
    def foo(a: int) -> Dict[str, int]:
        return {"field_1": 1, "field_2": 2, "field_3": 3}
We can also do this on the global level (but not on both levels at the same time). The following would apply function A and function B to only field_1 and field_2:

    @pipe_output(
        step(A),
        step(B),
        on_output=["field_1", "field_2"],
    )
    @extract_fields(
        {"field_1": int, "field_2": int, "field_3": int}
    )
    def foo(a: int) -> Dict[str, int]:
        return {"field_1": 1, "field_2": 2, "field_3": 3}
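Conceptually, the targeted fields are routed through the pipe while the untargeted one is left alone (a sketch of what the DAG computes; raw stands for the dict returned by foo’s body):

    raw = {"field_1": 1, "field_2": 2, "field_3": 3}  # output of foo's body

    field_1 = B(A(raw["field_1"]))  # transformed
    field_2 = B(A(raw["field_2"]))  # transformed
    field_3 = raw["field_3"]        # left untouched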
- __init__(*transforms: Applicable, namespace: str | ellipsis | None = Ellipsis, on_output: str | Collection[str] | None | ellipsis = None, collapse=False, _chain=False)¶
Instantiates a @pipe_output decorator.

Warning: if there is a global pipe_output target, the individual step(...).target would only choose from the subset pre-selected by the global pipe_output target. We have disabled this for now to avoid confusion. Leave the global pipe_output target empty if you want to choose between all the nodes on the individual step level.
- Parameters:
  transforms – step transformations to be applied, in order
  namespace – namespace to apply to all nodes in the pipe. This can be “…” (the default), which resolves to the name of the decorated function, None (which means no namespace), or a string (which means that all nodes will be namespaced with that string). Note that you can either use this or namespaces inside pipe_output()…
  on_output – sets the target node for all steps in the pipe. Leave empty to select all the output nodes.
  collapse – Whether to collapse this into a single node. This is not currently supported.
  _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
mutate¶
- class hamilton.function_modifiers.macros.mutate(*target_functions: Applicable | Callable, collapse: bool = False, _chain: bool = False, **mutating_function_kwargs: SingleDependency | Any)¶
Running a transformation on the outputs of a series of functions.
This is closely related to pipe_output, as it effectively allows you to run transformations on the output of a node without touching that node. We choose which target functions we wish to mutate by the transformation we are decorating. For now, the target functions that will be mutated have to be in the same module (come speak to us if you need this capability over multiple modules). We suggest you define the transformations with a prefixed underscore, so that they are only displayed in the transform pipeline of the target node.
If we wish to apply _transform1 to the output of A and B, and _transform2 only to the output of node B, we can do it like this:

    def A(...):
        return ...

    def B(...):
        return ...

    @mutate(A, B)
    def _transform1(...):
        return ...

    @mutate(B)
    def _transform2(...):
        return ...
We obtain the new pipe-like subDAGs A.raw –> _transform1 –> A and B.raw –> _transform1 –> _transform2 –> B, where the behavior is the same as with pipe_output.

While it is generally reasonable to use pipe_output, you should consider mutate in the following scenarios:
1. Loading data and applying pre-cleaning steps.
2. Feature engineering via joining, filtering, sorting, etc.
3. Experimenting with different transformations across nodes by selectively turning transformations on / off.
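For comparison, the B portion of the example above could have been written by decorating B directly with pipe_output (a sketch); mutate produces the same DAG shape while leaving B’s definition untouched:

    @pipe_output(
        step(_transform1),
        step(_transform2),
    )
    def B(...):
        return ...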
We assume the first argument of the decorated function to be the output of the function we are targeting. For transformations with multiple arguments you can use keyword arguments coupled with source or value, the same as with the other pipe-family decorators:

    @mutate(A, B, arg2=source('upstream_node'), arg3=value(some_literal), ...)
    def _transform1(output_from_target: correct_type, arg2: arg2_type, arg3: arg3_type, ...):
        return ...
You can also select the individual arguments that will be applied to each target node by using apply_to(...):

    @mutate(
        apply_to(A, arg2=source('upstream_node_1'), arg3=value(some_literal_1)),
        apply_to(B, arg2=source('upstream_node_2'), arg3=value(some_literal_2)),
    )
    def _transform1(output_from_target: correct_type, arg2: arg2_type, arg3: arg3_type, ...):
        return ...
In case of multiple output nodes, for example after extract_fields / extract_columns, we can also specify the output node that we wish to mutate. The following would mutate all columns of A individually, while in the case of function B only field_1:

    @extract_columns("col_1", "col_2")
    def A(...):
        return ...

    @extract_fields(
        {"field_1": int, "field_2": int, "field_3": int}
    )
    def B(...):
        return ...

    @mutate(
        apply_to(A),
        apply_to(B).on_output("field_1"),
    )
    def foo(a: int) -> Dict[str, int]:
        return {"field_1": 1, "field_2": 2, "field_3": 3}
- __init__(*target_functions: Applicable | Callable, collapse: bool = False, _chain: bool = False, **mutating_function_kwargs: SingleDependency | Any)¶
Instantiates a mutate decorator.

We assume the first argument of the decorated function to be the output of the function we are targeting.
- Parameters:
  target_functions – functions we wish to mutate the output of
  collapse – Whether to collapse this into a single node. This is not currently supported.
  _chain – Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
  **mutating_function_kwargs – other kwargs that the decorated function has. Must be validly callable as f(**kwargs), and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all target_functions, unless apply_to already has the mutator function kwargs, in which case it takes those.