diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 02c8f71a0935..6e37b11e57cb 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -5,6 +5,8 @@ ### Features Added - Customer Facing Statsbeat: Added remaining drop codes to base ([#42382](https://github.com/Azure/azure-sdk-for-python/pull/42382)) +- Refactored the put methods in storage.py for LocalFileBlob and LocalFileStorage + ([#42502](https://github.com/Azure/azure-sdk-for-python/pull/42502)) ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_storage.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_storage.py index 280af2f87d55..025b64718cb7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_storage.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_storage.py @@ -8,6 +8,7 @@ import random import subprocess import errno +from typing import Union from enum import Enum from azure.monitor.opentelemetry.exporter._utils import PeriodicTask @@ -37,6 +38,7 @@ def _seconds(seconds): return datetime.timedelta(seconds=seconds) class StorageExportResult(Enum): + LOCAL_FILE_BLOB_SUCCESS = 0 CLIENT_STORAGE_DISABLED = 1 CLIENT_PERSISTENCE_CAPACITY_REACHED = 2 CLIENT_READONLY = 3 @@ -60,9 +62,7 @@ def get(self): pass # keep silent return None - def put(self, data, lease_period=0): - #TODO: Modify method to remove the return of self as it is not being used anywhere. - # Add typing to method + def put(self, data, lease_period=0) -> Union[StorageExportResult, str]: try: fullpath = self.fullpath + ".tmp" with open(fullpath, "w", encoding="utf-8") as file: @@ -76,7 +76,7 @@ def put(self, data, lease_period=0): timestamp = _now() + _seconds(lease_period) self.fullpath += "@{}.lock".format(_fmt(timestamp)) os.rename(fullpath, self.fullpath) - return self + return StorageExportResult.LOCAL_FILE_BLOB_SUCCESS except Exception as ex: return str(ex) @@ -194,10 +194,7 @@ def get(self): pass return None - def put(self, data, lease_period=None): - # TODO: Remove the blob.put result as we are not using it anywhere and use StorageExportResult instead, - # Should still capture exceptions returned from LocalFileBlob.put - # Add typing for method + def put(self, data, lease_period=None) -> Union[StorageExportResult, str]: try: if not self._enabled: if get_local_storage_setup_state_readonly(): diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 73af4eeb72ee..e34ecd6e52ea 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -223,7 +223,7 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: # For any exceptions occurred in put method of either LocalFileStorage or LocalFileBlob, track dropped item with reason _track_dropped_items(self._customer_statsbeat_metrics, envelopes, DropCode.CLIENT_EXCEPTION, result_from_storage_put) else: - # LocalFileBlob.put returns either an exception(failure, handled above) or the file path(success), eventually that will be removed since this value is not being utilized elsewhere + # LocalFileBlob.put returns StorageExportResult.LOCAL_FILE_BLOB_SUCCESS here. Don't need to track anything in this case. pass elif result == ExportResult.SUCCESS: # Try to send any cached events diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py index 9e2b28cdfca4..87d92aaee28a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py @@ -1066,6 +1066,32 @@ def invalid_get_token(): ValueError, _get_auth_policy, credential=InvalidTestCredential(), default_auth_policy=TEST_AUTH_POLICY ) + @mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._utils._track_dropped_items") + @mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._utils._track_dropped_items") + def test_handle_transmit_from_storage_success_result(self, mock_track_dropped1, mock_track_dropped2): + """Test that when storage.put() returns StorageExportResult.LOCAL_FILE_BLOB_SUCCESS, + the method continues without any special handling.""" + exporter = BaseExporter(disable_offline_storage=False) + mock_customer_statsbeat = mock.Mock() + exporter._customer_statsbeat_metrics = mock_customer_statsbeat + exporter._should_collect_customer_statsbeat = mock.Mock(return_value=True) + + # Mock storage.put() to return success + exporter.storage = mock.Mock() + exporter.storage.put.return_value = StorageExportResult.LOCAL_FILE_BLOB_SUCCESS + + test_envelopes = [TelemetryItem(name="test", time=datetime.now())] + serialized_envelopes = [envelope.as_dict() for envelope in test_envelopes] + exporter._handle_transmit_from_storage(test_envelopes, ExportResult.FAILED_RETRYABLE) + + # Verify storage.put was called with the serialized envelopes + exporter.storage.put.assert_called_once_with(serialized_envelopes) + # Verify that no dropped items were tracked (since it was a success) + mock_track_dropped1.assert_not_called() + mock_track_dropped2.assert_not_called() + # Verify that the customer statsbeat wasn't invoked + mock_customer_statsbeat.assert_not_called() + def test_get_auth_policy_audience(self): class TestCredential: def get_token(): @@ -2162,6 +2188,36 @@ def test_handle_transmit_from_storage_success_path_validation(self, mock_track_d }, ) @mock.patch('azure.monitor.opentelemetry.exporter.export._base._track_dropped_items') + @mock.patch.dict( + os.environ, + { + "APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL": "true", + "APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW": "true", + }, + ) + @mock.patch('azure.monitor.opentelemetry.exporter.export._base._track_dropped_items') + def test_handle_transmit_from_storage_unexpected_return_value(self, mock_track_dropped1, mock_track_dropped2): + """Test that when storage.put() returns an unexpected value type (not StorageExportResult or str), + the method continues without any special handling.""" + exporter = BaseExporter(disable_offline_storage=False) + mock_customer_statsbeat = mock.Mock() + exporter._customer_statsbeat_metrics = mock_customer_statsbeat + exporter._should_collect_customer_statsbeat = mock.Mock(return_value=True) + + # Mock storage.put() to return an unexpected value type (int) + exporter.storage = mock.Mock() + exporter.storage.put.return_value = 42 # Neither StorageExportResult nor str + + test_envelopes = [TelemetryItem(name="test", time=datetime.now())] + exporter._handle_transmit_from_storage(test_envelopes, ExportResult.FAILED_RETRYABLE) + + # Verify that no dropped items were tracked (since return value isn't handled) + mock_track_dropped1.assert_not_called() + mock_track_dropped2.assert_not_called() + # Verify that the customer statsbeat wasn't invoked + mock_customer_statsbeat.assert_not_called() + + @mock.patch("azure.monitor.opentelemetry.exporter.export._base._track_dropped_items") def test_handle_transmit_from_storage_string_return_values_trigger_exception_tracking(self, mock_track_dropped): """Test that string return values from storage.put() trigger CLIENT_EXCEPTION tracking""" # Save original state diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_storage.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_storage.py index 934e2140fef7..a07b5f654eba 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_storage.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_storage.py @@ -70,9 +70,9 @@ def test_put_success_returns_self(self): test_input = [1, 2, 3] result = blob.put(test_input) # Should return the blob itself (self) on success - self.assertIsInstance(result, LocalFileBlob) - self.assertEqual(result, blob) - + self.assertIsInstance(result, StorageExportResult) + self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS) + def test_put_file_write_error_returns_string(self): blob = LocalFileBlob(os.path.join(TEST_FOLDER, "write_error_blob")) test_input = [1, 2, 3] @@ -129,8 +129,8 @@ def test_put_with_lease_period_success(self): lease_period = 60 result = blob.put(test_input, lease_period=lease_period) - self.assertIsInstance(result, LocalFileBlob) - self.assertEqual(result, blob) + self.assertIsInstance(result, StorageExportResult) + self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS) # File should have .lock extension due to lease period self.assertTrue(blob.fullpath.endswith(".lock")) @@ -150,8 +150,8 @@ def test_put_empty_data_success(self): empty_data = [] result = blob.put(empty_data) - self.assertIsInstance(result, LocalFileBlob) - self.assertEqual(result, blob) + self.assertIsInstance(result, StorageExportResult) + self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS) def test_put_large_data_success(self): blob = LocalFileBlob(os.path.join(TEST_FOLDER, "large_data_blob")) @@ -159,8 +159,8 @@ def test_put_large_data_success(self): large_data = [{"id": i, "value": f"data_{i}"} for i in range(1000)] result = blob.put(large_data) - self.assertIsInstance(result, LocalFileBlob) - self.assertEqual(result, blob) + self.assertIsInstance(result, StorageExportResult) + self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS) # Verify data can be retrieved retrieved_data = blob.get() @@ -174,13 +174,25 @@ def test_put_return_type_consistency(self): # Test successful case result_success = blob.put(test_input) - self.assertTrue(isinstance(result_success, LocalFileBlob) or isinstance(result_success, str)) + self.assertTrue(isinstance(result_success, StorageExportResult) or isinstance(result_success, str)) # Test error case blob2 = LocalFileBlob(os.path.join(TEST_FOLDER, "consistency_blob2")) with mock.patch("os.rename", side_effect=Exception("Test error")): result_error = blob2.put(test_input) self.assertIsInstance(result_error, str) + + def test_put_invalid_return_type(self): + blob = LocalFileBlob(os.path.join(TEST_FOLDER, "invalid_return_blob")) + test_input = [1, 2, 3] + + # This tests that even if os.rename somehow returns something unexpected, + # the put method still maintains its type contract + with mock.patch("os.rename", return_value=42): + result = blob.put(test_input) + # Should either convert to string or return StorageExportResult + self.assertTrue(isinstance(result, (StorageExportResult, str)), + f"Expected StorageExportResult or str, got {type(result)}") @unittest.skip("transient storage") def test_put(self): @@ -334,7 +346,7 @@ def test_put_success_returns_localfileblob(self): test_input = (1, 2, 3) with LocalFileStorage(os.path.join(TEST_FOLDER, "success_test")) as stor: result = stor.put(test_input, lease_period=0) # No lease period so file is immediately available - self.assertIsInstance(result, LocalFileBlob) + self.assertIsInstance(result, StorageExportResult) self.assertEqual(stor.get().get(), test_input) def test_put_blob_put_failure_returns_string(self): @@ -379,19 +391,19 @@ def test_put_with_lease_period(self): with LocalFileStorage(os.path.join(TEST_FOLDER, "lease_test")) as stor: result = stor.put(test_input, lease_period=custom_lease_period) - self.assertIsInstance(result, LocalFileBlob) + self.assertIsInstance(result, StorageExportResult) # Verify the file was created with lease period - self.assertTrue(result.fullpath.endswith(".lock")) + self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS) def test_put_default_lease_period(self): test_input = (1, 2, 3) with LocalFileStorage(os.path.join(TEST_FOLDER, "default_lease_test"), lease_period=90) as stor: result = stor.put(test_input) - self.assertIsInstance(result, LocalFileBlob) + self.assertIsInstance(result, StorageExportResult) # File should be created with lease (since default lease_period > 0) - self.assertTrue(result.fullpath.endswith(".lock")) - + self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS) + def test_check_and_set_folder_permissions_oserror_sets_exception_state(self): test_input = (1, 2, 3) test_error_message = "OSError: Permission denied creating directory" @@ -671,7 +683,7 @@ def test_exception_state_cleared_and_storage_recovery(self): with LocalFileStorage(os.path.join(TEST_FOLDER, "recovery_test2")) as stor2: if stor2._enabled: # Storage should be enabled now result = stor2.put(test_input, lease_period=0) - self.assertIsInstance(result, LocalFileBlob) + self.assertIsInstance(result, StorageExportResult) retrieved_data = stor2.get().get() self.assertEqual(retrieved_data, test_input) @@ -740,6 +752,17 @@ def test_readonly_state_interaction_with_storage_put_method(self): # When storage is disabled and readonly state is set, put() should return CLIENT_READONLY result = stor.put(test_input) self.assertEqual(result, StorageExportResult.CLIENT_READONLY) + + def test_storage_put_invalid_return_type(self): + test_input = (1, 2, 3) + + with LocalFileStorage(os.path.join(TEST_FOLDER, "invalid_return_test")) as stor: + # Mock _check_storage_size to return a non-boolean value + with mock.patch.object(stor, '_check_storage_size', return_value=42): + result = stor.put(test_input) + # Should maintain return type contract despite invalid internal return + self.assertTrue(isinstance(result, (StorageExportResult, str)), + f"Expected StorageExportResult or str, got {type(result)}") def test_readonly_state_priority_over_exception_state(self): test_input = (1, 2, 3)