plugins.h_openlineage.OpenLineageAdapter

class hamilton.plugins.h_openlineage.OpenLineageAdapter(client: OpenLineageClient, namespace: str, job_name: str)

This adapter emits OpenLineage events.

# create the openlineage client
from openlineage.client import OpenLineageClient

# write to file
from openlineage.client.transport.file import FileConfig, FileTransport
file_config = FileConfig(
    log_file_path="/path/to/your/file",
    append=False,
)
client = OpenLineageClient(transport=FileTransport(file_config))

# or send events over HTTP, e.g. to a Marquez instance
client = OpenLineageClient(url="http://localhost:5000")

# create the adapter
from hamilton.plugins.h_openlineage import OpenLineageAdapter

adapter = OpenLineageAdapter(client, "my_namespace", "my_job_name")

# add it to the Hamilton driver
from hamilton import driver

# import your pipeline code and pass your module(s) to the builder
dr = driver.Builder().with_modules(YOUR_MODULES).with_adapters(adapter).build()
# execute as normal -- OpenLineage events will be emitted
dr.execute(...)

Note: for data lineage to be emitted, you must use the “materializer” abstraction to provide metadata. See https://hamilton.dagworks.io/en/latest/concepts/materialization/. You can do this via the @datasaver() and @dataloader() decorators, via the @load_from or @save_to decorators, by passing data savers and data loaders to .with_materializers() on the Driver Builder, or via .materialize() on the driver object.
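
For example, here is a minimal sketch of the decorator route, assuming a pandas-based pipeline. The function names and file paths are illustrative, not part of the adapter's API:

# in YOUR_MODULES -- loaders return (data, metadata), savers return metadata
from typing import Any, Dict, Tuple

import pandas as pd

from hamilton.function_modifiers import dataloader, datasaver

@dataloader()
def raw_data(data_path: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    # the metadata dict returned here is what the lineage events draw from
    df = pd.read_csv(data_path)
    return df, {"path": data_path, "format": "csv"}

def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data.dropna()

@datasaver()
def saved_data(cleaned_data: pd.DataFrame, output_path: str) -> Dict[str, Any]:
    # savers return only a metadata dict describing what was written
    cleaned_data.to_csv(output_path, index=False)
    return {"path": output_path, "format": "csv"}

Alternatively, savers can be attached at build time via .with_materializers() -- again a sketch, assuming the pandas extension is installed so that to.csv is registered:

from hamilton import driver
from hamilton.io.materialization import to

dr = (
    driver.Builder()
    .with_modules(YOUR_MODULES)
    .with_adapters(adapter)
    .with_materializers(to.csv(id="saved_csv", dependencies=["cleaned_data"], path="./cleaned.csv"))
    .build()
)

With either approach, the adapter emits an input dataset event when a loader runs and an output dataset event when a saver runs.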

__init__(client: OpenLineageClient, namespace: str, job_name: str)

Constructor. Pass in the OpenLineage client, along with the namespace and job name to attach to emitted events.

Parameters:
  • client – the OpenLineageClient to emit events through

  • namespace – the OpenLineage namespace to emit events under

  • job_name – the name of the job to associate with runs

post_graph_execute(run_id: str, graph: FunctionGraph, success: bool, error: Exception | None, results: Dict[str, Any] | None)

Emits a run COMPLETE event on success, or a FAIL event on failure.

Parameters:
  • run_id – unique ID for this execution run

  • graph – the FunctionGraph that was executed

  • success – whether execution succeeded

  • error – the exception raised, if any

  • results – the execution results, if any

post_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], success: bool, error: Exception | None, result: Any | None, task_id: str | None = None)

Emits a RUNNING run event with updates on inputs/outputs.

A job event is emitted for the graph execution, with an additional SQLJob facet if data was loaded from a SQL source.

A dataset event is emitted if a dataloader or datasaver was used:

  • an input dataset if a loader ran

  • an output dataset if a saver ran

  • appropriate facets are added to the dataset where it makes sense

TODO: attach statistics facets

Parameters:
  • run_id – unique ID for this execution run

  • node_ – the node that was executed

  • kwargs – the keyword arguments the node was called with

  • success – whether the node executed successfully

  • error – the exception raised, if any

  • result – the node's result, if any

  • task_id – the task ID, if run with task-based execution

pre_graph_execute(run_id: str, graph: FunctionGraph, final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any])

Emits a run START event, and a job event with the sourceCode facet for the entire DAG as the job.

Parameters:
  • run_id – unique ID for this execution run

  • graph – the FunctionGraph to be executed

  • final_vars – the requested output variables

  • inputs – the inputs provided to execution

  • overrides – the overrides provided to execution

pre_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None)

No event is emitted.