lifecycle.GracefulErrorAdapter

class hamilton.lifecycle.default.GracefulErrorAdapter(error_to_catch: Type[Exception], sentinel_value: Any = None, try_all_parallel: bool = True, allow_injection: bool = True)

Gracefully handles errors in a graph’s execution. This lets execution proceed despite failures by dynamically pruning branches. The adapter still visits every node, but it replaces a node with a no-op if any of its upstream dependencies, required or optional, fail.

__init__(error_to_catch: Type[Exception], sentinel_value: Any = None, try_all_parallel: bool = True, allow_injection: bool = True)

Initializes the adapter. Allows you to customize the error to catch (which exception your graph will throw to indicate failure), as well as the sentinel value to use in place of a node’s result if it fails (this defaults to None).

Note that this is currently only compatible with the dict-based result builder (use at your own risk with pandas series, etc…).

Be careful with None as the default sentinel. If your nodes can legitimately return None, a real result could be mistaken for a failure and negatively impact your graph’s execution, so feel free to replace it with a sentinel value of your choice.
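To illustrate why a dedicated sentinel is easier to work with than None, here is a minimal sketch in plain Python. The names FAILED, summarize, and optional_lookup are hypothetical and are not part of Hamilton; the results dict only imitates the shape of what dr.execute(...) returns with the dict-based result builder.

```python
# A dedicated sentinel: a unique object that no node would ever return.
FAILED = object()


def summarize(results: dict) -> dict:
    """Label each output as failed, legitimately empty, or ok."""
    summary = {}
    for name, value in results.items():
        if value is FAILED:  # identity check, so it cannot collide with real data
            summary[name] = "failed"
        elif value is None:  # a genuine None result, not a failure
            summary[name] = "empty"
        else:
            summary[name] = "ok"
    return summary


# Hypothetical results, shaped like the dict returned by dr.execute(...)
results = {"will_proceed": 1, "optional_lookup": None, "never_reached": FAILED}
print(summarize(results))
# {'will_proceed': 'ok', 'optional_lookup': 'empty', 'never_reached': 'failed'}
```

With sentinel_value=None the "failed" and "empty" cases above would be indistinguishable.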

You can use this as follows:

# my_module.py
# custom exception
class DoNotProceed(Exception):
    pass


def wont_proceed() -> int:
    raise DoNotProceed()


def will_proceed() -> int:
    return 1


def never_reached(wont_proceed: int) -> int:
    return 1  # this should not be reached


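# driver script
from hamilton import driver
from hamilton.lifecycle import default

import my_module

dr = (
    driver.Builder()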
    .with_modules(my_module)
    .with_adapters(
        default.GracefulErrorAdapter(
            error_to_catch=DoNotProceed,
            sentinel_value=None
        )
    )
    .build()
)
dr.execute(
    ["will_proceed", "never_reached"]
)  # will return {'will_proceed': 1, 'never_reached': None}

Note you can customize the error you want it to fail on and the sentinel value to use in place of a node’s result if it fails.
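The pruning behavior can be modeled with a small toy runner in plain Python. This is only a sketch of the documented semantics, not Hamilton’s implementation: run_graph and the nodes mapping are hypothetical, and the nodes are assumed to be listed in topological order.

```python
class DoNotProceed(Exception):
    pass


def wont_proceed() -> int:
    raise DoNotProceed()


def run_graph(nodes: dict, error_to_catch: type, sentinel=None) -> dict:
    """Toy runner. nodes maps name -> (function, dependency names)."""
    results = {}
    for name, (fn, deps) in nodes.items():
        inputs = [results[dep] for dep in deps]
        if any(value is sentinel for value in inputs):
            # An upstream node failed: skip this node and pass the sentinel on.
            results[name] = sentinel
            continue
        try:
            results[name] = fn(*inputs)
        except error_to_catch:
            # The node itself failed: record the sentinel instead of raising.
            results[name] = sentinel
    return results


nodes = {
    "wont_proceed": (wont_proceed, []),
    "will_proceed": (lambda: 1, []),
    "never_reached": (lambda wont_proceed: 1, ["wont_proceed"]),
}
print(run_graph(nodes, DoNotProceed, sentinel=None))
# {'wont_proceed': None, 'will_proceed': 1, 'never_reached': None}
```

Only exceptions of type error_to_catch are swallowed in this sketch; anything else still propagates and stops the run.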

For Parallelizable nodes, this adapter will attempt to iterate over the node outputs. If an error occurs, the sentinel value is returned and no further iterations over the node occur. For example, if item 3 fails out of 1, 2, 3, 4, 5, items 4 and 5 will not run. If you set try_all_parallel to False, it sends only one sentinel value into the parallelized sub-DAG.

Here’s an example for parallelizable to demonstrate try_all_parallel:

# parallel_module.py
from hamilton.htypes import Collect, Parallelizable


# custom exception
class DoNotProceed(Exception):
    pass


def start_point() -> Parallelizable[int]:
    for i in range(5):
        if i == 3:
            raise DoNotProceed()
        yield i


def inner(start_point: int) -> int:
    return start_point


def gather(inner: Collect[int]) -> list[int]:
    return inner


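# driver script
from hamilton import driver
from hamilton.lifecycle import default

import parallel_module

dr = (
    driver.Builder()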
    .with_modules(parallel_module)
    .with_adapters(
        default.GracefulErrorAdapter(
            error_to_catch=DoNotProceed,
            sentinel_value=None,
            try_all_parallel=True,
        )
    )
    .build()
)
dr.execute(["gather"])  # will return {'gather': [0,1,2,None]}

dr = (
    driver.Builder()
    .with_modules(parallel_module)
    .with_adapters(
        default.GracefulErrorAdapter(
            error_to_catch=DoNotProceed,
            sentinel_value=None,
            try_all_parallel=False,
        )
    )
    .build()
)
dr.execute(["gather"])  # will return {'gather': [None]}
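The two outcomes above can be reproduced with a small plain-Python model of the expansion step. This is a sketch of the documented behavior only, not Hamilton’s implementation; the expand helper is hypothetical.

```python
from typing import Any, Callable, Iterator, List


class DoNotProceed(Exception):
    pass


def start_point() -> Iterator[int]:
    for i in range(5):
        if i == 3:
            raise DoNotProceed()
        yield i


def expand(
    gen_fn: Callable[[], Iterator[Any]],
    error_to_catch: type,
    sentinel: Any,
    try_all_parallel: bool,
) -> List[Any]:
    """Collect items from a generator node, stopping at the first failure."""
    items: List[Any] = []
    try:
        for item in gen_fn():
            items.append(item)
    except error_to_catch:
        if not try_all_parallel:
            # Discard the partial output and send a single sentinel downstream.
            return [sentinel]
        # Keep what was produced so far, then mark the failure.
        items.append(sentinel)
    return items


print(expand(start_point, DoNotProceed, None, try_all_parallel=True))   # [0, 1, 2, None]
print(expand(start_point, DoNotProceed, None, try_all_parallel=False))  # [None]
```

In both modes the generator is abandoned at the failing item, so the value 4 is never produced.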
Parameters:
  • error_to_catch – The error to catch

  • sentinel_value – The sentinel value to use in place of a node’s result if it fails

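  • try_all_parallel – If True, gather parallelizable outputs until a failure, then add a sentinel. If False, send only a single sentinel into the parallelized sub-DAG. Defaults to True.

  • allow_injection – Whether to honor the accept_error_sentinels tag, which lets tagged nodes receive error sentinels as inputs. Defaults to True.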

default.accept_error_sentinels()

Tag a function to allow passing in error sentinels.

For use with GracefulErrorAdapter. The standard adapter behavior is to skip a node when an error sentinel is one of its inputs. This decorator will cause the node to run, and place the error sentinel into the appropriate input.

Take care to ensure your sentinels are easily distinguishable if you do this; see the note in the GracefulErrorAdapter docstring.

A use case is any data or computation aggregation step that still wants partial results, or considers a failure interesting enough to log or notify.

from typing import Any

SENTINEL = object()

...

@accept_error_sentinels
def results_gathering(result_1: float, result_2: float) -> dict[str, Any]:
    answer = {}
    for name, res in zip(["result 1", "result 2"], [result_1, result_2]):
        answer[name] = res
        if res is SENTINEL:
            answer[name] = "Node failure: no result"
            # You may want side-effects for a failure.
            _send_text_that_your_runs_errored()
    return answer

...
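# error_to_catch is required; DoNotProceed stands for your own exception type
adapter = GracefulErrorAdapter(error_to_catch=DoNotProceed, sentinel_value=SENTINEL)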
...
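The skip-versus-inject decision can be sketched in plain Python as follows. This is a toy model of the behavior described above, not Hamilton’s implementation: accepts_sentinels, call_node, and the _accepts_sentinels attribute are hypothetical stand-ins, and the effect of allow_injection=False is assumed to be that the tag is ignored.

```python
from typing import Any, Callable, Dict

SENTINEL = object()


def accepts_sentinels(fn: Callable) -> Callable:
    """Hypothetical stand-in for accept_error_sentinels: tags the function."""
    fn._accepts_sentinels = True
    return fn


def call_node(fn: Callable, inputs: Dict[str, Any], sentinel: Any,
              allow_injection: bool = True) -> Any:
    """Run fn unless a sentinel input means it should be skipped."""
    has_sentinel = any(value is sentinel for value in inputs.values())
    opted_in = allow_injection and getattr(fn, "_accepts_sentinels", False)
    if has_sentinel and not opted_in:
        return sentinel  # default behavior: skip the node
    return fn(**inputs)  # tagged node: run it with the sentinel injected


def plain_sum(result_1: float, result_2: float) -> float:
    return result_1 + result_2


@accepts_sentinels
def tolerant_sum(result_1: float, result_2: float) -> float:
    # Aggregate whatever partial results are available.
    return sum(v for v in (result_1, result_2) if v is not SENTINEL)


inputs = {"result_1": 1.5, "result_2": SENTINEL}
print(call_node(plain_sum, inputs, SENTINEL) is SENTINEL)  # True
print(call_node(tolerant_sum, inputs, SENTINEL))           # 1.5
```

Because the check uses identity (is), a unique object() sentinel cannot be confused with a legitimate input value.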