SimplePythonGraphAdapter

class hamilton.base.SimplePythonGraphAdapter(result_builder: ResultMixin = None)

This class allows you to swap out the build_result very easily.

This executes the Hamilton dataflow locally on a machine in a single threaded, single process fashion. It allows you to specify a ResultBuilder to control the return type of what execute() returns.

Currently this extends SimplePythonDataFrameGraphAdapter, although that’s largely for legacy reasons (and can probably be changed).

TODO – change this to extend the right class.

__init__(result_builder: ResultMixin = None)

Allows you to swap out the build_result very easily.

Parameters:

result_builder – A ResultMixin object that will be used to build the result.

static build_dataframe_with_dataframes(outputs: Dict[str, Any]) DataFrame

Builds a dataframe from the outputs in an β€œouter join” manner based on index.

The behavior of pd.Dataframe(outputs) is that it will do an outer join based on indexes of the Series passed in. To handle dataframes, we unpack the dataframe into a dict of series, check to ensure that no columns are redefined in a rolling fashion going in order of the outputs requested. This then results in an β€œenlarged” outputs dict that is then passed to pd.Dataframe(outputs) to get the final dataframe.

Parameters:

outputs – The outputs to build the dataframe from.

Returns:

A dataframe with the outputs.

build_result(**outputs: Dict[str, Any]) Any

Delegates to the result builder function supplied.

static check_input_type(node_type: Type, input_value: Any) bool

Used to check whether the user inputs match what the execution strategy & functions can handle.

Static purely for legacy reasons.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to inspect matches our expectation.

Returns:

True if the input is valid, False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool

Used to check whether two types are equivalent.

Static, purely for legacy reasons.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are equivalent, False otherwise.

static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) bool

Checks that pandas index types match.

This only logs warning errors, and if debug is enabled, a debug statement to list index types.

do_build_result(outputs: Dict[str, Any]) Any

Implements the do_build_result method from the BaseDoBuildResult class. This is kept from the user as the public-facing API is build_result, allowing us to change the API/implementation of the internal set of hooks

do_check_edge_types_match(type_from: type, type_to: type) bool

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Return bool:

Whether or not they are equivalent

do_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None) Any

Method that is called to implement node execution. This can replace the execution of a node with something all together, augment it, or delegate it.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • node – Node that is being executed

  • kwargs – Keyword arguments that are being passed into the node

  • task_id – ID of the task, defaults to None if not in a task setting

do_validate_input(node_type: type, input_value: Any) bool

Method that an input value maches an expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

execute_node(node: Node, kwargs: Dict[str, Any]) Any

Given a node that represents a hamilton function, execute it. Note, in some adapters this might just return some type of β€œfuture”.

Parameters:
  • node – the Hamilton Node

  • kwargs – the kwargs required to exercise the node function.

Returns:

the result of exercising the node.

input_types() List[Type[Type]]

Currently this just shoves anything into a dataframe. We should probably tighten this up.

output_type() Type

Returns the output type of this result builder :return: the type that this creates

static pandas_index_types(outputs: Dict[str, Any]) Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]

This function creates three dictionaries according to whether there is an index type or not.

The three dicts we create are: 1. Dict of index type to list of outputs that match it. 2. Dict of time series / categorical index types to list of outputs that match it. 3. Dict of no-index key to list of outputs with no index type.

Parameters:

outputs – the dict we’re trying to create a result from.

Returns:

dict of all index types, dict of time series/categorical index types, dict if there is no index