From 667b2f1e407b5770a9b0793777cb69f6f17f3cbc Mon Sep 17 00:00:00 2001 From: Chris Lambrou Date: Wed, 9 May 2018 08:49:23 +0100 Subject: [PATCH 1/2] Ignore JetBrains settings folder --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 5e427a2..51b177b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,9 +8,10 @@ # Ignore Visual Studio cache/options folder. .vs/ -# Ignore ReSharper stuff. +# Ignore JetBrains stuff. _ReSharper*/ *.[Rr]e[Ss]harper +.idea/ # Ignore user-specific files. *.user From ff0c5b9318e6f927de28853640734c1784db3fdc Mon Sep 17 00:00:00 2001 From: Chris Lambrou Date: Sun, 6 May 2018 03:12:19 +0100 Subject: [PATCH 2/2] New ConcurrentForEach library --- Source/Redgate.MicroLibraries.sln | 6 + .../ULibs.ConcurrentForEach/AssemblyInfo.cs | 3 + Source/ULibs.ConcurrentForEach/Concurrent.cs | 137 ++++++++++++ Source/ULibs.ConcurrentForEach/README.md | 1 + .../ULibs.ConcurrentForEach/RELEASENOTES.md | 7 + .../ULibs.ConcurrentForEach.csproj | 12 ++ .../ULibs.ConcurrentForEach.nuspec | 19 ++ .../ConcurrentForEach/ConcurrentTests.cs | 202 ++++++++++++++++++ Source/Ulibs.Tests/Ulibs.Tests.csproj | 8 + 9 files changed, 395 insertions(+) create mode 100644 Source/ULibs.ConcurrentForEach/AssemblyInfo.cs create mode 100644 Source/ULibs.ConcurrentForEach/Concurrent.cs create mode 100644 Source/ULibs.ConcurrentForEach/README.md create mode 100644 Source/ULibs.ConcurrentForEach/RELEASENOTES.md create mode 100644 Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj create mode 100644 Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec create mode 100644 Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs diff --git a/Source/Redgate.MicroLibraries.sln b/Source/Redgate.MicroLibraries.sln index a8e04fd..8e695a4 100644 --- a/Source/Redgate.MicroLibraries.sln +++ b/Source/Redgate.MicroLibraries.sln @@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ULibs.FullExceptionString", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.TinyJsonDeser", "ULibs.TinyJsonDeser\ULibs.TinyJsonDeser.csproj", "{F46791E7-5434-400B-B5B4-6659909C262E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.ConcurrentForEach", "ULibs.ConcurrentForEach\ULibs.ConcurrentForEach.csproj", "{B9950135-5A39-4B4C-8A61-D6105CA03396}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -45,6 +47,10 @@ Global {F46791E7-5434-400B-B5B4-6659909C262E}.Debug|Any CPU.Build.0 = Debug|Any CPU {F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.ActiveCfg = Release|Any CPU {F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.Build.0 = Release|Any CPU + {B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Source/ULibs.ConcurrentForEach/AssemblyInfo.cs b/Source/ULibs.ConcurrentForEach/AssemblyInfo.cs new file mode 100644 index 0000000..28325ef --- /dev/null +++ b/Source/ULibs.ConcurrentForEach/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Ulibs.Tests")] \ No newline at end of file diff --git a/Source/ULibs.ConcurrentForEach/Concurrent.cs b/Source/ULibs.ConcurrentForEach/Concurrent.cs new file mode 100644 index 0000000..f481ebc --- /dev/null +++ b/Source/ULibs.ConcurrentForEach/Concurrent.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +/***using System.Diagnostics.CodeAnalysis;***/ +using System.Threading; +using System.Threading.Tasks; + +namespace /***$rootnamespace$.***/ULibs.ConcurrentForEach +{ + /***[ExcludeFromCodeCoverage]***/ + internal static class Concurrent + { + /// + /// Applies an asynchronous operation to each element in a sequence. + /// + /// The type of elements in the sequence. + /// The input sequence of elements. + /// The asynchronous operation applied to each element in the sequence. It's strongly + /// recommended that the operation should take care to handle its own expected exceptions. + /// + /// + /// The maximum number of concurrent operations. Must be greater than or equal to 1. + /// + /// + /// The number of simultaneous operations can vary depending on the use case. For cpu intensive + /// operations, consider using Environment.ProcessorCount. + /// For operations that invoke the same web service for each item, RFC 7230 suggests that the number + /// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4). + /// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is + /// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a + /// different web service for each item, a search for the connection limits used by common web-browsers + /// suggests that a value in the range 10-20 is appropriate. + /// + /// + /// Used to cancel the operations. + /// A task that can be awaited upon for all operations to complete. Awaiting on the task will + /// raise an if any operation fails, or work is cancelled via the + /// . + public static Task ForEachAsync( + this IEnumerable source, + Func func, + int maxConcurrentTasks, + CancellationToken cancellationToken) + { + if (func == null) throw new ArgumentNullException(nameof(func)); + return source.ForEachAsync((item, _) => func(item), maxConcurrentTasks, cancellationToken); + } + + /// + /// Applies an asynchronous operation to each element in a sequence. + /// + /// The type of elements in the sequence. + /// The input sequence of elements. + /// The asynchronous operation applied to each element in the sequence. It's strongly + /// recommended that the operation should take care to handle its own expected exceptions. + /// + /// + /// The maximum number of concurrent operations. Must be greater than or equal to 1. + /// + /// + /// The number of simultaneous operations can vary depending on the use case. For cpu intensive + /// operations, consider using Environment.ProcessorCount. + /// For operations that invoke the same web service for each item, RFC 7230 suggests that the number + /// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4). + /// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is + /// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a + /// different web service for each item, a search for the connection limits used by common web-browsers + /// suggests that a value in the range 10-20 is appropriate. + /// + /// + /// Used to cancel the operations. + /// A task that can be awaited upon for all operations to complete. Awaiting on the task will + /// raise an if any operation fails, or work is cancelled via the + /// . + public static async Task ForEachAsync( + this IEnumerable source, + Func func, + int maxConcurrentTasks, + CancellationToken cancellationToken) + { + if (maxConcurrentTasks < 1) + throw new ArgumentException("Value cannot be less than 1", nameof(maxConcurrentTasks)); + if (source == null) throw new ArgumentNullException(nameof(source)); + if (func == null) throw new ArgumentNullException(nameof(func)); + + using (var semaphore = new SemaphoreSlim(maxConcurrentTasks, maxConcurrentTasks)) + { + var tasks = new List(); + foreach (var item in source) + { + // Wait for the next available slot. + try + { + await semaphore.WaitAsync(cancellationToken); + } + catch (OperationCanceledException exception) + { + tasks.Add(Task.FromException(exception)); + break; + } + + // Discard completed tasks. Not strictly necessary, but keeps the list size down. + tasks.RemoveAll(task => task.IsCompleted); + + // Kick-off the next task. + tasks.Add(CreateTask(func, item, cancellationToken).ReleaseSemaphoreOnCompletion(semaphore)); + } + + await Task.WhenAll(tasks); + } + } + + private static Task CreateTask( + Func func, T item, CancellationToken cancellationToken) + { + try + { + return func(item, cancellationToken); + } + catch (Exception exception) + { + return Task.FromException(exception); + } + } + + private static async Task ReleaseSemaphoreOnCompletion(this Task task, SemaphoreSlim semaphore) + { + try + { + await task; + } + finally + { + semaphore.Release(); + } + } + } +} \ No newline at end of file diff --git a/Source/ULibs.ConcurrentForEach/README.md b/Source/ULibs.ConcurrentForEach/README.md new file mode 100644 index 0000000..432ce45 --- /dev/null +++ b/Source/ULibs.ConcurrentForEach/README.md @@ -0,0 +1 @@ +Provides a way to asynchronously apply an operation to each element in a sequence, whilst limiting the maximum number of concurrent operations. \ No newline at end of file diff --git a/Source/ULibs.ConcurrentForEach/RELEASENOTES.md b/Source/ULibs.ConcurrentForEach/RELEASENOTES.md new file mode 100644 index 0000000..d04244c --- /dev/null +++ b/Source/ULibs.ConcurrentForEach/RELEASENOTES.md @@ -0,0 +1,7 @@ +# ULibs.ConcurrentForEach release notes + +## 1.0.0 + +### Features + +- New `ForEachAsync` extension method for `IEnumerable` that applies an asynchronous operation to each element, whilst limiting the maximum number of concurrent operations. \ No newline at end of file diff --git a/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj new file mode 100644 index 0000000..befaf3e --- /dev/null +++ b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj @@ -0,0 +1,12 @@ + + + + netstandard2.0 + 2018 + + + + + + + diff --git a/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec new file mode 100644 index 0000000..fffbd2b --- /dev/null +++ b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec @@ -0,0 +1,19 @@ + + + + + RedGate.ULibs.ConcurrentForEach.Sources + $version$ + red-gate + red-gate + false + + + + Copyright $copyrightYear$ Red Gate Software Ltd + https://github.com/red-gate/MicroLibraries + + + + + diff --git a/Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs b/Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs new file mode 100644 index 0000000..fa1b8f6 --- /dev/null +++ b/Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs @@ -0,0 +1,202 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using ULibs.ConcurrentForEach; + +namespace Ulibs.Tests.ConcurrentForEach +{ + [TestFixture] + public class ConcurrentTests + { + private readonly Random _random = new Random(); + + private int GetRandomDelay() + { + lock (_random) + { + return 50 + _random.Next(50); + } + } + + [Test] + public void ForEachAsync_ArgChecks() + { + IEnumerable sequence = Enumerable.Range(0, 10); + IEnumerable nullSequence = null; + + Task Operation(int i) => Task.CompletedTask; + Func nullOperation = null; + + Task CancellableOperation(int i, CancellationToken t) => Task.CompletedTask; + Func nullCancellableOperation = null; + + Assert.That(() => sequence.ForEachAsync(nullOperation, 1, CancellationToken.None), Throws.ArgumentNullException); + Assert.That(() => sequence.ForEachAsync(nullCancellableOperation, 1, CancellationToken.None), Throws.ArgumentNullException); + Assert.That(() => nullSequence.ForEachAsync(Operation, 1, CancellationToken.None), Throws.ArgumentNullException); + Assert.That(() => nullSequence.ForEachAsync(CancellableOperation, 1, CancellationToken.None), Throws.ArgumentNullException); + Assert.That(() => sequence.ForEachAsync(Operation, 0, CancellationToken.None), Throws.ArgumentException); + Assert.That(() => sequence.ForEachAsync(CancellableOperation, 0, CancellationToken.None), Throws.ArgumentException); + } + + [Test] + public void ForEachAsync_ExecutesTheOperationAgainstEachElement() + { + // ARRANGE + var count = 0; + Task Operation(int value) => Task.Run(() => Interlocked.Increment(ref count)); + var sequence = Enumerable.Range(0, 10); + + // ACT + var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None); + + // ASSERT + Assert.That(task.Wait(2000), Is.True); + Assert.That(count, Is.EqualTo(10)); + } + + [Test] + public void ForEachAsync_LimitsTheNumberOfSimultaneousOperations() + { + // ARRANGE + var currentOperationCount = 0; + var maxOperationCount = 0; + var monitor = new object(); + async Task Operation(int value) + { + lock (monitor) + { + currentOperationCount++; + maxOperationCount = Math.Max(maxOperationCount, currentOperationCount); + } + await Task.Delay(GetRandomDelay()); + lock (monitor) + { + currentOperationCount--; + } + } + var sequence = Enumerable.Range(0, 10); + + // ACT + var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None); + + // ASSERT + Assert.That(task.Wait(2000), Is.True); + Assert.That(maxOperationCount, Is.EqualTo(2)); + } + + [Test] + public void ForEachAsync_ExecutesTheOperationAgainstAllElements_InSpiteOfExceptions_Async() + { + // ARRANGE + var count = 0; + async Task Operation(int value) + { + Interlocked.Increment(ref count); + if (value % 5 == 0) + { + // Sometimes the func should fail immediately. + throw new ApplicationException("Error"); + } + await Task.Delay(GetRandomDelay()); + if (value % 2 == 0) + { + // Sometimes the func should fail after the first await. + throw new ApplicationException("Error"); + } + } + var sequence = Enumerable.Range(0, 10); + + // ACT + var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None); + + // ASSERT + Assert.That(() => Assert.That(task.Wait(2000), Is.True), + Throws.InstanceOf()); + Assert.That(count, Is.EqualTo(10)); + } + + [Test] + public void ForEachAsync_ExecutesTheOperationAgainstAllElements_InSpiteOfExceptions_Sync() + { + // ARRANGE + var count = 0; + Task Operation(int value) + { + Interlocked.Increment(ref count); + if (value % 5 == 0) + { + // Sometimes the func should fail immediately. + throw new ApplicationException("Error"); + } + + return Task.Run(async () => + { + await Task.Delay(GetRandomDelay()); + if (value % 2 == 0) + { + // Sometimes the func should fail after the first await. + throw new ApplicationException("Error"); + } + }); + } + var sequence = Enumerable.Range(0, 10); + + // ACT + var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None); + + // ASSERT + Assert.That(() => Assert.That(task.Wait(2000), Is.True), + Throws.InstanceOf()); + Assert.That(count, Is.EqualTo(10)); + } + + [Test] + public void ForEachAsync_AbortsWhenTheCancellationTokenIsCancelled_WhereEachTaskReceivesTheCancellationSignal() + { + // ARRANGE + var count = 0; + Task Operation(int value, CancellationToken token) => Task.Run(async () => + { + Interlocked.Increment(ref count); + await Task.Delay(GetRandomDelay(), token); + }); + var sequence = Enumerable.Range(0, 10); + var cancellationTokenSource = new CancellationTokenSource(200); + var cancellationToken = cancellationTokenSource.Token; + + // ACT + var task = sequence.ForEachAsync(Operation, 2, cancellationToken); + + // ASSERT + Assert.That(() => Assert.That(task.Wait(2000), Is.True), + Throws.InstanceOf()); + Assert.That(count, Is.GreaterThan(0).And.LessThan(10)); + } + + [Test] + public void ForEachAsync_AbortsWhenTheCancellationTokenIsCancelled_WhereNoTaskReceivesTheCancellationSignal() + { + // ARRANGE + var count = 0; + Task Operation(int value) => Task.Run(async () => + { + Interlocked.Increment(ref count); + await Task.Delay(GetRandomDelay()); + }); + var sequence = Enumerable.Range(0, 10); + var cancellationTokenSource = new CancellationTokenSource(200); + var cancellationToken = cancellationTokenSource.Token; + + // ACT + var task = sequence.ForEachAsync(Operation, 2, cancellationToken); + + // ASSERT + Assert.That(() => Assert.That(task.Wait(2000), Is.True), + Throws.InstanceOf()); + Assert.That(count, Is.GreaterThan(0).And.LessThan(10)); + } + } +} \ No newline at end of file diff --git a/Source/Ulibs.Tests/Ulibs.Tests.csproj b/Source/Ulibs.Tests/Ulibs.Tests.csproj index ad3c616..6aa5b63 100644 --- a/Source/Ulibs.Tests/Ulibs.Tests.csproj +++ b/Source/Ulibs.Tests/Ulibs.Tests.csproj @@ -43,6 +43,7 @@ + @@ -53,6 +54,10 @@ + + {b9950135-5a39-4b4c-8a61-d6105ca03396} + ULibs.ConcurrentForEach + {40ea8064-a654-40d3-9849-9a0e52248de9} ULibs.FullExceptionString @@ -70,5 +75,8 @@ ULibs.TinyJsonSer + + + \ No newline at end of file