Skip to content

Refactored storage.py put methods #42502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Features Added
- Customer Facing Statsbeat: Added remaining drop codes to base
([#42382](https://github.com/Azure/azure-sdk-for-python/pull/42382))
- Refactored the put methods in storage.py for LocalFileBlob and LocalFileStorage
([#42502](https://github.com/Azure/azure-sdk-for-python/pull/42502))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import random
import subprocess
import errno
from typing import Union
from enum import Enum

from azure.monitor.opentelemetry.exporter._utils import PeriodicTask
Expand Down Expand Up @@ -37,6 +38,7 @@ def _seconds(seconds):
return datetime.timedelta(seconds=seconds)

class StorageExportResult(Enum):
LOCAL_FILE_BLOB_SUCCESS = 0
CLIENT_STORAGE_DISABLED = 1
CLIENT_PERSISTENCE_CAPACITY_REACHED = 2
CLIENT_READONLY = 3
Expand All @@ -60,9 +62,7 @@ def get(self):
pass # keep silent
return None

def put(self, data, lease_period=0):
#TODO: Modify method to remove the return of self as it is not being used anywhere.
# Add typing to method
def put(self, data, lease_period=0) -> Union[StorageExportResult, str]:
try:
fullpath = self.fullpath + ".tmp"
with open(fullpath, "w", encoding="utf-8") as file:
Expand All @@ -76,7 +76,7 @@ def put(self, data, lease_period=0):
timestamp = _now() + _seconds(lease_period)
self.fullpath += "@{}.lock".format(_fmt(timestamp))
os.rename(fullpath, self.fullpath)
return self
return StorageExportResult.LOCAL_FILE_BLOB_SUCCESS
except Exception as ex:
return str(ex)

Expand Down Expand Up @@ -194,10 +194,7 @@ def get(self):
pass
return None

def put(self, data, lease_period=None):
# TODO: Remove the blob.put result as we are not using it anywhere and use StorageExportResult instead,
# Should still capture exceptions returned from LocalFileBlob.put
# Add typing for method
def put(self, data, lease_period=None) -> Union[StorageExportResult, str]:
try:
if not self._enabled:
if get_local_storage_setup_state_readonly():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result:
# For any exceptions occurred in put method of either LocalFileStorage or LocalFileBlob, track dropped item with reason
_track_dropped_items(self._customer_statsbeat_metrics, envelopes, DropCode.CLIENT_EXCEPTION, result_from_storage_put)
else:
# LocalFileBlob.put returns either an exception(failure, handled above) or the file path(success), eventually that will be removed since this value is not being utilized elsewhere
# LocalFileBlob.put returns StorageExportResult.LOCAL_FILE_BLOB_SUCCESS here. Don't need to track anything in this case.
pass
elif result == ExportResult.SUCCESS:
# Try to send any cached events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,32 @@ def invalid_get_token():
ValueError, _get_auth_policy, credential=InvalidTestCredential(), default_auth_policy=TEST_AUTH_POLICY
)

@mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._utils._track_dropped_items")
@mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._utils._track_dropped_items")
def test_handle_transmit_from_storage_success_result(self, mock_track_dropped1, mock_track_dropped2):
"""Test that when storage.put() returns StorageExportResult.LOCAL_FILE_BLOB_SUCCESS,
the method continues without any special handling."""
exporter = BaseExporter(disable_offline_storage=False)
mock_customer_statsbeat = mock.Mock()
exporter._customer_statsbeat_metrics = mock_customer_statsbeat
exporter._should_collect_customer_statsbeat = mock.Mock(return_value=True)

# Mock storage.put() to return success
exporter.storage = mock.Mock()
exporter.storage.put.return_value = StorageExportResult.LOCAL_FILE_BLOB_SUCCESS

test_envelopes = [TelemetryItem(name="test", time=datetime.now())]
serialized_envelopes = [envelope.as_dict() for envelope in test_envelopes]
exporter._handle_transmit_from_storage(test_envelopes, ExportResult.FAILED_RETRYABLE)

# Verify storage.put was called with the serialized envelopes
exporter.storage.put.assert_called_once_with(serialized_envelopes)
# Verify that no dropped items were tracked (since it was a success)
mock_track_dropped1.assert_not_called()
mock_track_dropped2.assert_not_called()
# Verify that the customer statsbeat wasn't invoked
mock_customer_statsbeat.assert_not_called()

def test_get_auth_policy_audience(self):
class TestCredential:
def get_token():
Expand Down Expand Up @@ -2162,6 +2188,36 @@ def test_handle_transmit_from_storage_success_path_validation(self, mock_track_d
},
)
@mock.patch('azure.monitor.opentelemetry.exporter.export._base._track_dropped_items')
@mock.patch.dict(
os.environ,
{
"APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL": "true",
"APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW": "true",
},
)
@mock.patch('azure.monitor.opentelemetry.exporter.export._base._track_dropped_items')
def test_handle_transmit_from_storage_unexpected_return_value(self, mock_track_dropped1, mock_track_dropped2):
"""Test that when storage.put() returns an unexpected value type (not StorageExportResult or str),
the method continues without any special handling."""
exporter = BaseExporter(disable_offline_storage=False)
mock_customer_statsbeat = mock.Mock()
exporter._customer_statsbeat_metrics = mock_customer_statsbeat
exporter._should_collect_customer_statsbeat = mock.Mock(return_value=True)

# Mock storage.put() to return an unexpected value type (int)
exporter.storage = mock.Mock()
exporter.storage.put.return_value = 42 # Neither StorageExportResult nor str

test_envelopes = [TelemetryItem(name="test", time=datetime.now())]
exporter._handle_transmit_from_storage(test_envelopes, ExportResult.FAILED_RETRYABLE)

# Verify that no dropped items were tracked (since return value isn't handled)
mock_track_dropped1.assert_not_called()
mock_track_dropped2.assert_not_called()
# Verify that the customer statsbeat wasn't invoked
mock_customer_statsbeat.assert_not_called()

@mock.patch("azure.monitor.opentelemetry.exporter.export._base._track_dropped_items")
def test_handle_transmit_from_storage_string_return_values_trigger_exception_tracking(self, mock_track_dropped):
"""Test that string return values from storage.put() trigger CLIENT_EXCEPTION tracking"""
# Save original state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ def test_put_success_returns_self(self):
test_input = [1, 2, 3]
result = blob.put(test_input)
# Should return the blob itself (self) on success
self.assertIsInstance(result, LocalFileBlob)
self.assertEqual(result, blob)
self.assertIsInstance(result, StorageExportResult)
self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS)

def test_put_file_write_error_returns_string(self):
blob = LocalFileBlob(os.path.join(TEST_FOLDER, "write_error_blob"))
test_input = [1, 2, 3]
Expand Down Expand Up @@ -129,8 +129,8 @@ def test_put_with_lease_period_success(self):
lease_period = 60

result = blob.put(test_input, lease_period=lease_period)
self.assertIsInstance(result, LocalFileBlob)
self.assertEqual(result, blob)
self.assertIsInstance(result, StorageExportResult)
self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS)
# File should have .lock extension due to lease period
self.assertTrue(blob.fullpath.endswith(".lock"))

Expand All @@ -150,17 +150,17 @@ def test_put_empty_data_success(self):
empty_data = []

result = blob.put(empty_data)
self.assertIsInstance(result, LocalFileBlob)
self.assertEqual(result, blob)
self.assertIsInstance(result, StorageExportResult)
self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS)

def test_put_large_data_success(self):
blob = LocalFileBlob(os.path.join(TEST_FOLDER, "large_data_blob"))
# Create a large list of data
large_data = [{"id": i, "value": f"data_{i}"} for i in range(1000)]

result = blob.put(large_data)
self.assertIsInstance(result, LocalFileBlob)
self.assertEqual(result, blob)
self.assertIsInstance(result, StorageExportResult)
self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS)

# Verify data can be retrieved
retrieved_data = blob.get()
Expand All @@ -174,13 +174,25 @@ def test_put_return_type_consistency(self):

# Test successful case
result_success = blob.put(test_input)
self.assertTrue(isinstance(result_success, LocalFileBlob) or isinstance(result_success, str))
self.assertTrue(isinstance(result_success, StorageExportResult) or isinstance(result_success, str))

# Test error case
blob2 = LocalFileBlob(os.path.join(TEST_FOLDER, "consistency_blob2"))
with mock.patch("os.rename", side_effect=Exception("Test error")):
result_error = blob2.put(test_input)
self.assertIsInstance(result_error, str)

def test_put_invalid_return_type(self):
blob = LocalFileBlob(os.path.join(TEST_FOLDER, "invalid_return_blob"))
test_input = [1, 2, 3]

# This tests that even if os.rename somehow returns something unexpected,
# the put method still maintains its type contract
with mock.patch("os.rename", return_value=42):
result = blob.put(test_input)
# Should either convert to string or return StorageExportResult
self.assertTrue(isinstance(result, (StorageExportResult, str)),
f"Expected StorageExportResult or str, got {type(result)}")

@unittest.skip("transient storage")
def test_put(self):
Expand Down Expand Up @@ -334,7 +346,7 @@ def test_put_success_returns_localfileblob(self):
test_input = (1, 2, 3)
with LocalFileStorage(os.path.join(TEST_FOLDER, "success_test")) as stor:
result = stor.put(test_input, lease_period=0) # No lease period so file is immediately available
self.assertIsInstance(result, LocalFileBlob)
self.assertIsInstance(result, StorageExportResult)
self.assertEqual(stor.get().get(), test_input)

def test_put_blob_put_failure_returns_string(self):
Expand Down Expand Up @@ -379,19 +391,19 @@ def test_put_with_lease_period(self):

with LocalFileStorage(os.path.join(TEST_FOLDER, "lease_test")) as stor:
result = stor.put(test_input, lease_period=custom_lease_period)
self.assertIsInstance(result, LocalFileBlob)
self.assertIsInstance(result, StorageExportResult)
# Verify the file was created with lease period
self.assertTrue(result.fullpath.endswith(".lock"))
self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS)

def test_put_default_lease_period(self):
test_input = (1, 2, 3)

with LocalFileStorage(os.path.join(TEST_FOLDER, "default_lease_test"), lease_period=90) as stor:
result = stor.put(test_input)
self.assertIsInstance(result, LocalFileBlob)
self.assertIsInstance(result, StorageExportResult)
# File should be created with lease (since default lease_period > 0)
self.assertTrue(result.fullpath.endswith(".lock"))
self.assertEqual(result, StorageExportResult.LOCAL_FILE_BLOB_SUCCESS)

def test_check_and_set_folder_permissions_oserror_sets_exception_state(self):
test_input = (1, 2, 3)
test_error_message = "OSError: Permission denied creating directory"
Expand Down Expand Up @@ -671,7 +683,7 @@ def test_exception_state_cleared_and_storage_recovery(self):
with LocalFileStorage(os.path.join(TEST_FOLDER, "recovery_test2")) as stor2:
if stor2._enabled: # Storage should be enabled now
result = stor2.put(test_input, lease_period=0)
self.assertIsInstance(result, LocalFileBlob)
self.assertIsInstance(result, StorageExportResult)
retrieved_data = stor2.get().get()
self.assertEqual(retrieved_data, test_input)

Expand Down Expand Up @@ -740,6 +752,17 @@ def test_readonly_state_interaction_with_storage_put_method(self):
# When storage is disabled and readonly state is set, put() should return CLIENT_READONLY
result = stor.put(test_input)
self.assertEqual(result, StorageExportResult.CLIENT_READONLY)

def test_storage_put_invalid_return_type(self):
test_input = (1, 2, 3)

with LocalFileStorage(os.path.join(TEST_FOLDER, "invalid_return_test")) as stor:
# Mock _check_storage_size to return a non-boolean value
with mock.patch.object(stor, '_check_storage_size', return_value=42):
result = stor.put(test_input)
# Should maintain return type contract despite invalid internal return
self.assertTrue(isinstance(result, (StorageExportResult, str)),
f"Expected StorageExportResult or str, got {type(result)}")

def test_readonly_state_priority_over_exception_state(self):
test_input = (1, 2, 3)
Expand Down