plugins.h_openlineage.OpenLineageAdapter¶
- class hamilton.plugins.h_openlineage.OpenLineageAdapter(client: OpenLineageClient, namespace: str, job_name: str)¶
This adapter emits OpenLineage events.
# create the openlineage client
from openlineage.client import OpenLineageClient

# write to file
from openlineage.client.transport.file import FileConfig, FileTransport

file_config = FileConfig(
    log_file_path="/path/to/your/file",
    append=False,
)
client = OpenLineageClient(transport=FileTransport(file_config))

# write to HTTP, e.g. marquez
client = OpenLineageClient(url="http://localhost:5000")

# create the adapter
adapter = OpenLineageAdapter(client, "my_namespace", "my_job_name")

# add to Hamilton
# import your pipeline code
dr = driver.Builder().with_modules(YOUR_MODULES).with_adapters(adapter).build()

# execute as normal -- and openlineage events will be emitted
dr.execute(...)
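If you use the HTTP transport pointed at a running Marquez instance, as in the example above, the emitted runs, jobs, and datasets can then be browsed in the Marquez UI.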
Note: for data lineage to be emitted, you must use the “materializer” abstraction to provide metadata. See https://hamilton.dagworks.io/en/latest/concepts/materialization/. This can be done via the @dataloader() and @datasaver() decorators, via the @load_from or @save_to decorators, or by passing data loaders and savers in via .with_materializers() on the Driver Builder or via .materialize() on the Driver object. A minimal sketch of the decorator route follows.
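For illustration, here is a minimal sketch using @dataloader() and @datasaver(), assuming a pandas-based pipeline; the function names, paths, and metadata fields are placeholders. A @dataloader() function returns a (data, metadata) tuple and a @datasaver() function returns a metadata dict; roughly speaking, that metadata is what drives the OpenLineage dataset events.

from typing import Dict, Tuple

import pandas as pd

from hamilton.function_modifiers import dataloader, datasaver


@dataloader()
def raw_data(data_path: str) -> Tuple[pd.DataFrame, Dict]:
    # return the data plus metadata describing where it came from
    df = pd.read_csv(data_path)
    return df, {"path": data_path, "format": "csv"}


def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    # a regular Hamilton node -- no lineage metadata needed here
    return raw_data.dropna()


@datasaver()
def saved_data(cleaned_data: pd.DataFrame, output_path: str) -> Dict:
    # return metadata describing what was written and where
    cleaned_data.to_csv(output_path, index=False)
    return {"path": output_path, "format": "csv"}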
- __init__(client: OpenLineageClient, namespace: str, job_name: str)¶
Constructor. Pass in the OpenLineage client, along with the namespace and job name to attach to emitted events.
- Parameters:
client – the OpenLineageClient to use for emitting events.
namespace – the OpenLineage namespace to attach to events.
job_name – the name of the OpenLineage job to attach to events.
- Returns:
- post_graph_execute(run_id: str, graph: FunctionGraph, success: bool, error: Exception | None, results: Dict[str, Any] | None)¶
Emits a Run COMPLETE or FAIL event.
- Parameters:
run_id – unique ID for this run of the graph.
graph – the FunctionGraph that was executed.
success – whether the graph executed successfully.
error – the exception raised during execution, if any.
results – the results of execution, if successful.
- Returns:
- post_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], success: bool, error: Exception | None, result: Any | None, task_id: str | None = None)¶
Run Event: emits a RUNNING event with updates on inputs/outputs.
A Job Event is emitted for the graph execution, with an additional SQLJob facet if data was loaded from a SQL source.
A Dataset Event is emitted if a dataloader or datasaver was used:
- an input dataset if a loader was used
- an output dataset if a saver was used
Appropriate facets are added to the dataset where it makes sense.
TODO: attach statistics facets.
- Parameters:
run_id – unique ID for this run of the graph.
node_ – the node that was executed.
kwargs – the keyword arguments the node was called with.
success – whether the node executed successfully.
error – the exception raised during execution, if any.
result – the result of node execution, if successful.
task_id – the ID of the task the node ran in, if using task-based execution.
- Returns:
- pre_graph_execute(run_id: str, graph: FunctionGraph, final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any])¶
Emits a Run START event, along with a Job Event carrying the sourceCode facet for the entire DAG as the job.
- Parameters:
run_id – unique ID for this run of the graph.
graph – the FunctionGraph being executed.
final_vars – the output variables requested from the run.
inputs – the inputs passed to the run.
overrides – the overrides passed to the run.
- Returns:
- pre_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None)¶
No event is emitted.
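To sanity-check the integration, you can inspect the events the file transport wrote. A minimal sketch, assuming the FileConfig above was constructed with append=True so that all events land in a single newline-delimited JSON file (with append=False, the transport writes each event to its own timestamped file); the field names follow the OpenLineage event spec:

import json

# path assumed to match the FileConfig's log_file_path
with open("/path/to/your/file") as f:
    for line in f:
        event = json.loads(line)
        # each run event carries an eventType (START, RUNNING, COMPLETE, FAIL)
        # plus the job it belongs to
        print(event["eventType"], event["job"]["namespace"], event["job"]["name"])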