h_async.AsyncGraphAdapter#

class hamilton.experimental.h_async.AsyncGraphAdapter(result_builder: ResultMixin = None)#

Graph adapter for use with the AsyncDriver class.

__init__(result_builder: ResultMixin = None)#

Creates an AsyncGraphAdapter class. Note this will only work with the AsyncDriver class.

Some things to note:

  1. This executes everything at the end (recursively). E.G. the final DAG nodes are awaited

  2. This does not work with decorators when the async function is being decorated. That is because that function is called directly within the decorator, so we cannot await it.

static build_dataframe_with_dataframes(outputs: Dict[str, Any]) DataFrame#

Builds a dataframe from the outputs in an “outer join” manner based on index.

The behavior of pd.Dataframe(outputs) is that it will do an outer join based on indexes of the Series passed in. To handle dataframes, we unpack the dataframe into a dict of series, check to ensure that no columns are redefined in a rolling fashion going in order of the outputs requested. This then results in an “enlarged” outputs dict that is then passed to pd.Dataframe(outputs) to get the final dataframe.

Parameters:

outputs – The outputs to build the dataframe from.

Returns:

A dataframe with the outputs.

build_result(**outputs: Dict[str, Any]) Any#

Currently this is a no-op – it just delegates to the resultsbuilder. That said, we could make it async, but it feels wrong – this will just be called after raw_execute.

Parameters:

outputs – Outputs (awaited) from the graph.

Returns:

The final results.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to inspect matches our expectation.

Returns:

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) bool#

Checks that pandas index types match.

This only logs warning errors, and if debug is enabled, a debug statement to list index types.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Executes a node. Note this doesn’t actually execute it – rather, it returns a task. This does not use async def, as we want it to be awaited on later – this await is done in processing parameters of downstream functions/final results. We can ensure that as we also run the driver that this corresponds to.

Note that this assumes that everything is awaitable, even if it isn’t. In that case, it just wraps it in one.

Parameters:
  • node – Node to wrap

  • kwargs – Keyword arguments (either coroutines or raw values) to call it with

Returns:

A task

input_types() List[Type[Type]]#

Currently this just shoves anything into a dataframe. We should probably tighten this up.

output_type() Type#

Returns the output type of this result builder :return: the type that this creates

static pandas_index_types(outputs: Dict[str, Any]) Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]#

This function creates three dictionaries according to whether there is an index type or not.

The three dicts we create are: 1. Dict of index type to list of outputs that match it. 2. Dict of time series / categorical index types to list of outputs that match it. 3. Dict of no-index key to list of outputs with no index type.

Parameters:

outputs – the dict we’re trying to create a result from.

Returns:

dict of all index types, dict of time series/categorical index types, dict if there is no index