feat(datahub-actions): Adding support for Metadata Change Log in DataHub Cloud Events Source #14497

Open · wants to merge 6 commits into master
datahub_actions/plugin/source/acryl/constants.py
@@ -1,2 +1,4 @@
PLATFORM_EVENT_TOPIC_NAME = "PlatformEvent_v1"
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME = "MetadataChangeLog_Versioned_v1"
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1"
ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent"
datahub_actions/plugin/source/acryl/ (DataHub Cloud events source)
@@ -2,23 +2,28 @@
import logging
import time
from dataclasses import dataclass
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional, Union, cast

from datahub.configuration import ConfigModel
from datahub.emitter.serialization_helper import post_json_transform
from datahub.ingestion.graph.client import DataHubGraph

# DataHub imports.
from datahub.metadata.schema_classes import GenericPayloadClass
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.event.event_registry import (
    ENTITY_CHANGE_EVENT_V1_TYPE,
    METADATA_CHANGE_LOG_EVENT_V1_TYPE,
    EntityChangeEvent,
    MetadataChangeLogEvent,
)
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.plugin.source.acryl.constants import (
    ENTITY_CHANGE_EVENT_NAME,
    METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
    METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
    PLATFORM_EVENT_TOPIC_NAME,
)
from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import (
@@ -42,8 +47,15 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent
        raise ValueError("Failed to parse into EntityChangeEvent") from e


def build_metadata_change_log_event(msg: ExternalEvent) -> MetadataChangeLogEvent:
    try:
        return cast(MetadataChangeLogEvent, MetadataChangeLogEvent.from_json(msg.value))
    except Exception as e:
        raise ValueError("Failed to parse into MetadataChangeLogEvent") from e


class DataHubEventsSourceConfig(ConfigModel):
-    topic: str = PLATFORM_EVENT_TOPIC_NAME
+    topics: Union[str, List[str]] = PLATFORM_EVENT_TOPIC_NAME
    consumer_id: Optional[str] = None  # Used to store offset for the consumer.
    lookback_days: Optional[int] = None
    reset_offsets: Optional[bool] = False
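
Note: the config now accepts either a single topic string (the backward-compatible default) or a list. A minimal sketch of both forms, assuming only the field names defined above:

# Pre-existing single-topic form, unchanged behavior:
config = DataHubEventsSourceConfig(topics=PLATFORM_EVENT_TOPIC_NAME)

# New multi-topic form enabled by this PR:
config = DataHubEventsSourceConfig(
    topics=[
        PLATFORM_EVENT_TOPIC_NAME,
        METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
        METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
    ],
    lookback_days=7,  # optional: start consuming up to 7 days back
)
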
@@ -71,20 +83,79 @@ def _get_pipeline_urn(pipeline_name: str) -> str:
    def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext):
        self.ctx = ctx
        self.source_config = config
-        self.consumer_id = DataHubEventSource._get_pipeline_urn(self.ctx.pipeline_name)
+        self.base_consumer_id = DataHubEventSource._get_pipeline_urn(
+            self.ctx.pipeline_name
+        )

+        # Convert topics to a list for consistent handling
+        if isinstance(self.source_config.topics, str):
+            self.topics_list = [self.source_config.topics]
+        else:
+            self.topics_list = self.source_config.topics
+
        # Ensure a Graph Instance was provided.
        assert self.ctx.graph is not None

-        self.datahub_events_consumer: DataHubEventsConsumer = DataHubEventsConsumer(
-            # TODO: This PipelineContext provides an Acryl Graph Instance
+        # Initialize topic consumers
+        self.topic_consumers = self._initialize_topic_consumers(
+            topics_list=self.topics_list,
+            base_consumer_id=self.base_consumer_id,
            graph=self.ctx.graph.graph,
-            consumer_id=self.consumer_id,
            lookback_days=self.source_config.lookback_days,
            reset_offsets=self.source_config.reset_offsets,
        )

        self.ack_manager = AckManager()
-        self.safe_to_ack_offset: Optional[str] = None
+        self.safe_to_ack_offsets: Dict[str, Optional[str]] = {
+            topic: None for topic in self.topics_list
+        }

    def _initialize_topic_consumers(
        self,
        topics_list: List[str],
        base_consumer_id: str,
        graph: DataHubGraph,
        lookback_days: Optional[int],
        reset_offsets: Optional[bool],
    ) -> Dict[str, DataHubEventsConsumer]:
        """
        Initialize DataHub consumers for each topic with appropriate consumer IDs.

        Maintains backward compatibility by using the legacy consumer ID format
        for single PlatformEvent_v1 deployments, and topic-suffixed IDs for
        multi-topic or other single-topic deployments.

        Args:
            topics_list: List of topic names to create consumers for
            base_consumer_id: Base consumer ID for the pipeline
            graph: DataHub graph instance
            lookback_days: Number of days to look back for events
            reset_offsets: Whether to reset consumer offsets

        Returns:
            Dictionary mapping topic names to their corresponding consumers
        """
        topic_consumers: Dict[str, DataHubEventsConsumer] = {}

        for topic in topics_list:
            # Backward compatibility: if only PlatformEvent_v1, use legacy consumer ID format
            if len(topics_list) == 1 and topic == PLATFORM_EVENT_TOPIC_NAME:
                topic_consumer_id = (
                    base_consumer_id  # Legacy format for existing deployments
                )
            else:
                topic_consumer_id = (
                    f"{base_consumer_id}-{topic}"  # New format for multi-topic
                )

            topic_consumers[topic] = DataHubEventsConsumer(
                graph=graph,
                consumer_id=topic_consumer_id,
                lookback_days=lookback_days,
                reset_offsets=reset_offsets,
            )

        return topic_consumers
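
Note: to make the consumer ID scheme above concrete, a standalone sketch of the mapping this method produces (the base ID values here are illustrative only):

# Mirrors the backward-compatibility rules in _initialize_topic_consumers.
def expected_consumer_ids(base_consumer_id: str, topics: List[str]) -> Dict[str, str]:
    if len(topics) == 1 and topics[0] == PLATFORM_EVENT_TOPIC_NAME:
        return {topics[0]: base_consumer_id}  # legacy, un-suffixed ID
    return {topic: f"{base_consumer_id}-{topic}" for topic in topics}

assert expected_consumer_ids("base-id", [PLATFORM_EVENT_TOPIC_NAME]) == {
    "PlatformEvent_v1": "base-id"
}
assert expected_consumer_ids(
    "base-id", [PLATFORM_EVENT_TOPIC_NAME, METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME]
) == {
    "PlatformEvent_v1": "base-id-PlatformEvent_v1",
    "MetadataChangeLog_Versioned_v1": "base-id-MetadataChangeLog_Versioned_v1",
}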

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource":
@@ -93,7 +164,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource":

    def events(self) -> Iterable[EventEnvelope]:
        logger.info("Starting DataHub Cloud events source...")
-        logger.info(f"Subscribing to the following topic: {self.source_config.topic}")
+        logger.info(f"Subscribing to the following topics: {self.topics_list}")
        self.running = True
        yield from self._poll_and_process_events()

@@ -116,26 +187,37 @@ def _poll_and_process_events(self) -> Iterable[EventEnvelope]:
                        raise Exception(
                            f"Failed to process all events successfully after specified time {self.source_config.event_processing_time_max_duration_seconds}! If more time is required, please increase the timeout using this config. {self.ack_manager.acks.values()}",
                        )
-                logger.debug(
-                    f"Successfully processed events up to offset id {self.safe_to_ack_offset}"
-                )
-                self.safe_to_ack_offset = self.datahub_events_consumer.offset_id
-                logger.debug(f"Safe to ack offset: {self.safe_to_ack_offset}")
-
-                events_response = self.datahub_events_consumer.poll_events(
-                    topic=self.source_config.topic, poll_timeout_seconds=2
-                )
+                # Update safe-to-ack offsets for each topic
+                for topic in self.topics_list:
+                    consumer = self.topic_consumers[topic]
+                    self.safe_to_ack_offsets[topic] = consumer.offset_id
+                    logger.debug(
+                        f"Safe to ack offset for {topic}: {self.safe_to_ack_offsets[topic]}"
+                    )
+
+                # Poll events from all topics using their respective consumers
+                all_events = []
+                total_events = 0
+
+                for topic in self.topics_list:
+                    consumer = self.topic_consumers[topic]
+                    events_response = consumer.poll_events(
+                        topic=topic, poll_timeout_seconds=2
+                    )
+                    total_events += len(events_response.events)
+
+                    # Process events from this topic
+                    for msg in events_response.events:
+                        all_events.append((topic, msg))

                # Handle Idle Timeout
-                num_events = len(events_response.events)
-
-                if num_events == 0:
+                if total_events == 0:
                    if last_idle_response_timestamp == 0:
                        last_idle_response_timestamp = (
                            self._get_current_timestamp_seconds()
                        )
                    if self._should_idle_timeout(
-                        num_events, last_idle_response_timestamp
+                        total_events, last_idle_response_timestamp
                    ):
                        logger.info("Exiting main loop due to idle timeout")
                        return
@@ -144,8 +226,9 @@ def _poll_and_process_events(self) -> Iterable[EventEnvelope]:
                    last_idle_response_timestamp = 0  # Reset the idle timeout

                event_envelopes: List[EventEnvelope] = []
-                for msg in events_response.events:
-                    for event_envelope in self.handle_pe(msg):
+                for topic, msg in all_events:
+                    # Route events based on topic type
+                    for event_envelope in self._route_event_by_topic(topic, msg):
                        event_envelope.meta = self.ack_manager.get_meta(event_envelope)
                        event_envelopes.append(event_envelope)

@@ -157,6 +240,20 @@ def _poll_and_process_events(self) -> Iterable[EventEnvelope]:

        logger.info("DataHub Events consumer exiting main loop")

    def _route_event_by_topic(
        self, topic: str, msg: ExternalEvent
    ) -> Iterable[EventEnvelope]:
        """Route events to appropriate handlers based on topic type."""
        if topic == PLATFORM_EVENT_TOPIC_NAME:
            yield from self.handle_pe(msg)
        elif topic in [
            METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
            METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
        ]:
            yield from self.handle_mcl(msg)
        else:
            logger.warning(f"Unknown topic: {topic}, skipping event")

    @staticmethod
    def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]:
        value: dict = json.loads(msg.value)
@@ -167,14 +264,19 @@ def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]:
        event = build_entity_change_event(payload)
        yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, {})

    @staticmethod
    def handle_mcl(msg: ExternalEvent) -> Iterable[EventEnvelope]:
        event = build_metadata_change_log_event(msg)
        yield EventEnvelope(METADATA_CHANGE_LOG_EVENT_V1_TYPE, event, {})
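
Note: actions downstream of this source now see two envelope types. A toy sketch of an action that distinguishes them, assuming the standard Action interface from datahub_actions.action.action (the class name and log lines are illustrative only):

from datahub_actions.action.action import Action


class LogEventsAction(Action):
    """Illustrative only: logs each envelope type emitted by this source."""

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        return cls()

    def act(self, event: EventEnvelope) -> None:
        if event.event_type == ENTITY_CHANGE_EVENT_V1_TYPE:
            logger.info(f"Entity change event: {event.event}")
        elif event.event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE:
            logger.info(f"Metadata change log event: {event.event}")

    def close(self) -> None:
        pass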

    def close(self) -> None:
-        if self.datahub_events_consumer:
-            self.running = False
-            if self.safe_to_ack_offset:
-                self.datahub_events_consumer.commit_offsets(
-                    offset_id=self.safe_to_ack_offset
-                )
-            self.datahub_events_consumer.close()
+        self.running = False
+        # Close and commit offsets for each topic consumer
+        for topic, consumer in self.topic_consumers.items():
+            safe_offset = self.safe_to_ack_offsets.get(topic)
+            if safe_offset:
+                consumer.commit_offsets(offset_id=safe_offset)
+            consumer.close()

    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
        self.ack_manager.ack(event.meta, processed=processed)
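
Note: end to end, the source might be exercised roughly as follows (a hedged sketch: ctx must be a PipelineContext with a DataHub graph attached, and process is a hypothetical handler):

source = DataHubEventSource.create(
    {
        "topics": [
            PLATFORM_EVENT_TOPIC_NAME,
            METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
        ]
    },
    ctx,  # PipelineContext whose .graph is set; see the assert in __init__
)

for envelope in source.events():
    process(envelope)  # hypothetical downstream handler
    source.ack(envelope, processed=True)
source.close()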