Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class BlobLogListener
private readonly StorageAnalyticsLogParser _parser;
private readonly ILogger<BlobListener> _logger;

private BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
public BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
{
_blobClient = blobClient;

Expand Down Expand Up @@ -77,6 +77,53 @@ public async Task<IEnumerable<BlobWithContainer<BlobBaseClient>>> GetRecentBlobW
return blobs;
}

public async Task<bool> HasBlobWritesAsync(CancellationToken cancellationToken, int hoursWindow = DefaultScanHoursWindow)
{
if (hoursWindow <= 0)
{
return false;
}

DateTime hourCursor = DateTime.UtcNow;
BlobContainerClient containerClient = _blobClient.GetBlobContainerClient(LogContainer);

int processedCount = 0;

for (int hourIndex = 0; hourIndex < hoursWindow; hourIndex++)
{
cancellationToken.ThrowIfCancellationRequested();

string prefix = GetSearchPrefix("blob", hourCursor, hourCursor);

await foreach (var page in containerClient
.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, states: BlobStates.None, cancellationToken: cancellationToken)
.AsPages(pageSizeHint: 200)
.ConfigureAwait(false))
{
cancellationToken.ThrowIfCancellationRequested();

foreach (BlobItem blob in page.Values)
{
// Increment only when we actually look at a blob item.
processedCount++;

// Examine metadata only if present.
if (blob.Metadata is not null &&
blob.Metadata.TryGetValue(LogType, out string logType) &&
!string.IsNullOrEmpty(logType) &&
logType.IndexOf("write", StringComparison.OrdinalIgnoreCase) >= 0)
{
return true;
}
}
}

hourCursor = hourCursor.AddHours(-1);
}

return false;
}

internal static IEnumerable<BlobPath> GetPathsForValidBlobWrites(IEnumerable<StorageAnalyticsLogEntry> entries)
{
IEnumerable<BlobPath> parsedBlobPaths = from entry in entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ public ZeroToOneScaleMonitor(string functionId, BlobServiceClient blobServiceCli
_logger = loggerFactory.CreateLogger<ZeroToOneScaleMonitor>();
}

#pragma warning disable 0649
// For tests, in PROD the value is always null
private BlobWithContainer<BlobBaseClient> _recentWrite;
#pragma warning restore 0649

public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor;

public async Task<ScaleMetrics> GetMetricsAsync()
Expand All @@ -77,26 +72,15 @@ public async Task<ScaleMetrics> GetMetricsAsync()
// if new blob were detected we want to GetScaleStatus return scale out vote at least once
if (Interlocked.Equals(_threadSafeWritesDetectedValue, 1))
{
_logger.LogInformation($"New writes were detectd but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
_logger.LogInformation($"Recent writes were detected but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
return new ScaleMetrics();
}

var blobLogListener = await _blobLogListener.Value.ConfigureAwait(false);
BlobWithContainer<BlobBaseClient>[] recentWrites = _recentWrite == null ? (await blobLogListener.GetRecentBlobWritesAsync(CancellationToken.None).ConfigureAwait(false)).ToArray()
: new BlobWithContainer<BlobBaseClient>[] { _recentWrite };
if (recentWrites.Length > 0)
bool hasBlobWrites = await blobLogListener.HasBlobWritesAsync(CancellationToken.None).ConfigureAwait(false);
if (hasBlobWrites)
{
StringBuilder stringBuilder = new StringBuilder();
foreach (var write in recentWrites)
{
stringBuilder.Append($"'{write.BlobClient.Name}', ");
if (stringBuilder.Length > 1000)
{
stringBuilder.Append("[truncated]");
break;
}
}
_logger.LogInformation($"'{recentWrites.Length}' recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}': {stringBuilder}");
_logger.LogInformation($"Recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}'");
Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 1, 0);
}
else
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,105 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using NUnit.Framework;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
public class BlobLogListenerTests
{
[TestCase("write", 1, null, 0, true, TestName = "HasBlobWritesAsync_WriteLogPresent_ReturnsTrue")]
[TestCase("read", 1, null, 0, false, TestName = "HasBlobWritesAsync_NonWriteLogPresent_ReturnsFalse")]
[TestCase(null, 1, null, 0, false, TestName = "HasBlobWritesAsync_NoBlobs_ReturnsFalse")]
[TestCase("read", 100, "write", 1, true, TestName = "HasBlobWritesAsync_WriteLogPresentMultipleLogBlobs_ReturnsTrue")]
[TestCase("delete", 100, "read", 100, false, TestName = "HasBlobWritesAsync_NonWriteLogPresentMultipleLogBlobs_ReturnsFalse")]
public async Task HasBlobWritesAsync_VariousCases(string logType1, int logType1Count, string logType2, int logType2Count, bool expected)
{
// Arrange
var blobServiceClientMock = new Mock<BlobServiceClient>(MockBehavior.Strict);
var containerClientMock = new Mock<BlobContainerClient>(MockBehavior.Strict);

TestAsyncPageable<BlobItem> pageable;
var blobItems = new List<BlobItem>();
if (logType1 != null)
{
for (int i = 0; i < logType1Count; i++)
{
var blobItem = BlobItemFactory.Create(
name: Guid.NewGuid().ToString(),
metadata: new Dictionary<string, string> { { "LogType", logType1 } });
blobItems.Add(blobItem);
}
}
if (logType2 != null)
{
for (int i = 0; i < logType2Count; i++)
{
var blobItem = BlobItemFactory.Create(
name: Guid.NewGuid().ToString(),
metadata: new Dictionary<string, string> { { "LogType", logType2 } });
blobItems.Add(blobItem);
}
}

if (blobItems.Count == 0)
{
pageable = new TestAsyncPageable<BlobItem>(Enumerable.Empty<BlobItem>());
}
else
{
pageable = new TestAsyncPageable<BlobItem>(blobItems);
}

containerClientMock
.Setup(c => c.GetBlobsAsync(
It.IsAny<BlobTraits>(),
It.IsAny<BlobStates>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Returns(pageable);

blobServiceClientMock
.Setup(c => c.GetBlobContainerClient("$logs"))
.Returns(containerClientMock.Object);

var listener = new BlobLogListener(blobServiceClientMock.Object, NullLogger<BlobListener>.Instance);

// Act
bool result = await listener.HasBlobWritesAsync(CancellationToken.None, hoursWindow: 1);

// Assert
Assert.AreEqual(expected, result);
}

[Test]
public void GetPathsForValidBlobWrites_Returns_ValidBlobWritesOnly()
{
StorageAnalyticsLogEntry[] entries = new[]
{
// This is a valid write entry with a valid path
new StorageAnalyticsLogEntry
{
ServiceType = StorageServiceType.Blob,
OperationType = StorageServiceOperationType.PutBlob,
RequestedObjectKey = @"/storagesample/sample-container/""0x8D199A96CB71468""/sample-blob.txt"
},

// This is an invalid path and will be filtered out
new StorageAnalyticsLogEntry
{
ServiceType = StorageServiceType.Blob,
OperationType = StorageServiceOperationType.PutBlob,
RequestedObjectKey = "/"
},

// This does not constitute a write and will be filtered out
new StorageAnalyticsLogEntry
{
ServiceType = StorageServiceType.Blob,
Expand All @@ -45,5 +114,52 @@ public void GetPathsForValidBlobWrites_Returns_ValidBlobWritesOnly()
Assert.AreEqual("sample-container", singlePath.ContainerName);
Assert.AreEqual(@"""0x8D199A96CB71468""/sample-blob.txt", singlePath.BlobName);
}

private static class BlobItemFactory
{
private static readonly Type BlobItemType = typeof(BlobItem);
private static readonly System.Reflection.PropertyInfo NameProp = BlobItemType.GetProperty(nameof(BlobItem.Name))!;
private static readonly System.Reflection.PropertyInfo MetadataProp = BlobItemType.GetProperty(nameof(BlobItem.Metadata))!;

public static BlobItem Create(string name, IDictionary<string, string> metadata)
{
var ctor = BlobItemType.GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic, binder: null, types: Type.EmptyTypes, modifiers: null)
?? throw new InvalidOperationException("BlobItem internal constructor not found.");
object raw = ctor.Invoke(null);

NameProp.SetValue(raw, name);
var dict = new Dictionary<string, string>(metadata, StringComparer.OrdinalIgnoreCase);
MetadataProp.SetValue(raw, dict);

return (BlobItem)raw;
}
}

private sealed class TestAsyncPageable<T> : AsyncPageable<T>
{
private readonly IReadOnlyList<Page<T>> _pages;

public TestAsyncPageable(IEnumerable<T> items)
{
var list = items.ToList();
var responseMock = new Mock<Response>();
responseMock.Setup(r => r.Status).Returns(200);
responseMock.Setup(r => r.ClientRequestId).Returns(Guid.NewGuid().ToString());

_pages = new[]
{
Page<T>.FromValues(list, continuationToken: null, response: responseMock.Object)
};
}

public override async IAsyncEnumerable<Page<T>> AsPages(string continuationToken = null, int? pageSizeHint = null)
{
foreach (var p in _pages)
{
yield return p;
await Task.Yield();
}
}
}
}
}
}