Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
# Ignore Visual Studio cache/options folder.
.vs/

# Ignore ReSharper stuff.
# Ignore JetBrains stuff.
_ReSharper*/
*.[Rr]e[Ss]harper
.idea/

# Ignore user-specific files.
*.user
Expand Down
6 changes: 6 additions & 0 deletions Source/Redgate.MicroLibraries.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ULibs.FullExceptionString",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.TinyJsonDeser", "ULibs.TinyJsonDeser\ULibs.TinyJsonDeser.csproj", "{F46791E7-5434-400B-B5B4-6659909C262E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.ConcurrentForEach", "ULibs.ConcurrentForEach\ULibs.ConcurrentForEach.csproj", "{B9950135-5A39-4B4C-8A61-D6105CA03396}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -45,6 +47,10 @@ Global
{F46791E7-5434-400B-B5B4-6659909C262E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.Build.0 = Release|Any CPU
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
3 changes: 3 additions & 0 deletions Source/ULibs.ConcurrentForEach/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Ulibs.Tests")]
137 changes: 137 additions & 0 deletions Source/ULibs.ConcurrentForEach/Concurrent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Collections.Generic;
/***using System.Diagnostics.CodeAnalysis;***/
using System.Threading;
using System.Threading.Tasks;

namespace /***$rootnamespace$.***/ULibs.ConcurrentForEach
{
/***[ExcludeFromCodeCoverage]***/
internal static class Concurrent
{
/// <summary>
/// Applies an asynchronous operation to each element in a sequence.
/// </summary>
/// <typeparam name="T">The type of elements in the sequence.</typeparam>
/// <param name="source">The input sequence of elements.</param>
/// <param name="func">The asynchronous operation applied to each element in the sequence. It's strongly
/// recommended that the operation should take care to handle its own expected exceptions.</param>
/// <param name="maxConcurrentTasks">
/// <para>
/// The maximum number of concurrent operations. Must be greater than or equal to 1.
/// </para>
/// <para>
/// The number of simultaneous operations can vary depending on the use case. For cpu intensive
/// operations, consider using <see cref="Environment.ProcessorCount">Environment.ProcessorCount</see>.
/// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
/// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
/// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
/// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
/// different web service for each item, a search for the connection limits used by common web-browsers
/// suggests that a value in the range 10-20 is appropriate.
/// </para>
/// </param>
/// <param name="cancellationToken">Used to cancel the operations.</param>
/// <returns>A task that can be awaited upon for all operations to complete. Awaiting on the task will
/// raise an <see cref="AggregateException"/> if any operation fails, or work is cancelled via the
/// <paramref name="cancellationToken"/>.</returns>
public static Task ForEachAsync<T>(
this IEnumerable<T> source,
Func<T, Task> func,
int maxConcurrentTasks,
CancellationToken cancellationToken)
{
if (func == null) throw new ArgumentNullException(nameof(func));
return source.ForEachAsync((item, _) => func(item), maxConcurrentTasks, cancellationToken);
}

/// <summary>
/// Applies an asynchronous operation to each element in a sequence.
/// </summary>
/// <typeparam name="T">The type of elements in the sequence.</typeparam>
/// <param name="source">The input sequence of elements.</param>
/// <param name="func">The asynchronous operation applied to each element in the sequence. It's strongly
/// recommended that the operation should take care to handle its own expected exceptions.</param>
/// <param name="maxConcurrentTasks">
/// <para>
/// The maximum number of concurrent operations. Must be greater than or equal to 1.
/// </para>
/// <para>
/// The number of simultaneous operations can vary depending on the use case. For cpu intensive
/// operations, consider using <see cref="Environment.ProcessorCount">Environment.ProcessorCount</see>.
/// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
/// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
/// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
/// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
/// different web service for each item, a search for the connection limits used by common web-browsers
/// suggests that a value in the range 10-20 is appropriate.
/// </para>
/// </param>
/// <param name="cancellationToken">Used to cancel the operations.</param>
/// <returns>A task that can be awaited upon for all operations to complete. Awaiting on the task will
/// raise an <see cref="AggregateException"/> if any operation fails, or work is cancelled via the
/// <paramref name="cancellationToken"/>.</returns>
public static async Task ForEachAsync<T>(
this IEnumerable<T> source,
Func<T, CancellationToken, Task> func,
int maxConcurrentTasks,
CancellationToken cancellationToken)
{
if (maxConcurrentTasks < 1)
throw new ArgumentException("Value cannot be less than 1", nameof(maxConcurrentTasks));
if (source == null) throw new ArgumentNullException(nameof(source));
if (func == null) throw new ArgumentNullException(nameof(func));

using (var semaphore = new SemaphoreSlim(maxConcurrentTasks, maxConcurrentTasks))
{
var tasks = new List<Task>();
foreach (var item in source)
{
// Wait for the next available slot.
try
{
await semaphore.WaitAsync(cancellationToken);
}
catch (OperationCanceledException exception)
{
tasks.Add(Task.FromException(exception));
break;
}

// Discard completed tasks. Not strictly necessary, but keeps the list size down.
tasks.RemoveAll(task => task.IsCompleted);

// Kick-off the next task.
tasks.Add(CreateTask(func, item, cancellationToken).ReleaseSemaphoreOnCompletion(semaphore));
}

await Task.WhenAll(tasks);
}
}

private static Task CreateTask<T>(
Func<T, CancellationToken, Task> func, T item, CancellationToken cancellationToken)
{
try
{
return func(item, cancellationToken);
}
catch (Exception exception)
{
return Task.FromException(exception);
}
}

private static async Task ReleaseSemaphoreOnCompletion(this Task task, SemaphoreSlim semaphore)
{
try
{
await task;
}
finally
{
semaphore.Release();
}
}
}
}
1 change: 1 addition & 0 deletions Source/ULibs.ConcurrentForEach/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Provides a way to asynchronously apply an operation to each element in a sequence, whilst limiting the maximum number of concurrent operations.
7 changes: 7 additions & 0 deletions Source/ULibs.ConcurrentForEach/RELEASENOTES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ULibs.ConcurrentForEach release notes

## 1.0.0

### Features

- New `ForEachAsync` extension method for `IEnumerable<T>` that applies an asynchronous operation to each element, whilst limiting the maximum number of concurrent operations.
12 changes: 12 additions & 0 deletions Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Copyright>2018</Copyright>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.DotNet.Analyzers.Compatibility" Version="0.2.12-alpha" />
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions Source/ULibs.ConcurrentForEach/ULibs.ConcurrentForEach.nuspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0"?>
<!-- http://docs.nuget.org/docs/reference/nuspec-reference -->
<package >
<metadata>
<id>RedGate.ULibs.ConcurrentForEach.Sources</id>
<version>$version$</version>
<authors>red-gate</authors>
<owners>red-gate</owners>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<summary><![CDATA[$summary$]]></summary>
<description><![CDATA[$description$]]></description>
<releaseNotes><![CDATA[$releaseNotes$]]></releaseNotes>
<copyright>Copyright $copyrightYear$ Red Gate Software Ltd</copyright>
<projectUrl>https://github.com/red-gate/MicroLibraries</projectUrl>
</metadata>
<files>
<file src="*.pp" target="content\App_Packages\RedGate.ULibs.ConcurrentForEach.$version$"/>
</files>
</package>
Loading