
DataHub Actions Quickstart

Prerequisites

The DataHub Actions CLI commands are an extension of the base datahub CLI commands. We recommend first installing the datahub CLI:

python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub
datahub --version

Note that the Actions Framework requires acryl-datahub version v0.8.34 or later.

Installation

To install DataHub Actions, install the acryl-datahub-actions package from PyPI:

python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub-actions

# Verify the installation by checking the version.
datahub actions version

Hello World

DataHub ships with a "Hello World" Action which logs all events it receives to the console. To run this action, simply create a new Action configuration file:

# hello_world.yaml
name: "hello_world"
source:
  type: "kafka"
  config:
    connection:
      bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
      schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
action:
  type: "hello_world"

and then run it using the datahub actions command:

datahub actions -c hello_world.yaml

You should then see the following output if the Action has been started successfully:

Action Pipeline with name 'hello_world' is now running.
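
The connection settings in hello_world.yaml use ${NAME:-default} placeholders. The sketch below illustrates how such a placeholder could resolve, assuming shell-style semantics (use the environment variable when it is set and non-empty, otherwise fall back to the default). The function name expand_placeholders and the regular expression are hypothetical and written for illustration only; they are not taken from the DataHub code base.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}. This pattern is part of the sketch,
# not DataHub's own implementation.
_PLACEHOLDER = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}"
)


def expand_placeholders(text, env=None):
    """Replace each placeholder with env[NAME], or the default if NAME is unset or empty."""
    env = os.environ if env is None else env

    def _substitute(match):
        value = env.get(match.group("name"))
        if value:  # set and non-empty
            return value
        default = match.group("default")
        return default if default is not None else ""

    return _PLACEHOLDER.sub(_substitute, text)


line = "bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}"
print(expand_placeholders(line, env={}))
# prints: bootstrap: localhost:9092
print(expand_placeholders(line, env={"KAFKA_BOOTSTRAP_SERVER": "broker:29092"}))
# prints: bootstrap: broker:29092
```

Under that assumption, exporting KAFKA_BOOTSTRAP_SERVER or SCHEMA_REGISTRY_URL before running the command points the pipeline at a different cluster without editing the file.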

Now, navigate to the instance of DataHub that you've connected to and perform an action such as:

  • Adding / removing a Tag
  • Adding / removing a Glossary Term
  • Adding / removing a Domain

If all is well, you should see some events being logged to the console:

Hello world! Received event:
{
    "event_type": "EntityChangeEvent_v1",
    "event": {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)",
        "category": "TAG",
        "operation": "ADD",
        "modifier": "urn:li:tag:pii",
        "parameters": {},
        "auditStamp": {
            "time": 1651082697703,
            "actor": "urn:li:corpuser:datahub",
            "impersonator": null
        },
        "version": 0,
        "source": null
    },
    "meta": {
        "kafka": {
            "topic": "PlatformEvent_v1",
            "offset": 1262,
            "partition": 0
        }
    }
}

An example of an event emitted when a 'pii' tag has been added to a Dataset.
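
To make the structure of the logged event concrete, here is a minimal Python sketch that parses a trimmed copy of the JSON above and pulls out the fields that describe the change. It uses only the standard library; the helper name summarize is hypothetical, and the snippet operates on the logged text rather than on any DataHub Actions API.

```python
import json

# A trimmed copy of the event logged by the "Hello World" Action.
raw_event = """
{
  "event_type": "EntityChangeEvent_v1",
  "event": {
    "entityType": "dataset",
    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)",
    "category": "TAG",
    "operation": "ADD",
    "modifier": "urn:li:tag:pii",
    "auditStamp": {"time": 1651082697703, "actor": "urn:li:corpuser:datahub"}
  }
}
"""


def summarize(envelope):
    """Build a one-line description of what changed, on which entity, and by whom."""
    event = envelope["event"]
    return "{} {} {} on {} by {}".format(
        event["operation"],
        event["category"],
        event["modifier"],
        event["entityUrn"],
        event["auditStamp"]["actor"],
    )


print(summarize(json.loads(raw_event)))
```

The category, operation, and modifier fields together identify the change (here, adding the pii tag), while entityUrn and auditStamp.actor identify the affected entity and the user who made the change.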

Woohoo! You've successfully started using the Actions framework. Now, let's see how we can get fancy.

Filtering events

If we know which Event types we'd like to consume, we can optionally add a filter configuration, which will prevent events that do not match the filter from being forwarded to the action.

# hello_world.yaml
name: "hello_world"
source:
  type: "kafka"
  config:
    connection:
      bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
      schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
filter:
  event_type: "EntityChangeEvent_v1"
action:
  type: "hello_world"

Filtering for events of type EntityChangeEvent_v1 only

Advanced Filtering

Beyond simply filtering by event type, we can also filter events by matching against the values of their fields. To do so, use the event block. Each field provided will be compared against the real event's value. An event that matches all of the fields will be forwarded to the action.

# hello_world.yaml
name: "hello_world"
source:
  type: "kafka"
  config:
    connection:
      bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
      schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: "ADD"
    modifier: "urn:li:tag:pii"
action:
  type: "hello_world"

This filter only matches events representing "PII" tag additions to an entity.

Moreover, we can achieve "OR" semantics on a particular field by providing an array of values.

# hello_world.yaml
name: "hello_world"
source:
  type: "kafka"
  config:
    connection:
      bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
      schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: "TAG"
    operation: [ "ADD", "REMOVE" ]
    modifier: "urn:li:tag:pii"
action:
  type: "hello_world"

This filter only matches events representing "PII" tag additions to OR removals from an entity. How fancy!
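
The matching rules described in this section (every listed field must match, and an array of values matches if any one of them does) can be sketched in a few lines of Python. This is an illustrative model of the semantics only, not the framework's actual implementation; the function names field_matches and event_matches are hypothetical, and the sketch handles only flat, top-level event fields.

```python
def field_matches(expected, actual):
    """A list of expected values gives OR semantics; a single value must be equal."""
    if isinstance(expected, list):
        return actual in expected
    return expected == actual


def event_matches(filter_config, envelope):
    """Return True only if the event type and every field in the event block match (AND)."""
    expected_type = filter_config.get("event_type")
    if expected_type is not None and not field_matches(
        expected_type, envelope.get("event_type")
    ):
        return False
    event = envelope.get("event", {})
    return all(
        field_matches(expected, event.get(field))
        for field, expected in filter_config.get("event", {}).items()
    )


# The same filter as in the last configuration, written as a Python dict.
pii_filter = {
    "event_type": "EntityChangeEvent_v1",
    "event": {
        "category": "TAG",
        "operation": ["ADD", "REMOVE"],
        "modifier": "urn:li:tag:pii",
    },
}

removal = {
    "event_type": "EntityChangeEvent_v1",
    "event": {"category": "TAG", "operation": "REMOVE", "modifier": "urn:li:tag:pii"},
}
other_tag = {
    "event_type": "EntityChangeEvent_v1",
    "event": {"category": "TAG", "operation": "ADD", "modifier": "urn:li:tag:legacy"},
}

print(event_matches(pii_filter, removal))    # prints: True
print(event_matches(pii_filter, other_tag))  # prints: False
```

The second event is rejected because its modifier differs, even though its category and operation match: all fields in the event block have to agree before the event is forwarded to the action.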