with_columns#
Overview
This is part of the Hamilton pyspark integration. To install, run:
pip install sf-hamilton[pyspark]
Reference Documentation
- class hamilton.plugins.h_spark.with_columns(*load_from: Callable | module, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, select: List[str] = None, namespace: str = None, mode: str = 'append')#
- __init__(*load_from: Callable | module, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, select: List[str] = None, namespace: str = None, mode: str = 'append')#
- Initializes a with_columns decorator for spark. This allows you to efficiently run groups of map operations on a dataframe, represented as pandas/primitives UDFs. This effectively “linearizes” compute: a DAG of map operations can be run as a set of .withColumn operations on a single dataframe, so you don’t have to do a complex extract-then-join process on spark, which can be inefficient.
Here’s an example of calling it. If you’ve seen @subdag, you should be familiar with the concepts:

# my_module.py
import pandas as pd

def a(a_from_df: pd.Series) -> pd.Series:
    return _process(a_from_df)

def b(b_from_df: pd.Series) -> pd.Series:
    return _process(b_from_df)

def a_plus_b(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
    return a_from_df + b_from_df


# the with_columns call
import pyspark.sql as ps
from hamilton.plugins.h_spark import with_columns

import my_module

@with_columns(
    my_module,  # Load from any module
    columns_to_pass=["a_from_df", "b_from_df"],  # The columns to pass from the
                                                 # dataframe to the subdag
    select=["a", "b", "a_plus_b"],  # The columns to select from the dataframe
)
def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
    # process, or just return unprocessed
    ...
You can think of the above as a series of withColumn calls on the dataframe, where the operations are applied in topological order. This is significantly more efficient than extracting out the columns, applying the maps, and then joining, and it also allows you to express the operations individually, making them easy to unit test and reuse.
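To make the “series of withColumn calls” framing concrete, here is roughly the hand-written pyspark equivalent. This is a hedged sketch: the arithmetic stands in for the _process logic above, and is not what Hamilton literally generates.

from pyspark.sql import functions as F

# Illustrative hand-written equivalent, applied in topological order
# (the `* 2` is a stand-in for _process):
df = initial_df.withColumn("a", F.col("a_from_df") * 2)
df = df.withColumn("b", F.col("b_from_df") * 2)
df = df.withColumn("a_plus_b", F.col("a") + F.col("b"))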
Note that the default mode is “append”, meaning that the columns that are selected are appended onto the dataframe. Pass mode="select" to instead keep only the selected columns (see the mode parameter below); a sketch of the “select” variant follows.
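For example, a hedged sketch of the “select” variant of the decorator call above, reusing the same illustrative names:

@with_columns(
    my_module,
    columns_to_pass=["a_from_df", "b_from_df"],
    select=["a", "b", "a_plus_b"],
    mode="select",  # keep only a, b, and a_plus_b;
                    # a_from_df and b_from_df are dropped
)
def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
    return initial_df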
If the function takes multiple dataframes, the dataframe input to process will always be the first one. This will be passed to the subdag, transformed, and passed back to the function. This follows the Hamilton rule of reference by parameter name. To demonstrate: in the code above, the dataframe that is passed to the subdag is initial_df; it is transformed by the subdag and then returned as the final dataframe. A sketch of the multiple-dataframe case follows.
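Here is a hedged sketch of that multiple-dataframe case; other_df and the join key id are illustrative and not part of the example above:

@with_columns(
    my_module,
    columns_to_pass=["a_from_df", "b_from_df"],
    select=["a_plus_b"],
)
def final_df(initial_df: ps.DataFrame, other_df: ps.DataFrame) -> ps.DataFrame:
    # initial_df, the first dataframe parameter, is the one routed through
    # the subdag; other_df is resolved as an ordinary upstream dependency.
    return initial_df.join(other_df, on="id")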
Returning to the original example, you can read it as:
“final_df is a function that transforms the upstream dataframe initial_df, running the transformations from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns a, b, and a_plus_b to the dataframe. It then does some processing on the dataframe and returns it.”
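Having defined final_df, you run it through Hamilton’s driver as usual. A minimal sketch, assuming the code above lives in a module named my_pipeline (an illustrative name):

import pandas as pd
import pyspark.sql as ps
from hamilton import driver

import my_pipeline  # illustrative: the module containing final_df

spark = ps.SparkSession.builder.master("local[1]").getOrCreate()
initial_df = spark.createDataFrame(
    pd.DataFrame({"a_from_df": [1, 2], "b_from_df": [3, 4]})
)

dr = driver.Builder().with_modules(my_pipeline).build()
# execute the DAG, requesting the final dataframe
result = dr.execute(["final_df"], inputs={"initial_df": initial_df})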
- Parameters:
load_from – The functions (or modules containing them) that will be used to generate the group of map operations.
select – Columns to select from the transformation. If this is left blank, it will keep all columns produced in the subdag.
columns_to_pass – The initial schema of the dataframe. This is used to determine which upstream inputs should be taken from the dataframe, and which shouldn’t. Note that, if this is left empty, we will assume that all dependencies come from the dataframe. This cannot be used in conjunction with pass_dataframe_as.
pass_dataframe_as – The name of the dataframe that we’re modifying, as known to the subdag. If you pass this in, you are responsible for extracting columns out. If not provided, you have to pass columns_to_pass in, and we will extract the columns out for you (see the sketch after this parameter list).
namespace – The namespace of the nodes, so they don’t clash with the global namespace and so this can be reused. If it is left out, there will be no namespace (in which case you’ll want to be careful about repeating/reusing the nodes in other parts of the DAG).
mode – The mode of the operation. This can be either “append” or “select”. If it is “append”, it will keep all columns in the dataframe. If it is “select”, it will only keep the columns in the dataframe from the select parameter. Note that, if the select parameter is left blank, it will keep all columns in the dataframe that are in the subdag (as that is the behavior of the select parameter). This defaults to “append”.
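As referenced in pass_dataframe_as above, here is a hedged sketch of that variant. The name raw_df is illustrative; with this option the subdag functions receive raw_df directly and are responsible for pulling their own columns out of it:

@with_columns(
    my_module,
    pass_dataframe_as="raw_df",  # the subdag sees the dataframe under this name
    select=["a", "b", "a_plus_b"],
)
def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
    return initial_df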