diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py index 5540dd526d0d4..1bbe8ca786c4a 100644 --- a/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py @@ -1,2 +1,4 @@ PLATFORM_EVENT_TOPIC_NAME = "PlatformEvent_v1" +METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME = "MetadataChangeLog_Versioned_v1" +METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1" ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent" diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py index 3e50633af7689..aed8128711bc1 100644 --- a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py @@ -2,23 +2,28 @@ import logging import time from dataclasses import dataclass -from typing import Iterable, List, Optional +from typing import Dict, Iterable, List, Optional, Union, cast from datahub.configuration import ConfigModel from datahub.emitter.serialization_helper import post_json_transform +from datahub.ingestion.graph.client import DataHubGraph # DataHub imports. from datahub.metadata.schema_classes import GenericPayloadClass from datahub_actions.event.event_envelope import EventEnvelope from datahub_actions.event.event_registry import ( ENTITY_CHANGE_EVENT_V1_TYPE, + METADATA_CHANGE_LOG_EVENT_V1_TYPE, EntityChangeEvent, + MetadataChangeLogEvent, ) # May or may not need these. from datahub_actions.pipeline.pipeline_context import PipelineContext from datahub_actions.plugin.source.acryl.constants import ( ENTITY_CHANGE_EVENT_NAME, + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, PLATFORM_EVENT_TOPIC_NAME, ) from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import ( @@ -42,8 +47,15 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent raise ValueError("Failed to parse into EntityChangeEvent") from e +def build_metadata_change_log_event(msg: ExternalEvent) -> MetadataChangeLogEvent: + try: + return cast(MetadataChangeLogEvent, MetadataChangeLogEvent.from_json(msg.value)) + except Exception as e: + raise ValueError("Failed to parse into MetadataChangeLogEvent") from e + + class DataHubEventsSourceConfig(ConfigModel): - topic: str = PLATFORM_EVENT_TOPIC_NAME + topics: Union[str, List[str]] = PLATFORM_EVENT_TOPIC_NAME consumer_id: Optional[str] = None # Used to store offset for the consumer. lookback_days: Optional[int] = None reset_offsets: Optional[bool] = False @@ -71,20 +83,79 @@ def _get_pipeline_urn(pipeline_name: str) -> str: def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext): self.ctx = ctx self.source_config = config - self.consumer_id = DataHubEventSource._get_pipeline_urn(self.ctx.pipeline_name) + self.base_consumer_id = DataHubEventSource._get_pipeline_urn( + self.ctx.pipeline_name + ) + + # Convert topics to a list for consistent handling + if isinstance(self.source_config.topics, str): + self.topics_list = [self.source_config.topics] + else: + self.topics_list = self.source_config.topics # Ensure a Graph Instance was provided. assert self.ctx.graph is not None - self.datahub_events_consumer: DataHubEventsConsumer = DataHubEventsConsumer( - # TODO: This PipelineContext provides an Acryl Graph Instance + # Initialize topic consumers + self.topic_consumers = self._initialize_topic_consumers( + topics_list=self.topics_list, + base_consumer_id=self.base_consumer_id, graph=self.ctx.graph.graph, - consumer_id=self.consumer_id, lookback_days=self.source_config.lookback_days, reset_offsets=self.source_config.reset_offsets, ) + self.ack_manager = AckManager() - self.safe_to_ack_offset: Optional[str] = None + self.safe_to_ack_offsets: Dict[str, Optional[str]] = { + topic: None for topic in self.topics_list + } + + def _initialize_topic_consumers( + self, + topics_list: List[str], + base_consumer_id: str, + graph: DataHubGraph, + lookback_days: Optional[int], + reset_offsets: Optional[bool], + ) -> Dict[str, DataHubEventsConsumer]: + """ + Initialize DataHub consumers for each topic with appropriate consumer IDs. + + Maintains backward compatibility by using the legacy consumer ID format + for single PlatformEvent_v1 deployments, and topic-suffixed IDs for + multi-topic or other single-topic deployments. + + Args: + topics_list: List of topic names to create consumers for + base_consumer_id: Base consumer ID for the pipeline + graph: DataHub graph instance + lookback_days: Number of days to look back for events + reset_offsets: Whether to reset consumer offsets + + Returns: + Dictionary mapping topic names to their corresponding consumers + """ + topic_consumers: Dict[str, DataHubEventsConsumer] = {} + + for topic in topics_list: + # Backward compatibility: if only PlatformEvent_v1, use legacy consumer ID format + if len(topics_list) == 1 and topic == PLATFORM_EVENT_TOPIC_NAME: + topic_consumer_id = ( + base_consumer_id # Legacy format for existing deployments + ) + else: + topic_consumer_id = ( + f"{base_consumer_id}-{topic}" # New format for multi-topic + ) + + topic_consumers[topic] = DataHubEventsConsumer( + graph=graph, + consumer_id=topic_consumer_id, + lookback_days=lookback_days, + reset_offsets=reset_offsets, + ) + + return topic_consumers @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource": @@ -93,7 +164,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource": def events(self) -> Iterable[EventEnvelope]: logger.info("Starting DataHub Cloud events source...") - logger.info(f"Subscribing to the following topic: {self.source_config.topic}") + logger.info(f"Subscribing to the following topics: {self.topics_list}") self.running = True yield from self._poll_and_process_events() @@ -116,26 +187,37 @@ def _poll_and_process_events(self) -> Iterable[EventEnvelope]: raise Exception( f"Failed to process all events successfully after specified time {self.source_config.event_processing_time_max_duration_seconds}! If more time is required, please increase the timeout using this config. {self.ack_manager.acks.values()}", ) - logger.debug( - f"Successfully processed events up to offset id {self.safe_to_ack_offset}" - ) - self.safe_to_ack_offset = self.datahub_events_consumer.offset_id - logger.debug(f"Safe to ack offset: {self.safe_to_ack_offset}") - - events_response = self.datahub_events_consumer.poll_events( - topic=self.source_config.topic, poll_timeout_seconds=2 - ) + # Update safe-to-ack offsets for each topic + for topic in self.topics_list: + consumer = self.topic_consumers[topic] + self.safe_to_ack_offsets[topic] = consumer.offset_id + logger.debug( + f"Safe to ack offset for {topic}: {self.safe_to_ack_offsets[topic]}" + ) + + # Poll events from all topics using their respective consumers + all_events = [] + total_events = 0 + + for topic in self.topics_list: + consumer = self.topic_consumers[topic] + events_response = consumer.poll_events( + topic=topic, poll_timeout_seconds=2 + ) + total_events += len(events_response.events) + + # Process events from this topic + for msg in events_response.events: + all_events.append((topic, msg)) # Handle Idle Timeout - num_events = len(events_response.events) - - if num_events == 0: + if total_events == 0: if last_idle_response_timestamp == 0: last_idle_response_timestamp = ( self._get_current_timestamp_seconds() ) if self._should_idle_timeout( - num_events, last_idle_response_timestamp + total_events, last_idle_response_timestamp ): logger.info("Exiting main loop due to idle timeout") return @@ -144,8 +226,9 @@ def _poll_and_process_events(self) -> Iterable[EventEnvelope]: last_idle_response_timestamp = 0 # Reset the idle timeout event_envelopes: List[EventEnvelope] = [] - for msg in events_response.events: - for event_envelope in self.handle_pe(msg): + for topic, msg in all_events: + # Route events based on topic type + for event_envelope in self._route_event_by_topic(topic, msg): event_envelope.meta = self.ack_manager.get_meta(event_envelope) event_envelopes.append(event_envelope) @@ -157,6 +240,20 @@ def _poll_and_process_events(self) -> Iterable[EventEnvelope]: logger.info("DataHub Events consumer exiting main loop") + def _route_event_by_topic( + self, topic: str, msg: ExternalEvent + ) -> Iterable[EventEnvelope]: + """Route events to appropriate handlers based on topic type.""" + if topic == PLATFORM_EVENT_TOPIC_NAME: + yield from self.handle_pe(msg) + elif topic in [ + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + ]: + yield from self.handle_mcl(msg) + else: + logger.warning(f"Unknown topic: {topic}, skipping event") + @staticmethod def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]: value: dict = json.loads(msg.value) @@ -167,14 +264,19 @@ def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]: event = build_entity_change_event(payload) yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, {}) + @staticmethod + def handle_mcl(msg: ExternalEvent) -> Iterable[EventEnvelope]: + event = build_metadata_change_log_event(msg) + yield EventEnvelope(METADATA_CHANGE_LOG_EVENT_V1_TYPE, event, {}) + def close(self) -> None: - if self.datahub_events_consumer: - self.running = False - if self.safe_to_ack_offset: - self.datahub_events_consumer.commit_offsets( - offset_id=self.safe_to_ack_offset - ) - self.datahub_events_consumer.close() + self.running = False + # Close and commit offsets for each topic consumer + for topic, consumer in self.topic_consumers.items(): + safe_offset = self.safe_to_ack_offsets.get(topic) + if safe_offset: + consumer.commit_offsets(offset_id=safe_offset) + consumer.close() def ack(self, event: EventEnvelope, processed: bool = True) -> None: self.ack_manager.ack(event.meta, processed=processed) diff --git a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py index 6a8d13ae65909..f538d61f94873 100644 --- a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py +++ b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_event_source.py @@ -1,5 +1,6 @@ # test_datahub_event_source.py +import json from typing import List, cast from unittest.mock import MagicMock, patch @@ -8,14 +9,22 @@ from datahub_actions.event.event_envelope import EventEnvelope from datahub_actions.event.event_registry import ( ENTITY_CHANGE_EVENT_V1_TYPE, + METADATA_CHANGE_LOG_EVENT_V1_TYPE, EntityChangeEvent, + MetadataChangeLogEvent, ) from datahub_actions.pipeline.pipeline_context import PipelineContext +from datahub_actions.plugin.source.acryl.constants import ( + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + PLATFORM_EVENT_TOPIC_NAME, +) # Import your source + config classes from the correct module path. from datahub_actions.plugin.source.acryl.datahub_cloud_event_source import ( DataHubEventSource, DataHubEventsSourceConfig, + build_metadata_change_log_event, ) from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import ( AckManager, @@ -50,7 +59,7 @@ def base_config_dict() -> dict: We will parse this into DataHubEventsSourceConfig in each test. """ return { - "topic": "PlatformEvent_v1", + "topics": "PlatformEvent_v1", "lookback_days": None, "reset_offsets": False, "kill_after_idle_timeout": True, @@ -67,8 +76,8 @@ def test_create_source( """ source = DataHubEventSource.create(base_config_dict, mock_pipeline_context) assert isinstance(source, DataHubEventSource) - # The consumer_id on the instance includes the action prefix from pipeline_name - assert source.consumer_id == "urn:li:dataHubAction:test-pipeline" + # The base_consumer_id on the instance includes the action prefix from pipeline_name + assert source.base_consumer_id == "urn:li:dataHubAction:test-pipeline" def test_get_pipeline_urn() -> None: @@ -88,12 +97,15 @@ def test_source_initialization( """ Validate that DataHubEventSource constructor sets up DataHubEventsConsumer and AckManager. """ - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) - assert source.consumer_id == "urn:li:dataHubAction:test-pipeline" - assert isinstance(source.datahub_events_consumer, DataHubEventsConsumer) + assert source.base_consumer_id == "urn:li:dataHubAction:test-pipeline" + assert isinstance(source.topic_consumers, dict) + assert len(source.topic_consumers) == 1 # Default single topic + assert "PlatformEvent_v1" in source.topic_consumers + assert isinstance(source.topic_consumers["PlatformEvent_v1"], DataHubEventsConsumer) assert isinstance(source.ack_manager, AckManager) - assert source.safe_to_ack_offset is None + assert source.safe_to_ack_offsets == {"PlatformEvent_v1": None} def test_events_with_no_events( @@ -101,12 +113,13 @@ def test_events_with_no_events( ) -> None: base_config_dict["idle_timeout_duration_seconds"] = 1 base_config_dict["kill_after_idle_timeout"] = True - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) mock_consumer = MagicMock(spec=DataHubEventsConsumer) mock_consumer.offset_id = "offset-100" # Set the mocked offset_id - source.datahub_events_consumer = mock_consumer + # Replace the consumer for the default topic + source.topic_consumers["PlatformEvent_v1"] = mock_consumer # We'll simulate that poll_events returns a response with 0 events repeatedly. empty_response = ExternalEventsResponse(offsetId="offset-100", count=0, events=[]) @@ -130,12 +143,13 @@ def test_events_with_some_events( """ If poll_events returns events, verify that the source yields them and resets idle timer. """ - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) mock_consumer = MagicMock(spec=DataHubEventsConsumer) mock_consumer.offset_id = "offset-100" - source.datahub_events_consumer = mock_consumer + # Replace the consumer for the default topic + source.topic_consumers["PlatformEvent_v1"] = mock_consumer mock_ack_manager = MagicMock(spec=AckManager) mock_ack_manager.outstanding_acks.side_effect = [0] @@ -165,7 +179,9 @@ def _side_effect(*args, **kwargs): assert emitted[0].event_type == ENTITY_CHANGE_EVENT_V1_TYPE assert isinstance(emitted[0].event, EntityChangeEvent) mock_ack_manager.get_meta.assert_called_once() - assert source.safe_to_ack_offset == "offset-100" # Previous offset. + assert ( + source.safe_to_ack_offsets["PlatformEvent_v1"] == "offset-100" + ) # Previous offset. assert mock_consumer.poll_events.call_count == 1 @@ -177,7 +193,7 @@ def test_outstanding_acks_timeout( due to event_processing_time_max_duration_seconds. """ base_config_dict["event_processing_time_max_duration_seconds"] = 2 - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) mock_ack_manager = MagicMock(spec=AckManager) @@ -188,7 +204,8 @@ def test_outstanding_acks_timeout( mock_consumer = MagicMock(spec=DataHubEventsConsumer) mock_consumer.offset_id = "offset-100" - source.datahub_events_consumer = mock_consumer + # Replace the consumer for the default topic + source.topic_consumers["PlatformEvent_v1"] = mock_consumer source.running = True @@ -223,7 +240,7 @@ def test_ack(mock_pipeline_context: PipelineContext, base_config_dict: dict) -> """ Verify that ack() calls ack_manager.ack with the event's metadata. """ - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) mock_ack_manager = MagicMock(spec=AckManager) @@ -242,13 +259,14 @@ def test_close(mock_pipeline_context: PipelineContext, base_config_dict: dict) - """ Verify that close() stops the source, commits offsets, and calls consumer.close(). """ - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) mock_consumer = MagicMock(spec=DataHubEventsConsumer) - source.datahub_events_consumer = mock_consumer + # Replace the consumer for the default topic + source.topic_consumers["PlatformEvent_v1"] = mock_consumer - source.safe_to_ack_offset = "some-offset-id" + source.safe_to_ack_offsets["PlatformEvent_v1"] = "some-offset-id" source.close() assert source.running is False @@ -263,7 +281,7 @@ def test_should_idle_timeout( Verify the idle timeout logic in _should_idle_timeout(). """ base_config_dict["idle_timeout_duration_seconds"] = 5 - config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict) + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) source = DataHubEventSource(config_model, mock_pipeline_context) # If events > 0 => always False @@ -288,3 +306,226 @@ def test_should_idle_timeout( is True ) assert source.running is False + + +def test_multiple_topics_config( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Test that the source properly handles multiple topics configuration. + """ + # Test with list of topics + base_config_dict["topics"] = [ + PLATFORM_EVENT_TOPIC_NAME, + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + ] + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + assert source.topics_list == [ + PLATFORM_EVENT_TOPIC_NAME, + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + ] + + # Each topic should have its own consumer + assert len(source.topic_consumers) == 3 + assert PLATFORM_EVENT_TOPIC_NAME in source.topic_consumers + assert METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME in source.topic_consumers + assert METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME in source.topic_consumers + + # Each topic should have its own offset tracking + assert len(source.safe_to_ack_offsets) == 3 + for topic in source.topics_list: + assert topic in source.safe_to_ack_offsets + assert source.safe_to_ack_offsets[topic] is None # Initially None + + +def test_single_topic_config_as_string( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Test that the source properly handles single topic configuration as string. + """ + # topics config as string should be converted to list + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + assert source.topics_list == [PLATFORM_EVENT_TOPIC_NAME] + + +def test_backward_compatibility_single_platform_event( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Test backward compatibility: single PlatformEvent_v1 topic uses legacy consumer ID format. + """ + # Default config has only PlatformEvent_v1 + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + # Should use legacy consumer ID format (no topic suffix) + platform_consumer = source.topic_consumers[PLATFORM_EVENT_TOPIC_NAME] + assert platform_consumer.consumer_id == "urn:li:dataHubAction:test-pipeline" + + +def test_new_format_for_multiple_topics( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Test that multiple topics use new consumer ID format with topic suffixes. + """ + base_config_dict["topics"] = [ + PLATFORM_EVENT_TOPIC_NAME, + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + ] + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + # Should use new consumer ID format with topic suffixes + platform_consumer = source.topic_consumers[PLATFORM_EVENT_TOPIC_NAME] + mcl_consumer = source.topic_consumers[METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME] + + assert ( + platform_consumer.consumer_id + == "urn:li:dataHubAction:test-pipeline-PlatformEvent_v1" + ) + assert ( + mcl_consumer.consumer_id + == "urn:li:dataHubAction:test-pipeline-MetadataChangeLog_Versioned_v1" + ) + + +def test_new_format_for_single_non_platform_topic( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Test that single non-PlatformEvent topic uses new consumer ID format. + """ + base_config_dict["topics"] = METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + # Should use new consumer ID format even for single topic (since it's not PlatformEvent_v1) + mcl_consumer = source.topic_consumers[METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME] + assert ( + mcl_consumer.consumer_id + == "urn:li:dataHubAction:test-pipeline-MetadataChangeLog_Versioned_v1" + ) + + +def test_handle_mcl() -> None: + """ + Test that handle_mcl properly processes MetadataChangeLogEvent with proper aspect encoding. + """ + # Create a realistic MCL event based on the documented format + mcl_value = { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)", + "entityKeyAspect": None, + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "value": '{"tags":[{"tag":"urn:li:tag:pii"}]}', # JSON string as per API format + "contentType": "application/json", + }, + "systemMetadata": { + "lastObserved": 1651516475595, + "runId": "test-run-id", + "registryName": "testRegistry", + "registryVersion": "1.0.0", + "properties": None, + }, + "previousAspectValue": None, + "previousSystemMetadata": None, + "created": { + "time": 1651516475594, + "actor": "urn:li:corpuser:datahub", + "impersonator": None, + }, + } + + msg = ExternalEvent(contentType="application/json", value=json.dumps(mcl_value)) + + envelopes: List[EventEnvelope] = list(DataHubEventSource.handle_mcl(msg)) + assert len(envelopes) == 1 + assert envelopes[0].event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE + assert isinstance(envelopes[0].event, MetadataChangeLogEvent) + + # Verify the event was parsed correctly + mcl_event = envelopes[0].event + assert mcl_event.entityUrn == "urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)" + assert mcl_event.entityType == "dataset" + assert mcl_event.aspectName == "globalTags" + assert mcl_event.changeType == "UPSERT" + + +def test_route_event_by_topic( + mock_pipeline_context: PipelineContext, base_config_dict: dict +) -> None: + """ + Test that _route_event_by_topic properly routes events based on topic. + """ + config_model = DataHubEventsSourceConfig.model_validate(base_config_dict) + source = DataHubEventSource(config_model, mock_pipeline_context) + + # Test platform event routing + pe_value = '{"header":{"timestampMillis":1737170481713},"name":"entityChangeEvent","payload":{"value":"{\\"auditStamp\\":{\\"actor\\":\\"urn:li:corpuser:test\\",\\"time\\":1737170481713},\\"entityUrn\\":\\"urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)\\",\\"entityType\\":\\"dataset\\",\\"modifier\\":\\"urn:li:tag:test\\",\\"category\\":\\"TAG\\",\\"operation\\":\\"ADD\\",\\"version\\":0}","contentType":"application/json"}}' + pe_msg = ExternalEvent(contentType="application/json", value=pe_value) + + pe_envelopes = list(source._route_event_by_topic(PLATFORM_EVENT_TOPIC_NAME, pe_msg)) + assert len(pe_envelopes) == 1 + assert pe_envelopes[0].event_type == ENTITY_CHANGE_EVENT_V1_TYPE + + # Test MCL event routing with mocked handler + mcl_msg = ExternalEvent(contentType="application/json", value='{"test": "mcl"}') + + with patch.object(source, "handle_mcl") as mock_handle_mcl: + mock_envelope = EventEnvelope( + METADATA_CHANGE_LOG_EVENT_V1_TYPE, MagicMock(), {} + ) + mock_handle_mcl.return_value = [mock_envelope] + + mcl_envelopes = list( + source._route_event_by_topic( + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, mcl_msg + ) + ) + assert len(mcl_envelopes) == 1 + assert mcl_envelopes[0].event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE + mock_handle_mcl.assert_called_once_with(mcl_msg) + + # Test unknown topic (should return no events) + unknown_envelopes = list(source._route_event_by_topic("unknown_topic", pe_msg)) + assert len(unknown_envelopes) == 0 + + +def test_build_metadata_change_log_event() -> None: + """ + Test that build_metadata_change_log_event properly creates MetadataChangeLogEvent. + """ + # Create a realistic MCL event based on documented format + mcl_value = { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,test_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": '{"rowCount": 1000, "columnCount": 5}', # JSON string + "contentType": "application/json", + }, + "systemMetadata": {"lastObserved": 1651516475595, "runId": "test-run"}, + "created": {"time": 1651516475594, "actor": "urn:li:corpuser:datahub"}, + } + + msg = ExternalEvent(contentType="application/json", value=json.dumps(mcl_value)) + event = build_metadata_change_log_event(msg) + + assert isinstance(event, MetadataChangeLogEvent) + assert ( + event.entityUrn == "urn:li:dataset:(urn:li:dataPlatform:hive,test_dataset,PROD)" + ) + assert event.entityType == "dataset" + assert event.aspectName == "datasetProfile" + assert event.changeType == "UPSERT" diff --git a/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java b/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java index 6a0a9553459ab..3bb7cb608afa2 100644 --- a/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java +++ b/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java @@ -28,7 +28,16 @@ public class ExternalEventsService { public static final String PLATFORM_EVENT_TOPIC_NAME = "PlatformEvent_v1"; - private static final Set ALLOWED_TOPICS = Set.of(PLATFORM_EVENT_TOPIC_NAME); + public static final String METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME = + "MetadataChangeLog_Versioned_v1"; + public static final String METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME = + "MetadataChangeLog_Timeseries_v1"; + + private static final Set ALLOWED_TOPICS = + Set.of( + PLATFORM_EVENT_TOPIC_NAME, + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME); private final KafkaConsumerPool consumerPool; private final ObjectMapper objectMapper; private final Map @@ -80,6 +89,8 @@ public ExternalEvents poll( long timeout = (pollTimeoutSeconds != null ? pollTimeoutSeconds : defaultPollTimeoutSeconds) * 1000L; + System.out.println(String.format("Final topic is %s", finalTopic)); + try { List partitions = consumer.partitionsFor(finalTopic).stream() @@ -88,9 +99,10 @@ public ExternalEvents poll( consumer.assign(partitions); Map partitionOffsets = - getPartitionOffsets(topic, offsetId, consumer, partitions, lookbackWindowDays); + getPartitionOffsets(finalTopic, offsetId, consumer, partitions, lookbackWindowDays); for (Map.Entry entry : partitionOffsets.entrySet()) { + System.out.println(String.format("Seeking to topic is %s", entry.getKey().topic())); consumer.seek(entry.getKey(), entry.getValue()); } diff --git a/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java b/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java index 8400ce68a31fe..a0b2d9cadef01 100644 --- a/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java +++ b/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java @@ -37,6 +37,12 @@ public void setUp() throws Exception { MockitoAnnotations.initMocks(this); when(consumerPool.borrowConsumer()).thenReturn(kafkaConsumer); topicNames.put(ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME, "CustomerSpecificTopicName"); + topicNames.put( + ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + "CustomerSpecificVersionedTopicName"); + topicNames.put( + ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + "CustomerSpecificTimeseriesTopicName"); service = new ExternalEventsService(consumerPool, objectMapper, topicNames, 10, 100); // Setup to simulate fetching records from Kafka @@ -99,6 +105,38 @@ public void testPollValidTopic() throws Exception { verify(kafkaConsumer, atLeastOnce()).poll(any()); } + @Test + public void testPollValidMetadataChangeLogVersionedTopic() throws Exception { + // Mocking Kafka and ObjectMapper behaviors + when(kafkaConsumer.partitionsFor(anyString())).thenReturn(Collections.emptyList()); + when(objectMapper.writeValueAsString(any())).thenReturn("encodedString"); + + // Execute + ExternalEvents events = + service.poll( + ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, null, 10, 5, null); + + // Validate + assertNotNull(events); + verify(kafkaConsumer, atLeastOnce()).poll(any()); + } + + @Test + public void testPollValidMetadataChangeLogTimeseriesTopic() throws Exception { + // Mocking Kafka and ObjectMapper behaviors + when(kafkaConsumer.partitionsFor(anyString())).thenReturn(Collections.emptyList()); + when(objectMapper.writeValueAsString(any())).thenReturn("encodedString"); + + // Execute + ExternalEvents events = + service.poll( + ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, null, 10, 5, null); + + // Validate + assertNotNull(events); + verify(kafkaConsumer, atLeastOnce()).poll(any()); + } + @Test(expectedExceptions = UnsupportedTopicException.class) public void testPollInvalidTopic() throws Exception { service.poll("InvalidTopic", null, 10, 5, null); diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/ExternalEventsServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/ExternalEventsServiceFactory.java index 3ef96877573dd..4a543c009d63b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/ExternalEventsServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/ExternalEventsServiceFactory.java @@ -1,5 +1,7 @@ package com.linkedin.gms.factory.event; +import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME; +import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME; import static io.datahubproject.event.ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME; import com.fasterxml.jackson.databind.ObjectMapper; @@ -39,6 +41,12 @@ public ExternalEventsService externalEventsService() { private Map buildTopicNameMappings() { final Map topicNames = new HashMap<>(); topicNames.put(PLATFORM_EVENT_TOPIC_NAME, topicConvention.getPlatformEventTopicName()); + topicNames.put( + METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, + topicConvention.getMetadataChangeLogVersionedTopicName()); + topicNames.put( + METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, + topicConvention.getMetadataChangeLogTimeseriesTopicName()); return topicNames; } } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v1/event/ExternalEventsController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v1/event/ExternalEventsController.java index deb9c1f25acac..816bc61a525bf 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v1/event/ExternalEventsController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v1/event/ExternalEventsController.java @@ -80,7 +80,7 @@ public ResponseEntity poll( name = "topic", required = true, description = - "The topic to read events for. Currently only supports PlatformEvent_v1, which provides Platform Events such as EntityChangeEvent and NotificationRequestEvent.") + "The topic to read events for. Currently only supports PlatformEvent_v1, which provides Platform Events such as EntityChangeEvent and NotificationRequestEvents and MetadataChangeLog_v1, which provides all aspect updates.") @RequestParam(name = "topic", required = true) String topic, @Parameter(name = "offsetId", description = "The offset to start reading the topic from") @@ -143,6 +143,12 @@ private boolean isAuthorizedToGetEvents( if (Topics.PLATFORM_EVENT.equals(topic)) { return AuthUtil.isAPIAuthorized(opContext, PoliciesConfig.GET_PLATFORM_EVENTS_PRIVILEGE); } + if (Topics.METADATA_CHANGE_LOG_VERSIONED.equals(topic)) { + return AuthUtil.isAPIAuthorized(opContext, PoliciesConfig.GET_METADATA_CHANGE_LOG_EVENTS); + } + if (Topics.METADATA_CHANGE_LOG_TIMESERIES.equals(topic)) { + return AuthUtil.isAPIAuthorized(opContext, PoliciesConfig.GET_METADATA_CHANGE_LOG_EVENTS); + } return false; } diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v1/event/ExternalEventsControllerTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v1/event/ExternalEventsControllerTest.java index 6f54f9bae5171..e5e69ef736a6e 100644 --- a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v1/event/ExternalEventsControllerTest.java +++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v1/event/ExternalEventsControllerTest.java @@ -1,5 +1,7 @@ package io.datahubproject.openapi.v1.event; +import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME; +import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME; import static io.datahubproject.event.ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -299,6 +301,250 @@ public void testPollWithUnsupportedTopic() throws Exception { .andExpect(status().isForbidden()); } + @Test + public void testPollMetadataChangeLogVersionedTopic() throws Exception { + // Setup mock authorization + when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) + .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, "")); + + // Setup mock response + List events = new ArrayList<>(); + ExternalEvent event = new ExternalEvent(); + event.setValue(TEST_CONTENT); + event.setContentType(TEST_CONTENT_TYPE); + events.add(event); + + ExternalEvents externalEvents = new ExternalEvents(); + externalEvents.setEvents(events); + externalEvents.setOffsetId(TEST_OFFSET_ID); + externalEvents.setCount(1L); + + when(mockEventsService.poll( + eq(METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME), + nullable(String.class), + anyInt(), + anyInt(), + nullable(Integer.class))) + .thenReturn(externalEvents); + + // Execute test + mockMvc + .perform( + MockMvcRequestBuilders.get("/openapi/v1/events/poll") + .param("topic", METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME) + .param("limit", "100") + .param("pollTimeoutSeconds", "10") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID)) + .andExpect(MockMvcResultMatchers.jsonPath("$.events[0].value").value(TEST_CONTENT)) + .andExpect( + MockMvcResultMatchers.jsonPath("$.events[0].contentType").value(TEST_CONTENT_TYPE)); + } + + @Test + public void testPollMetadataChangeLogTimeseriesTopic() throws Exception { + // Setup mock authorization + when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) + .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, "")); + + // Setup mock response + List events = new ArrayList<>(); + ExternalEvent event = new ExternalEvent(); + event.setValue(TEST_CONTENT); + event.setContentType(TEST_CONTENT_TYPE); + events.add(event); + + ExternalEvents externalEvents = new ExternalEvents(); + externalEvents.setEvents(events); + externalEvents.setOffsetId(TEST_OFFSET_ID); + externalEvents.setCount(1L); + + when(mockEventsService.poll( + eq(METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME), + nullable(String.class), + anyInt(), + anyInt(), + nullable(Integer.class))) + .thenReturn(externalEvents); + + // Execute test + mockMvc + .perform( + MockMvcRequestBuilders.get("/openapi/v1/events/poll") + .param("topic", METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME) + .param("limit", "100") + .param("pollTimeoutSeconds", "10") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID)) + .andExpect(MockMvcResultMatchers.jsonPath("$.events[0].value").value(TEST_CONTENT)) + .andExpect( + MockMvcResultMatchers.jsonPath("$.events[0].contentType").value(TEST_CONTENT_TYPE)); + } + + @Test + public void testPollMetadataChangeLogVersionedTopicWithOffset() throws Exception { + // Setup mock authorization + when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) + .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, "")); + + // Setup mock response + List events = new ArrayList<>(); + ExternalEvent event = new ExternalEvent(); + event.setValue(TEST_CONTENT); + event.setContentType(TEST_CONTENT_TYPE); + events.add(event); + + ExternalEvents externalEvents = new ExternalEvents(); + externalEvents.setEvents(events); + externalEvents.setOffsetId("new-offset-id"); + externalEvents.setCount(1L); + + when(mockEventsService.poll( + eq(METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME), + eq(TEST_OFFSET_ID), + anyInt(), + anyInt(), + nullable(Integer.class))) + .thenReturn(externalEvents); + + // Execute test + mockMvc + .perform( + MockMvcRequestBuilders.get("/openapi/v1/events/poll") + .param("topic", METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME) + .param("offsetId", TEST_OFFSET_ID) + .param("limit", "100") + .param("pollTimeoutSeconds", "10") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value("new-offset-id")); + } + + @Test + public void testPollMetadataChangeLogTimeseriesTopicWithOffset() throws Exception { + // Setup mock authorization + when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) + .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, "")); + + // Setup mock response + List events = new ArrayList<>(); + ExternalEvent event = new ExternalEvent(); + event.setValue(TEST_CONTENT); + event.setContentType(TEST_CONTENT_TYPE); + events.add(event); + + ExternalEvents externalEvents = new ExternalEvents(); + externalEvents.setEvents(events); + externalEvents.setOffsetId("new-offset-id"); + externalEvents.setCount(1L); + + when(mockEventsService.poll( + eq(METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME), + eq(TEST_OFFSET_ID), + anyInt(), + anyInt(), + nullable(Integer.class))) + .thenReturn(externalEvents); + + // Execute test + mockMvc + .perform( + MockMvcRequestBuilders.get("/openapi/v1/events/poll") + .param("topic", METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME) + .param("offsetId", TEST_OFFSET_ID) + .param("limit", "100") + .param("pollTimeoutSeconds", "10") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value("new-offset-id")); + } + + @Test + public void testPollMetadataChangeLogVersionedTopicWithLookbackWindow() throws Exception { + // Setup mock authorization + when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) + .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, "")); + + // Setup mock response + List events = new ArrayList<>(); + ExternalEvent event = new ExternalEvent(); + event.setValue(TEST_CONTENT); + event.setContentType(TEST_CONTENT_TYPE); + events.add(event); + + ExternalEvents externalEvents = new ExternalEvents(); + externalEvents.setEvents(events); + externalEvents.setOffsetId(TEST_OFFSET_ID); + externalEvents.setCount(1L); + + when(mockEventsService.poll( + eq(METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME), + nullable(String.class), + anyInt(), + anyInt(), + eq(7))) + .thenReturn(externalEvents); + + // Execute test + mockMvc + .perform( + MockMvcRequestBuilders.get("/openapi/v1/events/poll") + .param("topic", METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME) + .param("lookbackWindowDays", "7") + .param("limit", "100") + .param("pollTimeoutSeconds", "10") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID)); + } + + @Test + public void testPollMetadataChangeLogTimeseriesTopicWithLookbackWindow() throws Exception { + // Setup mock authorization + when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) + .thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, "")); + + // Setup mock response + List events = new ArrayList<>(); + ExternalEvent event = new ExternalEvent(); + event.setValue(TEST_CONTENT); + event.setContentType(TEST_CONTENT_TYPE); + events.add(event); + + ExternalEvents externalEvents = new ExternalEvents(); + externalEvents.setEvents(events); + externalEvents.setOffsetId(TEST_OFFSET_ID); + externalEvents.setCount(1L); + + when(mockEventsService.poll( + eq(METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME), + nullable(String.class), + anyInt(), + anyInt(), + eq(7))) + .thenReturn(externalEvents); + + // Execute test + mockMvc + .perform( + MockMvcRequestBuilders.get("/openapi/v1/events/poll") + .param("topic", METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME) + .param("lookbackWindowDays", "7") + .param("limit", "100") + .param("pollTimeoutSeconds", "10") + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1)) + .andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID)); + } + @Test public void testAuditEventsSearch() throws Exception { when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class))) diff --git a/metadata-service/war/src/main/resources/boot/policies.json b/metadata-service/war/src/main/resources/boot/policies.json index 4c76d06bf2a42..c98df8e577d0d 100644 --- a/metadata-service/war/src/main/resources/boot/policies.json +++ b/metadata-service/war/src/main/resources/boot/policies.json @@ -41,7 +41,8 @@ "MANAGE_FEATURES", "MANAGE_SYSTEM_OPERATIONS", "GET_PLATFORM_EVENTS", - "MANAGE_HOME_PAGE_TEMPLATES" + "MANAGE_HOME_PAGE_TEMPLATES", + "GET_METADATA_CHANGE_LOG_EVENTS" ], "displayName": "Root User - All Platform Privileges", "description": "Grants all platform privileges to root user.", @@ -198,7 +199,8 @@ "MANAGE_FEATURES", "MANAGE_SYSTEM_OPERATIONS", "GET_PLATFORM_EVENTS", - "MANAGE_HOME_PAGE_TEMPLATES" + "MANAGE_HOME_PAGE_TEMPLATES", + "GET_METADATA_CHANGE_LOG_EVENTS" ], "displayName": "Admins - Platform Policy", "description": "Admins have all platform privileges.", diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/authorization/PoliciesConfig.java b/metadata-utils/src/main/java/com/linkedin/metadata/authorization/PoliciesConfig.java index 68212aed39c57..2ae729c5d3d33 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/authorization/PoliciesConfig.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/authorization/PoliciesConfig.java @@ -212,6 +212,12 @@ public class PoliciesConfig { "Get Platform Events", "The ability to use the Events API to read Platform Events - Entity Change Events and Notification Request Events."); + public static final Privilege GET_METADATA_CHANGE_LOG_EVENTS = + Privilege.of( + "GET_METADATA_CHANGE_LOG_EVENTS", + "Get Metadata Change Log Events", + "The ability to use the Events API to read Metadata Change Log, or all low-level Metadata Change Events."); + public static final Privilege MANAGE_HOME_PAGE_TEMPLATES_PRIVILEGE = Privilege.of( "MANAGE_HOME_PAGE_TEMPLATES", @@ -256,6 +262,7 @@ public class PoliciesConfig { MANAGE_FEATURES_PRIVILEGE, MANAGE_SYSTEM_OPERATIONS_PRIVILEGE, GET_PLATFORM_EVENTS_PRIVILEGE, + GET_METADATA_CHANGE_LOG_EVENTS, MANAGE_HOME_PAGE_TEMPLATES_PRIVILEGE); // Resource Privileges //