Skip to content

Commit 934c2a1

Browse files
authored
Move RoslynParallel and ProducerConsumer to threading source package (#79474)
1 parent 5069c81 commit 934c2a1

File tree

53 files changed

+207
-180
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+207
-180
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
#nullable enable
6+
7+
using System.Collections.Generic;
8+
using System.Runtime.CompilerServices;
9+
10+
namespace System.Threading.Channels;
11+
12+
internal static class RoslynChannelReaderExtensions
13+
{
14+
#if NET // binary compatibility
15+
public static IAsyncEnumerable<T> ReadAllAsync<T>(ChannelReader<T> reader, CancellationToken cancellationToken)
16+
=> reader.ReadAllAsync(cancellationToken);
17+
#else
18+
public static async IAsyncEnumerable<T> ReadAllAsync<T>(this ChannelReader<T> reader, [EnumeratorCancellation] CancellationToken cancellationToken)
19+
{
20+
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
21+
{
22+
while (reader.TryRead(out var item))
23+
yield return item;
24+
}
25+
}
26+
#endif
27+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using System.Collections.Immutable;
7+
using System.Linq;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.CodeAnalysis.PooledObjects;
11+
12+
namespace Microsoft.CodeAnalysis;
13+
14+
internal static class IAsyncEnumerableExtensions
15+
{
16+
public static async Task<ImmutableArray<T>> ToImmutableArrayAsync<T>(this IAsyncEnumerable<T> values, CancellationToken cancellationToken)
17+
{
18+
using var _ = ArrayBuilder<T>.GetInstance(out var result);
19+
20+
await foreach (var value in values.WithCancellation(cancellationToken).ConfigureAwait(false))
21+
result.Add(value);
22+
23+
return result.ToImmutableAndClear();
24+
}
25+
26+
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
27+
#pragma warning disable VSTHRD200 // Use "Async" suffix for async methods
28+
public static async IAsyncEnumerable<TSource> AsAsyncEnumerable<TSource>(this IEnumerable<TSource> source)
29+
#pragma warning restore VSTHRD200 // Use "Async" suffix for async methods
30+
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
31+
{
32+
foreach (var item in source)
33+
yield return item;
34+
}
35+
}

src/Dependencies/Threading/Microsoft.CodeAnalysis.Threading.Package.csproj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<!-- Licensed to the .NET Foundation under one or more agreements. The .NET Foundation licenses this file to you under the MIT license. See the LICENSE file in the project root for more information. -->
33
<Project Sdk="Microsoft.NET.Sdk">
44
<PropertyGroup>
5-
<TargetFramework>netstandard2.0</TargetFramework>
5+
<TargetFrameworks>$(NetRoslyn);netstandard2.0</TargetFrameworks>
66
<GenerateDocumentationFile>false</GenerateDocumentationFile>
77
<DebugType>none</DebugType>
88
<GenerateDependencyFile>false</GenerateDependencyFile>
@@ -21,7 +21,9 @@
2121
</PropertyGroup>
2222
<ItemGroup>
2323
<PackageReference Include="System.Collections.Immutable" />
24+
<PackageReference Include="System.Threading.Channels" />
2425
<PackageReference Include="System.Threading.Tasks.Extensions" />
26+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
2527
</ItemGroup>
2628
<Import Project="..\Collections\Microsoft.CodeAnalysis.Collections.projitems" Label="Shared" />
2729
<Import Project="..\PooledObjects\Microsoft.CodeAnalysis.PooledObjects.projitems" Label="Shared" />

src/Dependencies/Threading/Microsoft.CodeAnalysis.Threading.projitems

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323
<Compile Include="$(MSBuildThisFileDirectory)TestHooks\IAsynchronousOperationListenerProvider.cs" />
2424
<Compile Include="$(MSBuildThisFileDirectory)TestHooks\IAsynchronousOperationListener.cs" />
2525
<Compile Include="$(MSBuildThisFileDirectory)YieldAwaitableExtensions.cs" />
26+
<Compile Include="$(MSBuildThisFileDirectory)ParallelExtensions.cs" />
27+
<Compile Include="$(MSBuildThisFileDirectory)ParallelExtensions.NetFramework.cs" />
28+
<Compile Include="$(MSBuildThisFileDirectory)IAsyncEnumerableExtensions.cs" />
29+
<Compile Include="$(MSBuildThisFileDirectory)ChannelReaderExtensions.cs" />
30+
<Compile Include="$(MSBuildThisFileDirectory)ProducerConsumer.cs" />
31+
<Compile Include="$(MSBuildThisFileDirectory)ProducerConsumerOptions.cs" />
2632
</ItemGroup>
2733
<ItemGroup Condition="'$(DefaultLanguageSourceExtension)' != '' AND '$(BuildingInsideVisualStudio)' != 'true'">
2834
<ExpectedCompile Include="$(MSBuildThisFileDirectory)**\*$(DefaultLanguageSourceExtension)" />
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,13 @@
1717
// With only changes to make the code work on NetFx. Where changes have been made, the original code is kept around in
1818
// an ifdef'ed block to see what it was doing.
1919

20-
using System;
2120
using System.Collections.Generic;
2221
using System.Diagnostics;
23-
using System.Threading;
24-
using System.Threading.Tasks;
2522
using Microsoft.CodeAnalysis.Threading;
26-
using Roslyn.Utilities;
2723

28-
namespace Microsoft.CodeAnalysis.Shared.Utilities;
24+
namespace System.Threading.Tasks;
2925

30-
internal static partial class RoslynParallel
26+
internal static partial class RoslynParallelExtensions
3127
{
3228
private static class NetFramework
3329
{
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
#nullable enable
6+
7+
#pragma warning disable CA1068 // CancellationToken parameters must come last
8+
9+
using System.Collections.Generic;
10+
11+
namespace System.Threading.Tasks;
12+
13+
internal static partial class RoslynParallelExtensions
14+
{
15+
#if NET // binary compatibility
16+
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
17+
=> Parallel.ForEachAsync(source, cancellationToken, body);
18+
19+
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
20+
=> Parallel.ForEachAsync(source, parallelOptions, body);
21+
22+
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
23+
=> Parallel.ForEachAsync(source, cancellationToken, body);
24+
25+
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
26+
=> Parallel.ForEachAsync(source, parallelOptions, body);
27+
#else
28+
extension(Parallel)
29+
{
30+
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
31+
=> NetFramework.ForEachAsync(source, cancellationToken, body);
32+
33+
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
34+
=> NetFramework.ForEachAsync(source, parallelOptions, body);
35+
36+
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
37+
=> NetFramework.ForEachAsync(source, cancellationToken, body);
38+
39+
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
40+
=> NetFramework.ForEachAsync(source, parallelOptions, body);
41+
}
42+
#endif
43+
}
Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,19 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
#nullable enable
6+
57
using System;
68
using System.Collections.Generic;
79
using System.Collections.Immutable;
810
using System.Threading;
911
using System.Threading.Channels;
1012
using System.Threading.Tasks;
1113
using Microsoft.CodeAnalysis.PooledObjects;
12-
using Microsoft.CodeAnalysis.Shared.Extensions;
1314
using Microsoft.CodeAnalysis.Threading;
15+
using Roslyn.Utilities;
1416

15-
namespace Microsoft.CodeAnalysis.Shared.Utilities;
16-
17-
internal readonly record struct ProducerConsumerOptions
18-
{
19-
/// <summary>
20-
/// Used when the consumeItems routine will only pull items on a single thread (never concurrently). produceItems
21-
/// can be called concurrently on many threads.
22-
/// </summary>
23-
public static readonly ProducerConsumerOptions SingleReaderOptions = new() { SingleReader = true };
24-
25-
/// <summary>
26-
/// Used when the consumeItems routine will only pull items on a single thread (never concurrently). produceItems
27-
/// can be called on a single thread as well (never concurrently).
28-
/// </summary>
29-
public static readonly ProducerConsumerOptions SingleReaderWriterOptions = new() { SingleReader = true, SingleWriter = true };
30-
31-
/// <inheritdoc cref="ChannelOptions.SingleWriter"/>
32-
public bool SingleWriter { get; init; }
33-
34-
/// <inheritdoc cref="ChannelOptions.SingleReader"/>
35-
public bool SingleReader { get; init; }
36-
}
17+
namespace Microsoft.CodeAnalysis.Threading;
3718

3819
internal static class ProducerConsumer<TItem>
3920
{
@@ -248,7 +229,7 @@ private static Task<TResult> RunParallelChannelAsync<TSource, TArgs, TResult>(
248229
// We're running in parallel, so we def have multiple writers
249230
ProducerConsumerOptions.SingleReaderOptions,
250231
produceItems: static (callback, args, cancellationToken) =>
251-
RoslynParallel.ForEachAsync(
232+
Parallel.ForEachAsync(
252233
args.source,
253234
cancellationToken,
254235
async (source, cancellationToken) =>
@@ -310,7 +291,7 @@ public static IAsyncEnumerable<TItem> RunParallelStreamAsync<TSource, TArgs>(
310291
{
311292
return RunAsync(
312293
static (callback, args, cancellationToken) =>
313-
RoslynParallel.ForEachAsync(
294+
Parallel.ForEachAsync(
314295
args.source, cancellationToken,
315296
async (source, cancellationToken) => await args.produceItems(
316297
source, callback, args.args, cancellationToken).ConfigureAwait(false)),
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
#nullable enable
6+
7+
using System.Threading.Channels;
8+
9+
namespace Microsoft.CodeAnalysis.Threading;
10+
11+
internal readonly record struct ProducerConsumerOptions
12+
{
13+
/// <summary>
14+
/// Used when the consumeItems routine will only pull items on a single thread (never concurrently). produceItems
15+
/// can be called concurrently on many threads.
16+
/// </summary>
17+
public static readonly ProducerConsumerOptions SingleReaderOptions = new() { SingleReader = true };
18+
19+
/// <summary>
20+
/// Used when the consumeItems routine will only pull items on a single thread (never concurrently). produceItems
21+
/// can be called on a single thread as well (never concurrently).
22+
/// </summary>
23+
public static readonly ProducerConsumerOptions SingleReaderWriterOptions = new() { SingleReader = true, SingleWriter = true };
24+
25+
#if NET
26+
/// <inheritdoc cref="ChannelOptions.SingleWriter"/>
27+
#endif
28+
public bool SingleWriter { get; init; }
29+
30+
#if NET
31+
/// <inheritdoc cref="ChannelOptions.SingleReader"/>
32+
#endif
33+
public bool SingleReader { get; init; }
34+
}

src/EditorFeatures/Core/ExternalAccess/VSTypeScript/VSTypeScriptFindUsagesContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public ValueTask OnDefinitionFoundAsync(VSTypeScriptDefinitionItem definition, C
2727
=> UnderlyingObject.OnDefinitionFoundAsync(definition.UnderlyingObject, cancellationToken);
2828

2929
public ValueTask OnReferenceFoundAsync(VSTypeScriptSourceReferenceItem reference, CancellationToken cancellationToken)
30-
=> UnderlyingObject.OnReferencesFoundAsync(IAsyncEnumerableExtensions.SingletonAsync(reference.UnderlyingObject), cancellationToken);
30+
=> UnderlyingObject.OnReferencesFoundAsync(AsyncEnumerableFactory.SingletonAsync(reference.UnderlyingObject), cancellationToken);
3131

3232
public ValueTask OnCompletedAsync(CancellationToken cancellationToken)
3333
=> UnderlyingObject.OnCompletedAsync(cancellationToken);

src/EditorFeatures/Core/ExternalAccess/VSTypeScript/VSTypeScriptFindUsagesService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public ValueTask OnDefinitionFoundAsync(VSTypeScriptDefinitionItem definition, C
4646
=> _context.OnDefinitionFoundAsync(definition.UnderlyingObject, cancellationToken);
4747

4848
public ValueTask OnReferenceFoundAsync(VSTypeScriptSourceReferenceItem reference, CancellationToken cancellationToken)
49-
=> _context.OnReferencesFoundAsync(IAsyncEnumerableExtensions.SingletonAsync(reference.UnderlyingObject), cancellationToken);
49+
=> _context.OnReferencesFoundAsync(AsyncEnumerableFactory.SingletonAsync(reference.UnderlyingObject), cancellationToken);
5050

5151
public ValueTask OnCompletedAsync(CancellationToken cancellationToken)
5252
=> ValueTaskFactory.CompletedTask;

0 commit comments

Comments
 (0)