Skip to content

[metrics_center] Add retries to unlock a lock file in case of 504 errors #4323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 29, 2023
Merged
4 changes: 4 additions & 0 deletions packages/metrics_center/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 1.0.10

* Adds retry logic when removing a `GcsLock` file lock in case of failure.

## 1.0.9

* Adds compatibility with `http` 1.0.
Expand Down
28 changes: 20 additions & 8 deletions packages/metrics_center/lib/src/gcs_lock.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
// ignore_for_file: avoid_print

import 'package:googleapis/storage/v1.dart';
import 'package:googleapis_auth/googleapis_auth.dart';

/// Global (in terms of earth) mutex using Google Cloud Storage.
class GcsLock {
/// Create a lock with an authenticated client and a GCS bucket name.
///
/// The client is used to communicate with Google Cloud Storage APIs.
GcsLock(this._client, this._bucketName) {
_api = StorageApi(_client);
}
GcsLock(this._api, this._bucketName);

/// Create a temporary lock file in GCS, and use it as a mutex mechanism to
/// run a piece of code exclusively.
Expand Down Expand Up @@ -79,13 +76,28 @@ class GcsLock {
}

Future<void> _unlock(String lockFileName) async {
await _api.objects.delete(_bucketName, lockFileName);
Duration waitPeriod = const Duration(milliseconds: 10);
bool unlocked = false;
// Retry in the case of GCS returning an API error, but rethrow if unable
// to unlock after a certain period of time.
while (!unlocked) {
try {
await _api.objects.delete(_bucketName, lockFileName);
unlocked = true;
} on DetailedApiRequestError {
if (waitPeriod < _unlockThreshold) {
await Future<void>.delayed(waitPeriod);
waitPeriod *= 2;
} else {
rethrow;
}
}
}
}

late StorageApi _api;

final String _bucketName;
final AuthClient _client;
final StorageApi _api;

static const Duration _kWarningThreshold = Duration(seconds: 10);
static const Duration _unlockThreshold = Duration(minutes: 1);
}
5 changes: 3 additions & 2 deletions packages/metrics_center/lib/src/skiaperf.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import 'dart:convert';

import 'package:gcloud/storage.dart';
import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
import 'package:googleapis/storage/v1.dart'
show DetailedApiRequestError, StorageApi;
import 'package:googleapis_auth/auth_io.dart';

import 'common.dart';
Expand Down Expand Up @@ -388,7 +389,7 @@ class SkiaPerfDestination extends MetricDestination {
}
final SkiaPerfGcsAdaptor adaptor =
SkiaPerfGcsAdaptor(storage.bucket(bucketName));
final GcsLock lock = GcsLock(client, bucketName);
final GcsLock lock = GcsLock(StorageApi(client), bucketName);
return SkiaPerfDestination(adaptor, lock);
}

Expand Down
3 changes: 2 additions & 1 deletion packages/metrics_center/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: metrics_center
version: 1.0.9
version: 1.0.10
description:
Support multiple performance metrics sources/formats and destinations.
repository: https://github.com/flutter/packages/tree/main/packages/metrics_center
Expand All @@ -9,6 +9,7 @@ environment:
sdk: ">=2.18.0 <4.0.0"

dependencies:
_discoveryapis_commons: ^1.0.0
crypto: ^3.0.1
equatable: ^2.0.3
gcloud: ^0.8.2
Expand Down
50 changes: 45 additions & 5 deletions packages/metrics_center/test/gcs_lock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ enum TestPhase {
run2,
}

@GenerateMocks(<Type>[AuthClient])
@GenerateMocks(<Type>[
AuthClient,
StorageApi
], customMocks: <MockSpec<dynamic>>[
MockSpec<ObjectsResource>(onMissingStub: OnMissingStub.returnDefault)
])
void main() {
const Duration kDelayStep = Duration(milliseconds: 10);
final Map<String, dynamic>? credentialsJson = getTestGcpCredentialsJson();
Expand All @@ -36,7 +41,7 @@ void main() {
Zone.current.fork(specification: spec).run<void>(() {
fakeAsync((FakeAsync fakeAsync) {
final MockAuthClient mockClient = MockAuthClient();
final GcsLock lock = GcsLock(mockClient, 'mockBucket');
final GcsLock lock = GcsLock(StorageApi(mockClient), 'mockBucket');
when(mockClient.send(any)).thenThrow(DetailedApiRequestError(412, ''));
final Future<void> runFinished =
lock.protectedRun('mock.lock', () async {});
Expand All @@ -63,7 +68,7 @@ void main() {
test('GcsLock integration test: single protectedRun is successful', () async {
final AutoRefreshingAuthClient client = await clientViaServiceAccount(
ServiceAccountCredentials.fromJson(credentialsJson), Storage.SCOPES);
final GcsLock lock = GcsLock(client, kTestBucketName);
final GcsLock lock = GcsLock(StorageApi(client), kTestBucketName);
int testValue = 0;
await lock.protectedRun('test.lock', () async {
testValue = 1;
Expand All @@ -74,8 +79,8 @@ void main() {
test('GcsLock integration test: protectedRun is exclusive', () async {
final AutoRefreshingAuthClient client = await clientViaServiceAccount(
ServiceAccountCredentials.fromJson(credentialsJson), Storage.SCOPES);
final GcsLock lock1 = GcsLock(client, kTestBucketName);
final GcsLock lock2 = GcsLock(client, kTestBucketName);
final GcsLock lock1 = GcsLock(StorageApi(client), kTestBucketName);
final GcsLock lock2 = GcsLock(StorageApi(client), kTestBucketName);

TestPhase phase = TestPhase.run1;
final Completer<void> started1 = Completer<void>();
Expand Down Expand Up @@ -105,4 +110,39 @@ void main() {
await finished1;
await finished2;
}, skip: credentialsJson == null);

test('GcsLock attempts to unlock again on a DetailedApiRequestError',
() async {
fakeAsync((FakeAsync fakeAsync) {
final StorageApi mockStorageApi = MockStorageApi();
final ObjectsResource mockObjectsResource = MockObjectsResource();
final GcsLock gcsLock = GcsLock(mockStorageApi, kTestBucketName);
const String lockFileName = 'test.lock';
when(mockStorageApi.objects).thenReturn(mockObjectsResource);

// Simulate a failure to delete a lock file.
when(mockObjectsResource.delete(kTestBucketName, lockFileName))
.thenThrow(DetailedApiRequestError(504, ''));

gcsLock.protectedRun(lockFileName, () async {});

// Allow time to pass by to ensure deleting the lock file is retried multiple times.
fakeAsync.elapse(const Duration(milliseconds: 30));
verify(mockObjectsResource.delete(kTestBucketName, lockFileName))
.called(3);

// Simulate a successful deletion of the lock file.
when(mockObjectsResource.delete(kTestBucketName, lockFileName))
.thenAnswer((_) => Future<void>(
() {
return;
},
));

// At this point, there should only be one more (successful) attempt to delete the lock file.
fakeAsync.elapse(const Duration(minutes: 2));
verify(mockObjectsResource.delete(kTestBucketName, lockFileName))
.called(1);
});
});
}
Loading