diff --git a/.gitignore b/.gitignore
index 5e427a2..51b177b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,9 +8,10 @@
# Ignore Visual Studio cache/options folder.
.vs/
-# Ignore ReSharper stuff.
+# Ignore JetBrains stuff.
_ReSharper*/
*.[Rr]e[Ss]harper
+.idea/
# Ignore user-specific files.
*.user
diff --git a/Source/Redgate.MicroLibraries.sln b/Source/Redgate.MicroLibraries.sln
index a8e04fd..8e695a4 100644
--- a/Source/Redgate.MicroLibraries.sln
+++ b/Source/Redgate.MicroLibraries.sln
@@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ULibs.FullExceptionString",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.TinyJsonDeser", "ULibs.TinyJsonDeser\ULibs.TinyJsonDeser.csproj", "{F46791E7-5434-400B-B5B4-6659909C262E}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.ConcurrentForEach", "ULibs.ConcurrentForEach\ULibs.ConcurrentForEach.csproj", "{B9950135-5A39-4B4C-8A61-D6105CA03396}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -45,6 +47,10 @@ Global
{F46791E7-5434-400B-B5B4-6659909C262E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/Source/ULibs.ConcurrentForEach/AssemblyInfo.cs b/Source/ULibs.ConcurrentForEach/AssemblyInfo.cs
new file mode 100644
index 0000000..28325ef
--- /dev/null
+++ b/Source/ULibs.ConcurrentForEach/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Ulibs.Tests")]
\ No newline at end of file
diff --git a/Source/ULibs.ConcurrentForEach/Concurrent.cs b/Source/ULibs.ConcurrentForEach/Concurrent.cs
new file mode 100644
index 0000000..f481ebc
--- /dev/null
+++ b/Source/ULibs.ConcurrentForEach/Concurrent.cs
@@ -0,0 +1,137 @@
+using System;
+using System.Collections.Generic;
+/***using System.Diagnostics.CodeAnalysis;***/
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace /***$rootnamespace$.***/ULibs.ConcurrentForEach
+{
+ /***[ExcludeFromCodeCoverage]***/
+ internal static class Concurrent
+ {
+ ///
+ /// Applies an asynchronous operation to each element in a sequence.
+ ///
+ /// The type of elements in the sequence.
+ /// The input sequence of elements.
+ /// The asynchronous operation applied to each element in the sequence. It's strongly
+ /// recommended that the operation should take care to handle its own expected exceptions.
+ ///
+ ///
+ /// The maximum number of concurrent operations. Must be greater than or equal to 1.
+ ///
+ ///
+ /// The number of simultaneous operations can vary depending on the use case. For cpu intensive
+ /// operations, consider using Environment.ProcessorCount.
+ /// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
+ /// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
+ /// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
+ /// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
+ /// different web service for each item, a search for the connection limits used by common web-browsers
+ /// suggests that a value in the range 10-20 is appropriate.
+ ///
+ ///
+ /// Used to cancel the operations.
+ /// A task that can be awaited upon for all operations to complete. Awaiting on the task will
+ /// raise an if any operation fails, or work is cancelled via the
+ /// .
+ public static Task ForEachAsync(
+ this IEnumerable source,
+ Func func,
+ int maxConcurrentTasks,
+ CancellationToken cancellationToken)
+ {
+ if (func == null) throw new ArgumentNullException(nameof(func));
+ return source.ForEachAsync((item, _) => func(item), maxConcurrentTasks, cancellationToken);
+ }
+
+ ///
+ /// Applies an asynchronous operation to each element in a sequence.
+ ///
+ /// The type of elements in the sequence.
+ /// The input sequence of elements.
+ /// The asynchronous operation applied to each element in the sequence. It's strongly
+ /// recommended that the operation should take care to handle its own expected exceptions.
+ ///
+ ///
+ /// The maximum number of concurrent operations. Must be greater than or equal to 1.
+ ///
+ ///
+ /// The number of simultaneous operations can vary depending on the use case. For cpu intensive
+ /// operations, consider using Environment.ProcessorCount.
+ /// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
+ /// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
+ /// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
+ /// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
+ /// different web service for each item, a search for the connection limits used by common web-browsers
+ /// suggests that a value in the range 10-20 is appropriate.
+ ///
+ ///
+ /// Used to cancel the operations.
+ /// A task that can be awaited upon for all operations to complete. Awaiting on the task will
+ /// raise an if any operation fails, or work is cancelled via the
+ /// .
+ public static async Task ForEachAsync(
+ this IEnumerable source,
+ Func func,
+ int maxConcurrentTasks,
+ CancellationToken cancellationToken)
+ {
+ if (maxConcurrentTasks < 1)
+ throw new ArgumentException("Value cannot be less than 1", nameof(maxConcurrentTasks));
+ if (source == null) throw new ArgumentNullException(nameof(source));
+ if (func == null) throw new ArgumentNullException(nameof(func));
+
+ using (var semaphore = new SemaphoreSlim(maxConcurrentTasks, maxConcurrentTasks))
+ {
+ var tasks = new List();
+ foreach (var item in source)
+ {
+ // Wait for the next available slot.
+ try
+ {
+ await semaphore.WaitAsync(cancellationToken);
+ }
+ catch (OperationCanceledException exception)
+ {
+ tasks.Add(Task.FromException(exception));
+ break;
+ }
+
+ // Discard completed tasks. Not strictly necessary, but keeps the list size down.
+ tasks.RemoveAll(task => task.IsCompleted);
+
+ // Kick-off the next task.
+ tasks.Add(CreateTask(func, item, cancellationToken).ReleaseSemaphoreOnCompletion(semaphore));
+ }
+
+ await Task.WhenAll(tasks);
+ }
+ }
+
+ private static Task CreateTask(
+ Func func, T item, CancellationToken cancellationToken)
+ {
+ try
+ {
+ return func(item, cancellationToken);
+ }
+ catch (Exception exception)
+ {
+ return Task.FromException(exception);
+ }
+ }
+
+ private static async Task ReleaseSemaphoreOnCompletion(this Task task, SemaphoreSlim semaphore)
+ {
+ try
+ {
+ await task;
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Source/ULibs.ConcurrentForEach/README.md b/Source/ULibs.ConcurrentForEach/README.md
new file mode 100644
index 0000000..432ce45
--- /dev/null
+++ b/Source/ULibs.ConcurrentForEach/README.md
@@ -0,0 +1 @@
+Provides a way to asynchronously apply an operation to each element in a sequence, whilst limiting the maximum number of concurrent operations.
\ No newline at end of file
diff --git a/Source/ULibs.ConcurrentForEach/RELEASENOTES.md b/Source/ULibs.ConcurrentForEach/RELEASENOTES.md
new file mode 100644
index 0000000..d04244c
--- /dev/null
+++ b/Source/ULibs.ConcurrentForEach/RELEASENOTES.md
@@ -0,0 +1,7 @@
+# ULibs.ConcurrentForEach release notes
+
+## 1.0.0
+
+### Features
+
+- New `ForEachAsync` extension method for `IEnumerable` that applies an asynchronous operation to each element, whilst limiting the maximum number of concurrent operations.
\ No newline at end of file
diff --git a/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj
new file mode 100644
index 0000000..befaf3e
--- /dev/null
+++ b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj
@@ -0,0 +1,12 @@
+
+
+
+ netstandard2.0
+ 2018
+
+
+
+
+
+
+
diff --git a/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec
new file mode 100644
index 0000000..fffbd2b
--- /dev/null
+++ b/Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec
@@ -0,0 +1,19 @@
+
+
+
+
+ RedGate.ULibs.ConcurrentForEach.Sources
+ $version$
+ red-gate
+ red-gate
+ false
+
+
+
+ Copyright $copyrightYear$ Red Gate Software Ltd
+ https://github.com/red-gate/MicroLibraries
+
+
+
+
+
diff --git a/Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs b/Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs
new file mode 100644
index 0000000..fa1b8f6
--- /dev/null
+++ b/Source/Ulibs.Tests/ConcurrentForEach/ConcurrentTests.cs
@@ -0,0 +1,202 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using NUnit.Framework;
+using ULibs.ConcurrentForEach;
+
+namespace Ulibs.Tests.ConcurrentForEach
+{
+ [TestFixture]
+ public class ConcurrentTests
+ {
+ private readonly Random _random = new Random();
+
+ private int GetRandomDelay()
+ {
+ lock (_random)
+ {
+ return 50 + _random.Next(50);
+ }
+ }
+
+ [Test]
+ public void ForEachAsync_ArgChecks()
+ {
+ IEnumerable sequence = Enumerable.Range(0, 10);
+ IEnumerable nullSequence = null;
+
+ Task Operation(int i) => Task.CompletedTask;
+ Func nullOperation = null;
+
+ Task CancellableOperation(int i, CancellationToken t) => Task.CompletedTask;
+ Func nullCancellableOperation = null;
+
+ Assert.That(() => sequence.ForEachAsync(nullOperation, 1, CancellationToken.None), Throws.ArgumentNullException);
+ Assert.That(() => sequence.ForEachAsync(nullCancellableOperation, 1, CancellationToken.None), Throws.ArgumentNullException);
+ Assert.That(() => nullSequence.ForEachAsync(Operation, 1, CancellationToken.None), Throws.ArgumentNullException);
+ Assert.That(() => nullSequence.ForEachAsync(CancellableOperation, 1, CancellationToken.None), Throws.ArgumentNullException);
+ Assert.That(() => sequence.ForEachAsync(Operation, 0, CancellationToken.None), Throws.ArgumentException);
+ Assert.That(() => sequence.ForEachAsync(CancellableOperation, 0, CancellationToken.None), Throws.ArgumentException);
+ }
+
+ [Test]
+ public void ForEachAsync_ExecutesTheOperationAgainstEachElement()
+ {
+ // ARRANGE
+ var count = 0;
+ Task Operation(int value) => Task.Run(() => Interlocked.Increment(ref count));
+ var sequence = Enumerable.Range(0, 10);
+
+ // ACT
+ var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None);
+
+ // ASSERT
+ Assert.That(task.Wait(2000), Is.True);
+ Assert.That(count, Is.EqualTo(10));
+ }
+
+ [Test]
+ public void ForEachAsync_LimitsTheNumberOfSimultaneousOperations()
+ {
+ // ARRANGE
+ var currentOperationCount = 0;
+ var maxOperationCount = 0;
+ var monitor = new object();
+ async Task Operation(int value)
+ {
+ lock (monitor)
+ {
+ currentOperationCount++;
+ maxOperationCount = Math.Max(maxOperationCount, currentOperationCount);
+ }
+ await Task.Delay(GetRandomDelay());
+ lock (monitor)
+ {
+ currentOperationCount--;
+ }
+ }
+ var sequence = Enumerable.Range(0, 10);
+
+ // ACT
+ var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None);
+
+ // ASSERT
+ Assert.That(task.Wait(2000), Is.True);
+ Assert.That(maxOperationCount, Is.EqualTo(2));
+ }
+
+ [Test]
+ public void ForEachAsync_ExecutesTheOperationAgainstAllElements_InSpiteOfExceptions_Async()
+ {
+ // ARRANGE
+ var count = 0;
+ async Task Operation(int value)
+ {
+ Interlocked.Increment(ref count);
+ if (value % 5 == 0)
+ {
+ // Sometimes the func should fail immediately.
+ throw new ApplicationException("Error");
+ }
+ await Task.Delay(GetRandomDelay());
+ if (value % 2 == 0)
+ {
+ // Sometimes the func should fail after the first await.
+ throw new ApplicationException("Error");
+ }
+ }
+ var sequence = Enumerable.Range(0, 10);
+
+ // ACT
+ var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None);
+
+ // ASSERT
+ Assert.That(() => Assert.That(task.Wait(2000), Is.True),
+ Throws.InstanceOf());
+ Assert.That(count, Is.EqualTo(10));
+ }
+
+ [Test]
+ public void ForEachAsync_ExecutesTheOperationAgainstAllElements_InSpiteOfExceptions_Sync()
+ {
+ // ARRANGE
+ var count = 0;
+ Task Operation(int value)
+ {
+ Interlocked.Increment(ref count);
+ if (value % 5 == 0)
+ {
+ // Sometimes the func should fail immediately.
+ throw new ApplicationException("Error");
+ }
+
+ return Task.Run(async () =>
+ {
+ await Task.Delay(GetRandomDelay());
+ if (value % 2 == 0)
+ {
+ // Sometimes the func should fail after the first await.
+ throw new ApplicationException("Error");
+ }
+ });
+ }
+ var sequence = Enumerable.Range(0, 10);
+
+ // ACT
+ var task = sequence.ForEachAsync(Operation, 2, CancellationToken.None);
+
+ // ASSERT
+ Assert.That(() => Assert.That(task.Wait(2000), Is.True),
+ Throws.InstanceOf());
+ Assert.That(count, Is.EqualTo(10));
+ }
+
+ [Test]
+ public void ForEachAsync_AbortsWhenTheCancellationTokenIsCancelled_WhereEachTaskReceivesTheCancellationSignal()
+ {
+ // ARRANGE
+ var count = 0;
+ Task Operation(int value, CancellationToken token) => Task.Run(async () =>
+ {
+ Interlocked.Increment(ref count);
+ await Task.Delay(GetRandomDelay(), token);
+ });
+ var sequence = Enumerable.Range(0, 10);
+ var cancellationTokenSource = new CancellationTokenSource(200);
+ var cancellationToken = cancellationTokenSource.Token;
+
+ // ACT
+ var task = sequence.ForEachAsync(Operation, 2, cancellationToken);
+
+ // ASSERT
+ Assert.That(() => Assert.That(task.Wait(2000), Is.True),
+ Throws.InstanceOf());
+ Assert.That(count, Is.GreaterThan(0).And.LessThan(10));
+ }
+
+ [Test]
+ public void ForEachAsync_AbortsWhenTheCancellationTokenIsCancelled_WhereNoTaskReceivesTheCancellationSignal()
+ {
+ // ARRANGE
+ var count = 0;
+ Task Operation(int value) => Task.Run(async () =>
+ {
+ Interlocked.Increment(ref count);
+ await Task.Delay(GetRandomDelay());
+ });
+ var sequence = Enumerable.Range(0, 10);
+ var cancellationTokenSource = new CancellationTokenSource(200);
+ var cancellationToken = cancellationTokenSource.Token;
+
+ // ACT
+ var task = sequence.ForEachAsync(Operation, 2, cancellationToken);
+
+ // ASSERT
+ Assert.That(() => Assert.That(task.Wait(2000), Is.True),
+ Throws.InstanceOf());
+ Assert.That(count, Is.GreaterThan(0).And.LessThan(10));
+ }
+ }
+}
\ No newline at end of file
diff --git a/Source/Ulibs.Tests/Ulibs.Tests.csproj b/Source/Ulibs.Tests/Ulibs.Tests.csproj
index ad3c616..6aa5b63 100644
--- a/Source/Ulibs.Tests/Ulibs.Tests.csproj
+++ b/Source/Ulibs.Tests/Ulibs.Tests.csproj
@@ -43,6 +43,7 @@
+
@@ -53,6 +54,10 @@
+
+ {b9950135-5a39-4b4c-8a61-d6105ca03396}
+ ULibs.ConcurrentForEach
+
{40ea8064-a654-40d3-9849-9a0e52248de9}
ULibs.FullExceptionString
@@ -70,5 +75,8 @@
ULibs.TinyJsonSer
+
+
+
\ No newline at end of file