lifecycle.GracefulErrorAdapter¶
- class hamilton.lifecycle.default.GracefulErrorAdapter(error_to_catch: Type[Exception], sentinel_value: Any = None, try_all_parallel: bool = True, allow_injection: bool = True)¶
Gracefully handles errors in a graph’s execution. This allows you to proceed despite failure, dynamically pruning branches. While it still runs every node, it replaces them with no-ops if any upstream required dependencies fail (including optional dependencies).
- __init__(error_to_catch: Type[Exception], sentinel_value: Any = None, try_all_parallel: bool = True, allow_injection: bool = True)¶
Initializes the adapter. Allows you to customize the error to catch (which exception your graph will throw to indicate failure), as well as the sentinel value to use in place of a node’s result if it fails (this defaults to
None
).Note that this is currently only compatible with the dict-based result builder (use at your own risk with pandas series, etc…).
Be careful using
None
as the default – feel free to replace it with a sentinel value of your choice (this could negatively impact your graph’s execution if you actually do intend to useNone
return values).You can use this as follows:
# my_module.py # custom exception class DoNotProceed(Exception): pass def wont_proceed() -> int: raise DoNotProceed() def will_proceed() -> int: return 1 def never_reached(wont_proceed: int) -> int: return 1 # this should not be reached dr = ( driver.Builder() .with_modules(my_module) .with_adapters( default.GracefulErrorAdapter( error_to_catch=DoNotProceed, sentinel_value=None ) ) .build() ) dr.execute( ["will_proceed", "never_reached"] ) # will return {'will_proceed': 1, 'never_reached': None}
Note you can customize the error you want it to fail on and the sentinel value to use in place of a node’s result if it fails.
For Parallelizable nodes, this adapter will attempt to iterate over the node outputs. If an error occurs, the sentinel value is returned and no more iterations over the node will occur. Meaning if item (3) fails out of 1,2,3,4,5, 4/5 will not run. If you set
try_all_parallel
to be False, it only sends one sentinel value into the parallelize sub-dag.Here’s an example for parallelizable to demonstrate try_all_parallel:
# parallel_module.py # custom exception class DoNotProceed(Exception): pass def start_point() -> Parallelizable[int]: for i in range(5): if i == 3: raise DoNotProceed() yield i def inner(start_point: int) -> int: return start_point def gather(inner: Collect[int]) -> list[int]: return inner dr = ( driver.Builder() .with_modules(parallel_module) .with_adapters( default.GracefulErrorAdapter( error_to_catch=DoNotProceed, sentinel_value=None, try_all_parallel=True, ) ) .build() ) dr.execute(["gather"]) # will return {'gather': [0,1,2,None]} dr = ( driver.Builder() .with_modules(parallel_module) .with_adapters( default.GracefulErrorAdapter( error_to_catch=DoNotProceed, sentinel_value=None, try_all_parallel=False, ) ) .build() ) dr.execute(["gather"]) # will return {'gather': [None]}
- Parameters:
error_to_catch – The error to catch
sentinel_value – The sentinel value to use in place of a node’s result if it fails
try_all_parallel – Gather parallelizable outputs until a failure, then add a Sentinel.
allow_injection – Flag for considering the
accept_error_sentinels
tag. Defaults to True.
- default.accept_error_sentinels()¶
Tag a function to allow passing in error sentinels.
For use with
GracefulErrorAdapter
. The standard adapter behavior is to skip a node when an error sentinel is one of its inputs. This decorator will cause the node to run, and place the error sentinel into the appropriate input.Take care to ensure your sentinels are easily distinguishable if you do this - see the note in the GracefulErrorAdapater docstring.
A use case is any data or computation aggregation step that still wants partial results, or considers a failure interesting enough to log or notify.
SENTINEL = object() ... @accept_error_sentinels def results_gathering(result_1: float, result_2: float) -> dict[str, Any]: answer = {} for name, res in zip(["result 1", "result 2"], [result_1, result_2]) answer[name] = res if res is SENTINEL: answer[name] = "Node failure: no result" # You may want side-effects for a failure. _send_text_that_your_runs_errored() return answer ... adapter = GracefulErrorAdapter(sentinel_value=SENTINEL) ...