Skip to content

Commit 25f6a2f

Browse files
committed
Adjust SimpleZipArchiveReader changes + Pool Stream on Zip extractor
1 parent e098e8f commit 25f6a2f

File tree

5 files changed

+162
-38
lines changed

5 files changed

+162
-38
lines changed

CollapseLauncher/Classes/GameManagement/WpfPackage/WpfPackageContext.Methods.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private async ValueTask PerformDownloadFromZipOnTheFly(
9595
string gamePath = GamePath;
9696

9797
ZipArchiveReader reader =
98-
await ZipArchiveReader.CreateFromRemoteAsync(url, _localCts.Token);
98+
await ZipArchiveReader.CreateFromAsync(url, _localCts.Token);
9999

100100
long totalSizeUncompressed = reader.Sum(x => x.Size);
101101
ProgressAllSizeTotal = totalSizeUncompressed;
@@ -147,8 +147,7 @@ await IsPackageHashMatchWithCrc32(stream, entry.Crc32, token))
147147
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
148148
try
149149
{
150-
await using Stream deflateStream =
151-
await entry.OpenStreamFromFactoryAsync(CreateStreamFromUrl, token);
150+
await using Stream deflateStream = await entry.OpenStreamFromAsync(CreateStreamFromUrl, token);
152151

153152
int read;
154153
while ((read = await deflateStream.ReadAsync(buffer, token)) > 0)
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
#nullable enable
8+
namespace CollapseLauncher.Helper;
9+
10+
internal class ThreadObjectPool<T> : IDisposable where T : class
11+
{
12+
private ConcurrentQueue<T> Items = new();
13+
private SemaphoreSlim Semaphore;
14+
private Func<object?> _factory;
15+
16+
private int _countUsed;
17+
private readonly int _capacity;
18+
private readonly bool _isDisposeObjects;
19+
private bool _isDisposed;
20+
21+
internal ThreadObjectPool(Func<T> factory, int capacity = 0, bool isDisposeObjects = true)
22+
{
23+
_capacity = capacity == 0 ? Environment.ProcessorCount : capacity;
24+
_factory = factory;
25+
_isDisposeObjects = isDisposeObjects;
26+
Semaphore = new SemaphoreSlim(_capacity, _capacity);
27+
}
28+
29+
internal ThreadObjectPool(Task<T> factoryAsync, int capacity = 0, bool isDisposeObjects = true)
30+
{
31+
_capacity = capacity == 0 ? Environment.ProcessorCount : capacity;
32+
_factory = () => factoryAsync;
33+
_isDisposeObjects = isDisposeObjects;
34+
Semaphore = new SemaphoreSlim(_capacity, _capacity);
35+
}
36+
37+
internal async Task<T> GetOrCreateObjectAsync(CancellationToken token = default)
38+
{
39+
await Semaphore.WaitAsync(token);
40+
Interlocked.Increment(ref _countUsed);
41+
42+
if (Items.TryDequeue(out T? pooled))
43+
return pooled;
44+
45+
if (_factory is Func<Task<T>> asyncFactory)
46+
return await asyncFactory();
47+
48+
return ((Func<T>)_factory)();
49+
}
50+
51+
internal T GetOrCreateObject()
52+
{
53+
Semaphore.Wait();
54+
Interlocked.Increment(ref _countUsed);
55+
56+
if (Items.TryDequeue(out T? pooled))
57+
return pooled;
58+
59+
if (_factory is Func<Task<T>> asyncFactory)
60+
return asyncFactory().Result;
61+
62+
return ((Func<T>)_factory)();
63+
}
64+
65+
internal void Return(T item)
66+
{
67+
Items.Enqueue(item);
68+
Interlocked.Decrement(ref _countUsed);
69+
Semaphore.Release();
70+
}
71+
72+
public void Dispose()
73+
{
74+
if (_isDisposed)
75+
{
76+
return;
77+
}
78+
79+
Semaphore.Dispose();
80+
81+
try
82+
{
83+
if (!_isDisposeObjects)
84+
{
85+
return;
86+
}
87+
88+
foreach (IDisposable item in Items.OfType<IDisposable>())
89+
{
90+
item.Dispose();
91+
}
92+
}
93+
finally
94+
{
95+
_isDisposed = true;
96+
Items.Clear();
97+
_factory = null!;
98+
Items = null!;
99+
Semaphore = null!;
100+
}
101+
}
102+
}

CollapseLauncher/Classes/InstallManagement/Genshin/GenshinInstall.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ protected override GameInstallFileInfo GetGameInstallFileInfo()
203203
(sdkApi.Data?.TryFindByBizOrId(gameBiz, gameId, out HypChannelSdkData sdkData) ?? false) &&
204204
sdkData.SdkPackageDetail?.Url is { } sdkZipUrl)
205205
{
206-
ZipArchiveReader reader = await ZipArchiveReader.CreateFromRemoteAsync(sdkZipUrl);
206+
ZipArchiveReader reader = await ZipArchiveReader.CreateFromAsync(sdkZipUrl);
207207
localFileInfos.AddRange(reader
208208
.Where(x => !x.IsDirectory)
209209
.Select(ConvertZipEntry));
@@ -214,7 +214,7 @@ protected override GameInstallFileInfo GetGameInstallFileInfo()
214214
(wpfApi.Data?.TryFindByBizOrId(gameBiz, gameId, out HypWpfPackageData wpfData) ?? false) &&
215215
wpfData.PackageInfo?.Url is { } wpfZipUrl)
216216
{
217-
ZipArchiveReader reader = await ZipArchiveReader.CreateFromRemoteAsync(wpfZipUrl);
217+
ZipArchiveReader reader = await ZipArchiveReader.CreateFromAsync(wpfZipUrl);
218218
localFileInfos.AddRange(reader
219219
.Where(x => !x.IsDirectory)
220220
.Select(ConvertZipEntry));

CollapseLauncher/Classes/Interfaces/Class/ProgressBase.cs

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ protected async Task FetchBilibiliSdk(CancellationToken token)
10071007
string url = sdkData.SdkPackageDetail?.Url ?? throw new NullReferenceException();
10081008

10091009
// Create ZipArchiveReader and get the remote stream of the zip file
1010-
ZipArchiveReader reader = await ZipArchiveReader.CreateFromRemoteAsync(url, token);
1010+
ZipArchiveReader reader = await ZipArchiveReader.CreateFromAsync(url, token);
10111011
HttpClient client = FallbackCDNUtil.GetGlobalHttpClient(true);
10121012

10131013
await Parallel.ForEachAsync(reader.Where(x => !x.IsDirectory),
@@ -1041,7 +1041,7 @@ async ValueTask Impl(ZipArchiveEntry entry, CancellationToken innerToken)
10411041
return;
10421042
}
10431043

1044-
await using Stream entryStream = await entry.OpenStreamFromFactoryAsync(CreateStreamFromPosUrl, innerToken);
1044+
await using Stream entryStream = await entry.OpenStreamFromAsync(CreateStreamFromPosUrl, innerToken);
10451045
// Reset the SDK DLL stream pos and write the data
10461046
sdkDllStream.Position = 0;
10471047
await entryStream.CopyToAsync(sdkDllStream, innerToken);
@@ -1359,11 +1359,11 @@ internal static ValueTask<FileStream> NaivelyOpenFileStreamAsync(FileInfo info,
13591359
protected virtual long GetArchiveUncompressedSizeManaged(Stream archiveStream)
13601360
{
13611361
ZipArchiveReader archive = ZipArchiveReader
1362-
.CreateFromStreamFactory((pos, _) =>
1363-
{
1364-
archiveStream.Position = pos ?? 0;
1365-
return archiveStream;
1366-
});
1362+
.CreateFrom((pos, _) =>
1363+
{
1364+
archiveStream.Position = pos ?? 0;
1365+
return archiveStream;
1366+
});
13671367

13681368
return archive.Sum(x => x.Size);
13691369
}
@@ -1424,9 +1424,24 @@ protected virtual async Task ExtractUsingManagedZip(
14241424
string outputDir,
14251425
CancellationToken token)
14261426
{
1427+
// Use ThreadObjectPool to cache the Streams and re-using it.
1428+
using ThreadObjectPool<Stream> streamPool = new(
1429+
() => CreateStreamWithPos(0, 0, token),
1430+
capacity: ThreadCount * 2, // Double the thread count for spare capacity
1431+
isDisposeObjects: true);
1432+
14271433
int threadCounts = ThreadCount;
1428-
ZipArchiveReader archive = await ZipArchiveReader
1429-
.CreateFromStreamFactoryAsync(CreateStreamWithPos, token);
1434+
Stream zipInitialStream = await streamPool.GetOrCreateObjectAsync(token);
1435+
ZipArchiveReader archive;
1436+
1437+
try
1438+
{
1439+
archive = await ZipArchiveReader.CreateFromAsync(zipInitialStream, token);
1440+
}
1441+
finally
1442+
{
1443+
streamPool.Return(zipInitialStream);
1444+
}
14301445

14311446
// Run the workers
14321447
await Parallel.ForEachAsync(archive,
@@ -1439,45 +1454,53 @@ await Parallel.ForEachAsync(archive,
14391454

14401455
return;
14411456

1442-
ValueTask Impl(ZipArchiveEntry entry, CancellationToken innerToken)
1457+
async ValueTask Impl(ZipArchiveEntry entry, CancellationToken innerToken)
14431458
{
1444-
return ExtractUsingManagedZipWorker(entry,
1445-
CreateStreamWithPos,
1446-
outputDir,
1447-
innerToken);
1459+
string outputPath = Path.Combine(outputDir, entry.Filename);
1460+
if (entry.IsDirectory)
1461+
{
1462+
_ = Directory.CreateDirectory(outputPath);
1463+
return;
1464+
}
1465+
1466+
Stream reusableStream = await streamPool.GetOrCreateObjectAsync(innerToken);
1467+
try
1468+
{
1469+
await ExtractUsingManagedZipWorker(entry,
1470+
reusableStream,
1471+
outputDir,
1472+
innerToken);
1473+
}
1474+
finally
1475+
{
1476+
streamPool.Return(reusableStream);
1477+
}
14481478
}
14491479

1450-
Task<Stream> CreateStreamWithPos(long? start, long? _, CancellationToken innerToken)
1451-
=> Task.Factory.StartNew(() =>
1452-
{
1453-
Stream stream = streamFactory();
1454-
stream.Position = start ?? 0;
1455-
return stream;
1456-
}, innerToken);
1480+
Stream CreateStreamWithPos(long? start, long? _, CancellationToken innerToken)
1481+
{
1482+
Stream stream = streamFactory();
1483+
stream.Position = start ?? 0;
1484+
return stream;
1485+
}
14571486
}
14581487

14591488
protected virtual async ValueTask ExtractUsingManagedZipWorker(
1460-
ZipArchiveEntry entry,
1461-
StreamFactoryAsync streamFactory,
1462-
string outputDir,
1463-
CancellationToken token)
1489+
ZipArchiveEntry entry,
1490+
Stream sourceStream,
1491+
string outputDir,
1492+
CancellationToken token)
14641493
{
14651494
byte[] buffer = ArrayPool<byte>.Shared.Rent(128 << 10);
14661495

14671496
try
14681497
{
1469-
if (token.IsCancellationRequested ||
1470-
entry.IsDirectory)
1471-
{
1472-
return;
1473-
}
1474-
14751498
string outputPath = Path.Combine(outputDir, entry.Filename);
14761499
FileInfo outputFile = new FileInfo(outputPath).EnsureCreationOfDirectory()
14771500
.StripAlternateDataStream()
14781501
.EnsureNoReadOnly();
14791502

1480-
await using Stream deflateStream = await entry.OpenStreamFromFactoryAsync(streamFactory, token);
1503+
await using Stream deflateStream = await entry.OpenStreamFromAsync(sourceStream, false, false, token);
14811504
await using FileStream fileStream = outputFile.Open(FileMode.Create,
14821505
FileAccess.Write,
14831506
FileShare.Write);

0 commit comments

Comments
 (0)