Skip to content

Trigger post-update events for Package updates. #8846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Fails if that package has no existing Package entity.
tx.insert(p!);
tx.delete(mpKey);
});
await purgePackageCache(packageName);
await triggerPackagePostUpdates(packageName).future;

return {
'package': packageName,
Expand Down
12 changes: 3 additions & 9 deletions app/lib/admin/actions/moderate_package.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

import '../../admin/backend.dart';
import '../../admin/models.dart';
import '../../package/api_export/api_exporter.dart';
import '../../package/backend.dart';
import '../../package/models.dart';
import '../../scorecard/backend.dart';
import '../../shared/datastore.dart';
import '../../task/backend.dart';
import 'actions.dart';

final moderatePackage = AdminAction(
Expand Down Expand Up @@ -108,11 +107,7 @@ Future<Map<String, dynamic>> adminMarkPackageVisibility(
return pkg;
});

// make sure visibility cache is updated immediately
await purgePackageCache(package);

// sync exported API(s)
await apiExporter.synchronizePackage(package, forceDelete: true);
await triggerPackagePostUpdates(package, exportForceDelete: true).future;

// retract or re-populate public archive files
await packageBackend.tarballStorage.updatePublicArchiveBucket(
Expand All @@ -121,8 +116,7 @@ Future<Map<String, dynamic>> adminMarkPackageVisibility(
deleteIfOlder: Duration.zero,
);

await taskBackend.trackPackage(package);
await purgePackageCache(package);
await purgeScorecardData(package, p2!.latestVersion!, isLatest: true);
}

return {
Expand Down
10 changes: 1 addition & 9 deletions app/lib/admin/actions/moderate_package_versions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import 'package:_pub_shared/utils/sdk_version_cache.dart';
import 'package:clock/clock.dart';

import '../../package/api_export/api_exporter.dart';
import '../../package/backend.dart';
import '../../package/models.dart';
import '../../scorecard/backend.dart';
import '../../shared/datastore.dart';
import '../../shared/versions.dart';
import '../../task/backend.dart';

import '../backend.dart';
import '../models.dart';
Expand Down Expand Up @@ -144,11 +142,7 @@ Future<Map<String, dynamic>> adminMarkPackageVersionVisibility(
return v;
});

// make sure visibility cache is updated immediately
await purgePackageCache(package);

// sync exported API(s)
await apiExporter.synchronizePackage(package, forceDelete: true);
await triggerPackagePostUpdates(package, exportForceDelete: true).future;

// retract or re-populate public archive files
await packageBackend.tarballStorage.updatePublicArchiveBucket(
Expand All @@ -157,8 +151,6 @@ Future<Map<String, dynamic>> adminMarkPackageVersionVisibility(
deleteIfOlder: Duration.zero,
);

await taskBackend.trackPackage(package);
await purgePackageCache(package);
await purgeScorecardData(package, version, isLatest: true);
}

Expand Down
2 changes: 1 addition & 1 deletion app/lib/admin/actions/package_version_retraction.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ value of `set-retracted`, which should either be `true` or `false`.
'isRetracted': pv.isRetracted,
};
});
await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName);

return {
'before': before,
Expand Down
3 changes: 2 additions & 1 deletion app/lib/admin/actions/publisher_package_remove.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ If the publisher has no members, the package will end up without uploaders.
),
);
});
await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName,
skipTask: true, skipVersionsExport: true);
await purgePublisherCache(publisherId: currentPublisherId);
return {
'previousPublisher': currentPublisherId,
Expand Down
11 changes: 4 additions & 7 deletions app/lib/admin/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import '../account/models.dart';
import '../admin/models.dart';
import '../audit/models.dart';
import '../package/backend.dart'
show checkPackageVersionParams, packageBackend, purgePackageCache;
show checkPackageVersionParams, packageBackend, triggerPackagePostUpdates;
import '../package/models.dart';
import '../publisher/models.dart';
import '../scorecard/backend.dart';
Expand All @@ -32,7 +32,6 @@ import '../shared/configuration.dart';
import '../shared/datastore.dart';
import '../shared/exceptions.dart';
import '../shared/versions.dart';
import '../task/backend.dart';
import 'actions/actions.dart' show AdminAction;
import 'tools/delete_all_staging.dart';
import 'tools/list_tools.dart';
Expand Down Expand Up @@ -400,7 +399,7 @@ class AdminBackend {
await _db
.deleteWithQuery(_db.query<PackageVersion>(ancestorKey: packageKey));

await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName);

_logger.info('Package "$packageName" got successfully removed.');
return (
Expand Down Expand Up @@ -448,7 +447,7 @@ class AdminBackend {
caller, tx, p, pv, isRetracted);
}
});
await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName);
}
}

Expand Down Expand Up @@ -517,10 +516,8 @@ class AdminBackend {
tx.insert(package);
});

await purgePackageCache(packageName);
await purgeScorecardData(packageName, version, isLatest: true);
// trigger (eventual) re-analysis
await taskBackend.trackPackage(packageName);
triggerPackagePostUpdates(packageName);
return (
deletedPackageVersions: deletedPackageVersions,
deletedPackageVersionInfos: deletedPackageVersionInfos.deleted,
Expand Down
3 changes: 2 additions & 1 deletion app/lib/admin/tools/package_publisher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ Future<String> executeSetPackagePublisher(List<String> args) async {
tx.insert(pkg);
});
await purgePublisherCache(publisherId: publisherId);
await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName,
skipTask: true, skipVersionsExport: true);
if (currentPublisherId != null) {
await purgePublisherCache(publisherId: currentPublisherId);
}
Expand Down
78 changes: 63 additions & 15 deletions app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,11 @@ class PackageBackend {
return true;
});
if (updated) {
await purgePackageCache(package);
triggerPackagePostUpdates(
package,
skipTask: true,
skipExport: true,
);
}
return updated;
}
Expand Down Expand Up @@ -480,9 +484,7 @@ class PackageBackend {
options: optionsChanges,
));
});
await purgePackageCache(package);
await taskBackend.trackPackage(package);
await apiExporter.synchronizePackage(package);
triggerPackagePostUpdates(package, skipVersionsExport: true);
}

/// Updates [options] on [package]/[version], assuming the current user
Expand Down Expand Up @@ -520,10 +522,9 @@ class PackageBackend {
authenticatedUser, tx, p, pv, options.isRetracted!);
}
});
await purgePackageCache(package);
await purgeScorecardData(package, version,
isLatest: pkg.latestVersion == version);
await apiExporter.synchronizePackage(package);
triggerPackagePostUpdates(package);
}

/// Verifies an update to the credential-less publishing settings and
Expand Down Expand Up @@ -780,15 +781,18 @@ class PackageBackend {
return _asPackagePublisherInfo(package);
});
await purgePublisherCache(publisherId: request.publisherId);
await purgePackageCache(packageName);

if (email != null) {
await emailBackend.trySendOutgoingEmail(email!);
}
if (currentPublisherId != null) {
await purgePublisherCache(publisherId: currentPublisherId);
}
await apiExporter.synchronizePackage(packageName);
triggerPackagePostUpdates(
packageName,
skipTask: true,
skipVersionsExport: true,
);
return rs;
}

Expand Down Expand Up @@ -1299,7 +1303,7 @@ class PackageBackend {
sw.reset();

_logger.info('Invalidating cache for package ${newVersion.package}.');
await purgePackageCache(newVersion.package);
triggerPackagePostUpdates(newVersion.package, taskUpdateDependents: true);

// Let's not block the upload response on these post-upload tasks.
// The operations should either be non-critical, or should be retried
Expand All @@ -1324,12 +1328,10 @@ class PackageBackend {
await Future.wait([
if (activeConfiguration.isPublishedEmailNotificationEnabled)
emailBackend.trySendOutgoingEmail(outgoingEmail),
taskBackend.trackPackage(newVersion.package, updateDependents: true),
apiExporter.synchronizePackage(newVersion.package),
apiExporter.synchronizeAllPackagesAtomFeed(),
tarballStorage.updateContentDispositionOnPublicBucket(
newVersion.package, newVersion.version!),
]);
await tarballStorage.updateContentDispositionOnPublicBucket(
newVersion.package, newVersion.version!);
} catch (e, st) {
final v = newVersion.qualifiedVersionKey;
_logger.severe('Error post-processing package upload $v', e, st);
Expand Down Expand Up @@ -1581,7 +1583,7 @@ class PackageBackend {
package: packageName,
));
});
await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName, skipTask: true, skipExport: true);
}

Future<void> _validatePackageUploader(
Expand Down Expand Up @@ -1657,7 +1659,7 @@ class PackageBackend {
uploaderUser: uploader,
));
});
await purgePackageCache(packageName);
triggerPackagePostUpdates(packageName, skipTask: true, skipExport: true);
return api.SuccessMessage(
success: api.Message(
message:
Expand Down Expand Up @@ -2096,3 +2098,49 @@ class _VersionTransactionDataAcccess {
return await _tx.query<PackageVersion>(pkgKey).run().toList();
}
}

/// Triggers post-update event processing after a [Package] object is part of
/// a transaction.
///
/// Returns a record with an optionally awaitable [Future] in case the caller needs
/// wait the updates before yielding its response.
({Future future}) triggerPackagePostUpdates(
String package, {
/// Skip trigger a new analysis on the package.
bool skipTask = false,

/// Skip triggering a new export to the CDN bucket.
bool skipExport = false,

/// Skip only the version-related exports to the CDN bucket, keeps the
/// package-related operations.
/// TODO: implement this in API exporter.
bool skipVersionsExport = false,

/// Pass the force-deletion flag to the package export operation.
bool exportForceDelete = false,

/// Pass the update-dependents flag to the task update operation.
bool taskUpdateDependents = false,
}) {
Future add(Future Function() fn) {
return asyncQueue.addAsyncFn(fn).future;
}

final futures = [
add(() => purgePackageCache(package)),
if (!skipTask)
add(() => taskBackend.trackPackage(
package,
updateDependents: taskUpdateDependents,
)),
if (!skipExport)
add(() => apiExporter.synchronizePackage(
package,
forceDelete: exportForceDelete,
// TODO: implement and use [skipVersionsExport]
)),
];

return (future: Future.wait(futures));
}
9 changes: 7 additions & 2 deletions app/lib/service/async_queue/async_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ class AsyncQueue {

AsyncQueue() : _zone = Zone.current;

void addAsyncFn(AsyncFn fn) {
({Future future}) addAsyncFn(AsyncFn fn) {
if (_closed) {
throw StateError('AsyncQueue is closed, task was not accepted.');
}
_queue.add(_Task(fn, StackTrace.current));
final task = _Task(fn, StackTrace.current);
_queue.add(task);
_triggerProcessing();
return (future: task.completer.future);
}

void _triggerProcessing() {
Expand All @@ -61,10 +63,12 @@ class AsyncQueue {
final first = _queue.removeFirst();
try {
await first.fn();
first.completer.complete();
} catch (e, st) {
final trace = Chain([Trace.from(first.origin), Trace.from(st)]).terse;
stderr.writeln('Error executing off-request function: $e\n$trace');
_logger.severe('Error executing off-request function.', e, trace);
first.completer.completeError(e, st);
}
}

Expand All @@ -80,6 +84,7 @@ class AsyncQueue {
class _Task {
final AsyncFn fn;
final StackTrace origin;
final completer = Completer();

_Task(this.fn, this.origin);
}