Skip to content

Commit c04e483

Browse files
authored
Updated retry handling in storage access. (#8499)
1 parent a5dc0d2 commit c04e483

File tree

4 files changed

+36
-91
lines changed

4 files changed

+36
-91
lines changed

app/lib/package/api_export/exported_api.dart

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ final class ExportedApi {
121121
// Only delete the item if it's older than _minGarbageAge
122122
// This avoids any races where we delete files we've just created
123123
// TODO: Conditionally deletion API from package:gcloud would be better!
124-
await _bucket.tryDelete(item.name);
124+
await _bucket.tryDeleteWithRetry(item.name);
125125
}
126126
});
127127

@@ -137,7 +137,7 @@ final class ExportedApi {
137137
item.updated.isBefore(gcFilesBefore)) {
138138
// Only delete the item if it's older than _minGarbageAge
139139
// This avoids any races where we delete files we've just created
140-
await _bucket.tryDelete(item.name);
140+
await _bucket.tryDeleteWithRetry(item.name);
141141
}
142142
});
143143
}
@@ -184,7 +184,7 @@ final class ExportedApi {
184184
await _listBucket(
185185
prefix: entry.name,
186186
delimiter: '',
187-
(entry) async => await _bucket.tryDelete(entry.name),
187+
(entry) async => await _bucket.tryDeleteWithRetry(entry.name),
188188
);
189189
}
190190
}));
@@ -336,7 +336,7 @@ final class ExportedPackage {
336336
item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
337337
// Only delete if the item if it's older than _minGarbageAge
338338
// This avoids any races where we delete files we've just created
339-
await _owner._bucket.tryDelete(item.name);
339+
await _owner._bucket.tryDeleteWithRetry(item.name);
340340
}
341341
});
342342

@@ -380,7 +380,7 @@ final class ExportedPackage {
380380
if (info.updated.isBefore(clock.agoBy(_minGarbageAge))) {
381381
// Only delete if the item if it's older than _minGarbageAge
382382
// This avoids any races where we delete files we've just created
383-
await _owner._bucket.tryDelete(item.name);
383+
await _owner._bucket.tryDeleteWithRetry(item.name);
384384
}
385385
}
386386
// Ignore cases where tryInfo fails, assuming the object has been
@@ -399,7 +399,7 @@ final class ExportedPackage {
399399
await _owner._listBucket(
400400
prefix: prefix + '/api/archives/$_package-',
401401
delimiter: '',
402-
(item) async => await _owner._bucket.tryDelete(item.name),
402+
(item) async => await _owner._bucket.tryDeleteWithRetry(item.name),
403403
);
404404
}),
405405
]);
@@ -442,7 +442,7 @@ sealed class ExportedObject {
442442
Future<void> delete() async {
443443
await Future.wait(_owner._prefixes.map((prefix) async {
444444
await _owner._pool.withResource(() async {
445-
await _owner._bucket.tryDelete(prefix + _objectName);
445+
await _owner._bucket.tryDeleteWithRetry(prefix + _objectName);
446446
});
447447
}));
448448
}

app/lib/package/tarball_storage.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ class TarballStorage {
176176
Future<void> deleteArchiveFromAllBuckets(
177177
String package, String version) async {
178178
final objectName = tarballObjectName(package, version);
179-
await deleteFromBucket(_canonicalBucket, objectName);
180-
await deleteFromBucket(_publicBucket, objectName);
179+
await _canonicalBucket.deleteWithRetry(objectName);
180+
await _publicBucket.deleteWithRetry(objectName);
181181
}
182182

183183
/// Deletes the package archive file from the canonical bucket.

app/lib/shared/storage.dart

Lines changed: 27 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,12 @@ import 'package:retry/retry.dart';
2121

2222
import 'configuration.dart';
2323
import 'utils.dart'
24-
show
25-
contentType,
26-
jsonUtf8Encoder,
27-
retryAsync,
28-
ByteArrayEqualsExt,
29-
DeleteCounts;
24+
show contentType, jsonUtf8Encoder, ByteArrayEqualsExt, DeleteCounts;
3025
import 'versions.dart' as versions;
3126

3227
final _gzip = GZipCodec();
3328
final _logger = Logger('shared.storage');
3429

35-
const _retryStatusCodes = <int>{502, 503, 504};
36-
3730
/// Additional methods on the storage service.
3831
extension StorageExt on Storage {
3932
/// Verifies bucket existence and access.
@@ -115,18 +108,19 @@ extension BucketExt on Bucket {
115108
}
116109

117110
/// Deletes [name] if it exists, ignores 404 otherwise.
118-
Future<void> tryDelete(String name) async {
119-
return await retry(
111+
Future<bool> tryDeleteWithRetry(String name) async {
112+
return await _retry(
120113
() async {
121114
try {
122-
return await delete(name);
115+
await delete(name);
116+
return true;
123117
} on DetailedApiRequestError catch (e) {
124-
if (e.status == 404) return null;
118+
if (e.status == 404) {
119+
return false;
120+
}
125121
rethrow;
126122
}
127123
},
128-
maxAttempts: 3,
129-
retryIf: _retryIf,
130124
);
131125
}
132126

@@ -158,7 +152,7 @@ extension BucketExt on Bucket {
158152
if (maxSize != null && length != null && maxSize < length) {
159153
throw MaximumSizeExceeded(maxSize);
160154
}
161-
return retry(
155+
return _retry(
162156
() async {
163157
final timeout = Duration(seconds: 30);
164158
final deadline = clock.now().add(timeout);
@@ -175,8 +169,6 @@ extension BucketExt on Bucket {
175169
}
176170
return builder.toBytes();
177171
},
178-
maxAttempts: 3,
179-
retryIf: _retryIf,
180172
);
181173
}
182174

@@ -270,8 +262,17 @@ extension PageExt<T> on Page<T> {
270262
}
271263
}
272264

273-
Future<R> _retry<R>(Future<R> Function() fn) async {
274-
return await retry(fn, maxAttempts: 3, retryIf: _retryIf);
265+
Future<R> _retry<R>(
266+
Future<R> Function() fn, {
267+
FutureOr<void> Function(Exception)? onRetry,
268+
}) async {
269+
return await retry(
270+
fn,
271+
maxAttempts: 3,
272+
delayFactor: Duration(seconds: 2),
273+
retryIf: _retryIf,
274+
onRetry: onRetry,
275+
);
275276
}
276277

277278
bool _retryIf(Exception e) {
@@ -295,32 +296,6 @@ bool _retryIf(Exception e) {
295296
String bucketUri(Bucket bucket, String path) =>
296297
'gs://${bucket.bucketName}/$path';
297298

298-
/// Deletes a single object from the [bucket].
299-
///
300-
/// Returns `true` if the object was deleted by this operation, `false` if it
301-
/// didn't exist at the time of the operation.
302-
Future<bool> deleteFromBucket(Bucket bucket, String objectName) async {
303-
Future<bool> delete() async {
304-
try {
305-
await bucket.delete(objectName);
306-
return true;
307-
} on DetailedApiRequestError catch (e) {
308-
if (e.status != 404) {
309-
rethrow;
310-
}
311-
return false;
312-
}
313-
}
314-
315-
return await retry(
316-
delete,
317-
delayFactor: Duration(seconds: 10),
318-
maxAttempts: 3,
319-
retryIf: (e) =>
320-
e is DetailedApiRequestError && _retryStatusCodes.contains(e.status),
321-
);
322-
}
323-
324299
Future<void> updateContentDispositionToAttachment(
325300
ObjectInfo info, Bucket bucket) async {
326301
if (info.metadata.contentDisposition != 'attachment') {
@@ -351,23 +326,19 @@ Future<int> deleteBucketFolderRecursively(
351326
var count = 0;
352327
Page<BucketEntry>? page;
353328
while (page == null || !page.isLast) {
354-
page = await retry(
329+
page = await _retry(
355330
() async {
356331
return page == null
357332
? await bucket.pageWithRetry(
358333
prefix: folder, delimiter: '', pageSize: 100)
359334
: await page.nextWithRetry(pageSize: 100);
360335
},
361-
delayFactor: Duration(seconds: 10),
362-
maxAttempts: 3,
363-
retryIf: (e) =>
364-
e is DetailedApiRequestError && _retryStatusCodes.contains(e.status),
365336
);
366337
final futures = <Future>[];
367338
final pool = Pool(concurrency ?? 1);
368339
for (final entry in page!.items) {
369340
final f = pool.withResource(() async {
370-
final deleted = await deleteFromBucket(bucket, entry.name);
341+
final deleted = await bucket.tryDeleteWithRetry(entry.name);
371342
if (deleted) count++;
372343
});
373344
futures.add(f);
@@ -382,7 +353,7 @@ Future<int> deleteBucketFolderRecursively(
382353
Future uploadWithRetry(Bucket bucket, String objectName, int length,
383354
Stream<List<int>> Function() openStream,
384355
{ObjectMetadata? metadata}) async {
385-
await retryAsync(
356+
await _retry(
386357
() async {
387358
final sink = bucket.write(objectName,
388359
length: length,
@@ -391,9 +362,9 @@ Future uploadWithRetry(Bucket bucket, String objectName, int length,
391362
await sink.addStream(openStream());
392363
await sink.close();
393364
},
394-
description: 'Upload to $objectName',
395-
shouldRetryOnError: _retryIf,
396-
sleep: Duration(seconds: 10),
365+
onRetry: (e) {
366+
_logger.info('Upload to $objectName failed.', e, StackTrace.current);
367+
},
397368
);
398369
}
399370

@@ -506,7 +477,7 @@ class VersionedJsonStorage {
506477
final age = clock.now().difference(info.updated);
507478
if (minAgeThreshold == null || age > minAgeThreshold) {
508479
deleted++;
509-
await deleteFromBucket(_bucket, entry.name);
480+
await _bucket.tryDeleteWithRetry(entry.name);
510481
}
511482
}
512483
});

app/lib/shared/utils.dart

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import 'dart:typed_data';
1111

1212
import 'package:appengine/appengine.dart';
1313
import 'package:intl/intl.dart';
14-
import 'package:logging/logging.dart';
1514
// ignore: implementation_imports
1615
import 'package:mime/src/default_extension_map.dart' as mime;
1716
import 'package:path/path.dart' as p;
@@ -28,7 +27,6 @@ final Duration twoYears = const Duration(days: 2 * 365);
2827
/// Appengine.
2928
const _cloudTraceContextHeader = 'X-Cloud-Trace-Context';
3029

31-
final Logger _logger = Logger('pub.utils');
3230
final _random = Random.secure();
3331

3432
final DateFormat shortDateFormat = DateFormat.yMMMd();
@@ -171,30 +169,6 @@ List<T> boundedList<T>(List<T> list, {int? offset, int? limit}) {
171169
return iterable.toList();
172170
}
173171

174-
/// Executes [body] and returns with the same result.
175-
/// When it throws an exception, it will be re-run until [maxAttempt] is reached.
176-
Future<R> retryAsync<R>(
177-
Future<R> Function() body, {
178-
int maxAttempt = 3,
179-
bool Function(Exception)? shouldRetryOnError,
180-
String description = 'Async operation',
181-
Duration sleep = const Duration(seconds: 1),
182-
}) async {
183-
for (int i = 1;; i++) {
184-
try {
185-
return await body();
186-
} on Exception catch (e, st) {
187-
_logger.info('$description failed (attempt: $i of $maxAttempt).', e, st);
188-
if (i < maxAttempt &&
189-
(shouldRetryOnError == null || shouldRetryOnError(e))) {
190-
await Future.delayed(sleep);
191-
continue;
192-
}
193-
rethrow;
194-
}
195-
}
196-
}
197-
198172
/// Returns a UUID in v4 format as a `String`.
199173
///
200174
/// If [bytes] is provided, it must be length 16 and have values between `0` and

0 commit comments

Comments
 (0)