From 378b7126262ac28850da6a379410da06ee678e1e Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 18 Jul 2025 18:00:07 +0200 Subject: [PATCH 1/3] Refactor/split checks. --- CHANGELOG.md | 1 + app/lib/shared/integrity.dart | 335 ++++++++++++---------- app/lib/tool/neat_task/pub_dev_tasks.dart | 10 + app/test/shared/test_services.dart | 5 +- 4 files changed, 198 insertions(+), 153 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b7b641e8..4d2bca3a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ AppEngine version, listed here to ease deployment and troubleshooting. ## `20250804t140400-all` * Bump runtimeVersion to `2025.07.14`. * Note: Updated worker base image to use Debian 12. + * Note: Tarball-related integrity checks are run in a separate weekly task (`check-tarball-integrity`). ## `20250708t090800-all` * Bump runtimeVersion to `2025.07.04`. diff --git a/app/lib/shared/integrity.dart b/app/lib/shared/integrity.dart index 2e7f69a48..da6f897c7 100644 --- a/app/lib/shared/integrity.dart +++ b/app/lib/shared/integrity.dart @@ -11,6 +11,7 @@ import 'package:clock/clock.dart'; import 'package:crypto/crypto.dart'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; import 'package:pool/pool.dart'; import 'package:retry/retry.dart'; @@ -33,7 +34,6 @@ import 'storage.dart'; import 'urls.dart' as urls; import 'utils.dart' show canonicalizeVersion, ByteArrayEqualsExt; -final _logger = Logger('integrity.check'); final _random = math.Random.secure(); /// The unmapped/unused fields that we expect to be present on some entities. @@ -45,14 +45,73 @@ const _allowedUnmappedFields = { 'User.isBlocked', }; -/// Checks the integrity of the datastore. -class IntegrityChecker { +@visibleForTesting +Stream findAllIntegrityProblems() async* { + yield* IntegrityChecker(dbService)._findProblems(); + yield* TarballIntegrityChecker(dbService)._findProblems(); +} + +class _BaseIntegrityChecker { + final Logger _logger; final DatastoreDB _db; final int _concurrency; /// Maps an unmapped field in the form of `.` to an /// object identifier (usually the `id` value of the entity). final _unmappedFieldsToObject = {}; + + _BaseIntegrityChecker._(this._logger, this._db, this._concurrency); + + void _updateUnmappedFields(Model m) { + if (m is ExpandoModel && m.additionalProperties.isNotEmpty) { + for (final key in m.additionalProperties.keys) { + final qualifiedField = [m.runtimeType.toString(), key].join('.'); + if (_unmappedFieldsToObject.containsKey(qualifiedField)) continue; + _unmappedFieldsToObject[qualifiedField] = m.id.toString(); + } + } + } + + Stream _queryWithPool( + Stream Function(R model) fn, { + /// Note: This time limit aborts the integrity check after a reasonable + /// amount of time has passed with an entity-related operation. + /// + /// The integrity check process should be restarted soon after, and + /// hopefully it should complete on the next round. + Duration timeLimit = const Duration(minutes: 15), + }) async* { + final query = _db.query(); + final pool = Pool(_concurrency); + final futures = >>[]; + try { + await for (final m in query.run()) { + _updateUnmappedFields(m); + final taskFuture = pool.withResource(() async { + final f = fn(m).toList(); + try { + return await f.timeout(timeLimit); + } on TimeoutException catch (e, st) { + _logger.pubNoticeShout('integrity-check-timeout', + 'Integrity check operation timed out.', e, st); + rethrow; + } + }); + futures.add(taskFuture); + } + for (final f in futures) { + for (final item in await f) { + yield item; + } + } + } finally { + await pool.close(); + } + } +} + +/// Checks the integrity of the datastore. +class IntegrityChecker extends _BaseIntegrityChecker { final _userToOauth = {}; final _oauthToUser = {}; final _deletedUsers = {}; @@ -68,15 +127,14 @@ class IntegrityChecker { final _badVersionInPubspec = >{}; int _packageChecked = 0; int _versionChecked = 0; - late http.Client _httpClient; - IntegrityChecker(this._db, {int? concurrency}) - : _concurrency = concurrency ?? 1; + IntegrityChecker(DatastoreDB db, {int? concurrency}) + : super._(Logger('integrity.check'), db, concurrency ?? 1); /// Runs integrity checks, and reports the problems via a [Logger]. Future verifyAndLogIssues() async { var count = 0; - await for (final problem in findProblems()) { + await for (final problem in _findProblems()) { count++; _logger.warning('[pub-integrity-problem] $problem'); } @@ -87,34 +145,29 @@ class IntegrityChecker { } /// Runs integrity checks, and returns the list of problems. - Stream findProblems() async* { - _httpClient = httpRetryClient(lenient: true); - try { - yield* _checkUsers(); - yield* _checkOAuthUserIDs(); - - final publisherAttributes = _PublisherAttributes(); - yield* _checkPublishers(publisherAttributes); - yield* _checkPublisherMembers(publisherAttributes); - yield* _checkPublishersAfterMembers(publisherAttributes); - yield* _checkPackages(publisherAttributes: publisherAttributes); - publisherAttributes.clear(); // no longer used - - yield* _checkVersions(); - yield* _checkLikes(); - yield* _checkModeratedPackages(); - yield* _checkAuditLogs(); - yield* _checkModerationCases(); - yield* _reportPubspecVersionIssues(); - - if (_unmappedFieldsToObject.isNotEmpty) { - for (final entry in _unmappedFieldsToObject.entries) { - if (_allowedUnmappedFields.contains(entry.key)) continue; - yield 'Unmapped field found: "${entry.key}" on entity "${entry.value}".'; - } + Stream _findProblems() async* { + yield* _checkUsers(); + yield* _checkOAuthUserIDs(); + + final publisherAttributes = _PublisherAttributes(); + yield* _checkPublishers(publisherAttributes); + yield* _checkPublisherMembers(publisherAttributes); + yield* _checkPublishersAfterMembers(publisherAttributes); + yield* _checkPackages(publisherAttributes: publisherAttributes); + publisherAttributes.clear(); // no longer used + + yield* _checkVersions(); + yield* _checkLikes(); + yield* _checkModeratedPackages(); + yield* _checkAuditLogs(); + yield* _checkModerationCases(); + yield* _reportPubspecVersionIssues(); + + if (_unmappedFieldsToObject.isNotEmpty) { + for (final entry in _unmappedFieldsToObject.entries) { + if (_allowedUnmappedFields.contains(entry.key)) continue; + yield 'Unmapped field found: "${entry.key}" on entity "${entry.value}".'; } - } finally { - _httpClient.close(); } } @@ -563,9 +616,6 @@ class IntegrityChecker { } Stream _checkPackageVersion(PackageVersion pv) async* { - final archiveDownloadUri = Uri.parse(urls.pkgArchiveDownloadUrl( - pv.package, pv.version!, - baseUri: activeConfiguration.primaryApiUri)); _packagesWithVersion.add(pv.package); if (pv.uploader == null) { @@ -581,17 +631,6 @@ class IntegrityChecker { if (pv.isRetracted && pv.retracted == null) { yield 'PackageVersion "${pv.qualifiedVersionKey}" is retracted, but `retracted` property is null.'; } - final shouldBeInPublicBucket = - !_packagesWithIsModeratedFlag.contains(pv.package) && pv.isVisible; - final tarballItems = await retry( - () async { - return await _checkTarballInBuckets(pv, archiveDownloadUri, - shouldBeInPublicBucket: shouldBeInPublicBucket) - .toList(); - }, - maxAttempts: 2, - ); - yield* Stream.fromIterable(tarballItems); yield* _checkModeratedFlags( kind: 'PackageVersion', @@ -631,61 +670,6 @@ class IntegrityChecker { } } - Stream _checkTarballInBuckets( - PackageVersion pv, - Uri archiveDownloadUri, { - required bool shouldBeInPublicBucket, - }) async* { - final canonicalInfo = await packageBackend.tarballStorage - .getCanonicalBucketArchiveInfo(pv.package, pv.version!); - if (canonicalInfo == null) { - yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching canonical archive file.'; - return; - } - - final info = - await packageBackend.packageTarballInfo(pv.package, pv.version!); - if (info == null) { - if (shouldBeInPublicBucket) { - yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching public archive file.'; - } - return; - } - - if (!shouldBeInPublicBucket) { - yield 'PackageVersion "${pv.qualifiedVersionKey}" has matching public archive file but it must not.'; - return; - } - - if (!canonicalInfo.hasSameSignatureAs(info)) { - yield 'Canonical archive for PackageVersion "${pv.qualifiedVersionKey}" differs from public bucket.'; - } - - final publicInfo = await packageBackend.tarballStorage - .getPublicBucketArchiveInfo(pv.package, pv.version!); - if (!canonicalInfo.hasSameSignatureAs(publicInfo)) { - yield 'Canonical archive for PackageVersion "${pv.qualifiedVersionKey}" differs in the public bucket.'; - } - - final sha256Hash = pv.sha256; - if (sha256Hash == null || sha256Hash.length != 32) { - yield 'PackageVersion "${pv.qualifiedVersionKey}" has invalid sha256.'; - } else if (envConfig.isRunningLocally || _random.nextInt(1000) == 0) { - // On prod do not check every archive all the time, but select a few of the archives randomly. - final bytes = (await _httpClient.get(archiveDownloadUri)).bodyBytes; - final hash = sha256.convert(bytes).bytes; - if (!hash.byteToByteEquals(sha256Hash)) { - yield 'PackageVersion "${pv.qualifiedVersionKey}" has sha256 hash mismatch.'; - } - } - - // Also issue a HTTP request. - final rs = await _httpClient.head(archiveDownloadUri); - if (rs.statusCode != 200) { - yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching archive file (HTTP status ${rs.statusCode}).'; - } - } - Stream _checkLikes() async* { _logger.info('Scanning Likes...'); @@ -936,53 +920,6 @@ class IntegrityChecker { ].join(); } } - - void _updateUnmappedFields(Model m) { - if (m is ExpandoModel && m.additionalProperties.isNotEmpty) { - for (final key in m.additionalProperties.keys) { - final qualifiedField = [m.runtimeType.toString(), key].join('.'); - if (_unmappedFieldsToObject.containsKey(qualifiedField)) continue; - _unmappedFieldsToObject[qualifiedField] = m.id.toString(); - } - } - } - - Stream _queryWithPool( - Stream Function(R model) fn, { - /// Note: This time limit aborts the integrity check after a reasonable - /// amount of time has passed with an entity-related operation. - /// - /// The integrity check process should be restarted soon after, and - /// hopefully it should complete on the next round. - Duration timeLimit = const Duration(minutes: 15), - }) async* { - final query = _db.query(); - final pool = Pool(_concurrency); - final futures = >>[]; - try { - await for (final m in query.run()) { - _updateUnmappedFields(m); - final taskFuture = pool.withResource(() async { - final f = fn(m).toList(); - try { - return await f.timeout(timeLimit); - } on TimeoutException catch (e, st) { - _logger.pubNoticeShout('integrity-check-timeout', - 'Integrity check operation timed out.', e, st); - rethrow; - } - }); - futures.add(taskFuture); - } - for (final f in futures) { - for (final item in await f) { - yield item; - } - } - } finally { - await pool.close(); - } - } } typedef StreamingIssuesFn = Stream Function(); @@ -1056,3 +993,101 @@ Stream _checkAdminDeletedFlags({ yield '$kind "$id" has `isAdminDeleted = false` but `adminDeletedAt` is not null.'; } } + +/// Checks the integrity of the tarball storage (both canonical and public buckets). +class TarballIntegrityChecker extends _BaseIntegrityChecker { + TarballIntegrityChecker(DatastoreDB db, {int? concurrency}) + : super._(Logger('tarball-integrity-check'), db, concurrency ?? 1); + + /// Runs integrity checks, and reports the problems via a [Logger]. + Future verifyAndLogIssues() async { + var count = 0; + await for (final problem in _findProblems()) { + count++; + _logger.warning('[tarball-integrity-problem] $problem'); + } + _logger.info([ + 'Tarball integrity check completed with $count issue(s).', + if (count == 0) '[tarball-integrity-no-problems-found]', + ].join(' ')); + } + + /// Runs integrity checks, and returns the list of problems. + Stream _findProblems() async* { + final httpClient = httpRetryClient(lenient: true); + try { + yield* _checkVersions(httpClient); + } finally { + httpClient.close(); + } + } + + Stream _checkVersions(http.Client httpClient) async* { + _logger.info('Scanning PackageVersion tarballs...'); + yield* _queryWithPool((pv) async* { + final items = await retry( + () => _checkTarballInBuckets(pv, httpClient).toList(), + maxAttempts: 2); + yield* Stream.fromIterable(items); + }); + } + + Stream _checkTarballInBuckets( + PackageVersion pv, http.Client httpClient) async* { + final archiveDownloadUri = Uri.parse(urls.pkgArchiveDownloadUrl( + pv.package, pv.version!, + baseUri: activeConfiguration.primaryApiUri)); + + final shouldBeInPublicBucket = + await packageBackend.isPackageVisible(pv.package) && pv.isVisible; + + final canonicalInfo = await packageBackend.tarballStorage + .getCanonicalBucketArchiveInfo(pv.package, pv.version!); + if (canonicalInfo == null) { + yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching canonical archive file.'; + return; + } + + final info = + await packageBackend.packageTarballInfo(pv.package, pv.version!); + if (info == null) { + if (shouldBeInPublicBucket) { + yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching public archive file.'; + } + return; + } + + if (!shouldBeInPublicBucket) { + yield 'PackageVersion "${pv.qualifiedVersionKey}" has matching public archive file but it must not.'; + return; + } + + if (!canonicalInfo.hasSameSignatureAs(info)) { + yield 'Canonical archive for PackageVersion "${pv.qualifiedVersionKey}" differs from public bucket.'; + } + + final publicInfo = await packageBackend.tarballStorage + .getPublicBucketArchiveInfo(pv.package, pv.version!); + if (!canonicalInfo.hasSameSignatureAs(publicInfo)) { + yield 'Canonical archive for PackageVersion "${pv.qualifiedVersionKey}" differs in the public bucket.'; + } + + final sha256Hash = pv.sha256; + if (sha256Hash == null || sha256Hash.length != 32) { + yield 'PackageVersion "${pv.qualifiedVersionKey}" has invalid sha256.'; + } else if (envConfig.isRunningLocally || _random.nextInt(1000) == 0) { + // On prod do not check every archive all the time, but select a few of the archives randomly. + final bytes = (await httpClient.get(archiveDownloadUri)).bodyBytes; + final hash = sha256.convert(bytes).bytes; + if (!hash.byteToByteEquals(sha256Hash)) { + yield 'PackageVersion "${pv.qualifiedVersionKey}" has sha256 hash mismatch.'; + } + } + + // Also issue a HTTP request. + final rs = await httpClient.head(archiveDownloadUri); + if (rs.statusCode != 200) { + yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching archive file (HTTP status ${rs.statusCode}).'; + } + } +} diff --git a/app/lib/tool/neat_task/pub_dev_tasks.dart b/app/lib/tool/neat_task/pub_dev_tasks.dart index 2d6d448b0..320edb977 100644 --- a/app/lib/tool/neat_task/pub_dev_tasks.dart +++ b/app/lib/tool/neat_task/pub_dev_tasks.dart @@ -248,6 +248,16 @@ List createPeriodicTaskSchedulers({ .verifyAndLogIssues(), timeout: Duration(days: 1), ), + + // Checks the tarball storage integrity of the archive files. + _weekly( + name: 'check-tarball-integrity', + isRuntimeVersioned: true, + task: () async => await TarballIntegrityChecker(dbService, concurrency: 4) + .verifyAndLogIssues(), + timeout: Duration(days: 1), + ), + // Deletes the old search snapshots _weekly( name: 'delete-old-search-snapshots', diff --git a/app/test/shared/test_services.dart b/app/test/shared/test_services.dart index 9cee47256..edefbf261 100644 --- a/app/test/shared/test_services.dart +++ b/app/test/shared/test_services.dart @@ -136,7 +136,7 @@ final class FakeAppengineEnv { Future _postTestVerification({ required Pattern? integrityProblem, }) async { - final problems = await IntegrityChecker(dbService).findProblems().toList(); + final problems = await findAllIntegrityProblems().toList(); if (problems.isNotEmpty && (integrityProblem == null || integrityProblem.matchAsPrefix(problems.first) == null)) { @@ -153,8 +153,7 @@ Future _postTestVerification({ } // re-run integrity checks on the updated state - final laterProblems = - await IntegrityChecker(dbService).findProblems().toList(); + final laterProblems = await findAllIntegrityProblems().toList(); expect(laterProblems, problems); } From f4882c68daba9127176d0c35a7eeebb689a58ffc Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 18 Jul 2025 18:00:07 +0200 Subject: [PATCH 2/3] Also test archive tarballs in canonical bucket. --- app/lib/package/tarball_storage.dart | 13 +++++ app/lib/shared/integrity.dart | 75 +++++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/app/lib/package/tarball_storage.dart b/app/lib/package/tarball_storage.dart index 525ee2b43..d70d0e8bf 100644 --- a/app/lib/package/tarball_storage.dart +++ b/app/lib/package/tarball_storage.dart @@ -61,6 +61,19 @@ class TarballStorage { String getCanonicalBucketAbsoluteObjectName(String package, String version) => _canonicalBucket.absoluteObjectName(tarballObjectName(package, version)); + /// Get a list of package names in the canonical bucket. + Future> listPackagesInCanonicalBucket() async { + final items = await _canonicalBucket.listAllItemsWithRetry( + prefix: 'packages/', delimiter: '-'); + final packages = items + .where((i) => i.isDirectory) + .map((i) => i.name) + .map((name) => name.substring(9).split('-').first) + .toSet() + .toList(); + return packages; + } + /// Get map from `version` to [SourceObjectInfo] for each version of [package] in /// canonical bucket. Future> listVersionsInCanonicalBucket( diff --git a/app/lib/shared/integrity.dart b/app/lib/shared/integrity.dart index da6f897c7..8b0c5b83d 100644 --- a/app/lib/shared/integrity.dart +++ b/app/lib/shared/integrity.dart @@ -8,6 +8,7 @@ import 'dart:math' as math; import 'package:_pub_shared/search/tags.dart'; import 'package:_pub_shared/utils/http.dart'; import 'package:clock/clock.dart'; +import 'package:collection/collection.dart'; import 'package:crypto/crypto.dart'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; @@ -72,25 +73,25 @@ class _BaseIntegrityChecker { } } - Stream _queryWithPool( - Stream Function(R model) fn, { + Stream _streamWithPool( + Stream Function() streamFn, + Stream Function(T item) itemFn, { /// Note: This time limit aborts the integrity check after a reasonable /// amount of time has passed with an entity-related operation. /// /// The integrity check process should be restarted soon after, and /// hopefully it should complete on the next round. - Duration timeLimit = const Duration(minutes: 15), + Duration? timeLimit, }) async* { - final query = _db.query(); + timeLimit ??= const Duration(minutes: 15); final pool = Pool(_concurrency); final futures = >>[]; try { - await for (final m in query.run()) { - _updateUnmappedFields(m); + await for (final item in streamFn()) { final taskFuture = pool.withResource(() async { - final f = fn(m).toList(); + final f = itemFn(item).toList(); try { - return await f.timeout(timeLimit); + return await f.timeout(timeLimit!); } on TimeoutException catch (e, st) { _logger.pubNoticeShout('integrity-check-timeout', 'Integrity check operation timed out.', e, st); @@ -108,6 +109,25 @@ class _BaseIntegrityChecker { await pool.close(); } } + + Stream _queryWithPool( + Stream Function(R model) fn, { + /// Note: This time limit aborts the integrity check after a reasonable + /// amount of time has passed with an entity-related operation. + /// + /// The integrity check process should be restarted soon after, and + /// hopefully it should complete on the next round. + Duration? timeLimit, + }) async* { + yield* _streamWithPool( + () => _db.query().run(), + (m) async* { + _updateUnmappedFields(m); + yield* fn(m); + }, + timeLimit: timeLimit, + ); + } } /// Checks the integrity of the datastore. @@ -1017,6 +1037,7 @@ class TarballIntegrityChecker extends _BaseIntegrityChecker { final httpClient = httpRetryClient(lenient: true); try { yield* _checkVersions(httpClient); + yield* _checkCanonicalFiles(); } finally { httpClient.close(); } @@ -1090,4 +1111,42 @@ class TarballIntegrityChecker extends _BaseIntegrityChecker { yield 'PackageVersion "${pv.qualifiedVersionKey}" has no matching archive file (HTTP status ${rs.statusCode}).'; } } + + /// Checks if the canonical bucket contains files that have existing database entry. + Stream _checkCanonicalFiles() async* { + _logger.info('Scanning canonical bucket...'); + + // NOTE: To make it effiecient, we don't list all the files in the bucket, + // only the package names. This may leave out other files that are not in + // the `packages/` directory (objectname-prefix), or for some reason not + // matched as package names. + final packages = await packageBackend.tarballStorage + .listPackagesInCanonicalBucket() + .timeout(Duration(minutes: 15)); + yield* _streamWithPool( + () => Stream.fromIterable(packages), + (package) async* { + final p = await packageBackend.lookupPackage(package); + if (p == null) { + yield 'Missing Package entity in database: "$package".'; + return; + } + final bucketVersions = await packageBackend.tarballStorage + .listVersionsInCanonicalBucket(package); + final dbVersions = await packageBackend.versionsOfPackage(package); + + for (final e in bucketVersions.entries) { + final version = e.key; + final dbv = dbVersions.singleWhereOrNull((e) => e.version == version); + if (dbv == null) { + // NOTE: This check may expose files that become stale during an aborted upload. + // If that's the case, further bucket cleanup may be required. + yield 'Missing PackageVersion entity in database: "$package/$version". May be a stale upload, investigatation needed!'; + } + } + + // Note: checking the other way around is already done via iterating over the version entries in the database. + }, + ); + } } From 4046c586fef2b58504f5a61673a9e10a5e70f9ca Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Thu, 7 Aug 2025 10:23:52 +0200 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d2bca3a9..12ab5c15a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,11 @@ AppEngine version, listed here to ease deployment and troubleshooting. * Upgraded stable Dart analysis SDK to `3.8.3` * Upgraded stable Flutter analysis SDK to `3.32.8`. * Upgraded pana to `0.22.22`. + * Note: Tarball-related integrity checks are run in a separate weekly task (`check-tarball-integrity`). ## `20250804t140400-all` * Bump runtimeVersion to `2025.07.14`. * Note: Updated worker base image to use Debian 12. - * Note: Tarball-related integrity checks are run in a separate weekly task (`check-tarball-integrity`). ## `20250708t090800-all` * Bump runtimeVersion to `2025.07.04`.