Skip to content

Commit e7401e4

Browse files
authored
gRPC: fix bugs found during testing (#2039)
All in `GrpcStream`: * make sure there are no pending completions when "finish" operation is enqueued; * always finish gRPC calls; * when connectivity changes, reset not just the calls, but the underlying channel as well.
1 parent 5f874a6 commit e7401e4

File tree

10 files changed

+139
-93
lines changed

10 files changed

+139
-93
lines changed

Firestore/Example/Firestore.xcodeproj/xcshareddata/xcschemes/Firestore_IntegrationTests_iOS.xcscheme

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
buildConfiguration = "Debug"
2727
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
2828
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
29+
enableASanStackUseAfterReturn = "YES"
30+
enableThreadSanitizer = "YES"
2931
shouldUseLaunchSchemeArgsEnv = "YES">
3032
<Testables>
3133
<TestableReference

Firestore/Example/Firestore.xcodeproj/xcshareddata/xcschemes/Firestore_Tests_iOS.xcscheme

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
buildConfiguration = "Debug"
2727
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
2828
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
29+
enableASanStackUseAfterReturn = "YES"
30+
enableThreadSanitizer = "YES"
2931
shouldUseLaunchSchemeArgsEnv = "YES">
3032
<Testables>
3133
<TestableReference

Firestore/core/src/firebase/firestore/remote/grpc_connection.mm

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ bool HasSpecialConfig(const std::string& host) {
227227
call->FinishAndNotify(Status{FirestoreErrorCode::Unavailable,
228228
"Network connectivity changed"});
229229
}
230+
// The old channel may hang for a long time trying to reestablish
231+
// connection before eventually failing. Note that gRPC Objective-C
232+
// client does the same thing:
233+
// https://github.com/grpc/grpc/blob/fe11db09575f2dfbe1f88cd44bd417acc168e354/src/objective-c/GRPCClient/private/GRPCHost.m#L309-L314
234+
grpc_channel_.reset();
230235
});
231236
}
232237

Firestore/core/src/firebase/firestore/remote/grpc_stream.cc

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -175,27 +175,20 @@ void GrpcStream::FinishAndNotify(const Status& status) {
175175
void GrpcStream::Shutdown() {
176176
MaybeUnregister();
177177
if (completions_.empty()) {
178-
// Nothing to cancel.
178+
// Nothing to cancel -- either the call was already finished, or it has
179+
// never been started.
179180
return;
180181
}
181182

182183
// Important: since the stream always has a pending read operation,
183184
// cancellation has to be called, or else the read would hang forever, and
184185
// finish operation will never get completed.
185-
//
186186
// (on the other hand, when an operation fails, cancellation should not be
187187
// called, otherwise the real failure cause will be overwritten by status
188188
// "canceled".)
189189
context_->TryCancel();
190-
191-
// The observer is not interested in this event -- since it initiated the
192-
// finish operation, the observer must know the reason.
193-
GrpcCompletion* completion = NewCompletion(Type::Finish, {});
194-
// TODO(varconst): is issuing a finish operation necessary in this case? We
195-
// don't care about the status, but perhaps it will make the server notice
196-
// client disconnecting sooner?
197-
call_->Finish(completion->status(), completion);
198-
190+
FinishCall({});
191+
// Wait until "finish" is off the queue.
199192
FastFinishCompletionsBlocking();
200193
}
201194

@@ -206,6 +199,14 @@ void GrpcStream::MaybeUnregister() {
206199
}
207200
}
208201

202+
void GrpcStream::FinishCall(const OnSuccess& callback) {
203+
// All completions issued by this call must be taken off the queue before
204+
// finish operation can be enqueued.
205+
FastFinishCompletionsBlocking();
206+
GrpcCompletion* completion = NewCompletion(Type::Finish, callback);
207+
call_->Finish(completion->status(), completion);
208+
}
209+
209210
void GrpcStream::FastFinishCompletionsBlocking() {
210211
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
211212
// should never be called again after a call to Finish.
@@ -276,29 +277,10 @@ void GrpcStream::OnWrite() {
276277
}
277278

278279
void GrpcStream::OnOperationFailed() {
279-
if (is_finishing_) {
280-
// `Finish` itself cannot fail. If another failed operation already
281-
// triggered `Finish`, there's nothing to do.
282-
return;
283-
}
284-
285-
is_finishing_ = true;
286-
287-
if (observer_) {
288-
GrpcCompletion* completion =
289-
NewCompletion(Type::Finish, [this](const GrpcCompletion* completion) {
290-
OnFinishedByServer(*completion->status());
291-
});
292-
call_->Finish(completion->status(), completion);
293-
} else {
294-
// The only reason to finish would be to get the status; if the observer is
295-
// no longer interested, there is no need to do that.
296-
Shutdown();
297-
}
298-
}
299-
300-
void GrpcStream::OnFinishedByServer(const grpc::Status& status) {
301-
FinishAndNotify(ConvertStatus(status));
280+
FinishCall([this](const GrpcCompletion* completion) {
281+
Status status = ConvertStatus(*completion->status());
282+
FinishAndNotify(status);
283+
});
302284
}
303285

304286
void GrpcStream::RemoveCompletion(const GrpcCompletion* to_remove) {
@@ -315,7 +297,9 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
315297
RemoveCompletion(completion);
316298

317299
if (ok) {
318-
on_success(completion);
300+
if (on_success) {
301+
on_success(completion);
302+
}
319303
} else {
320304
// Use the same error-handling for all operations; all errors are
321305
// unrecoverable.

Firestore/core/src/firebase/firestore/remote/grpc_stream.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,16 @@ class GrpcStream : public GrpcCall {
191191
void OnRead(const grpc::ByteBuffer& message);
192192
void OnWrite();
193193
void OnOperationFailed();
194-
void OnFinishedByServer(const grpc::Status& status);
195194
void RemoveCompletion(const GrpcCompletion* to_remove);
196195

197196
using OnSuccess = std::function<void(const GrpcCompletion*)>;
198197
GrpcCompletion* NewCompletion(GrpcCompletion::Type type,
199198
const OnSuccess& callback);
199+
// Finishes the underlying gRPC call. Must always be invoked on any call that
200+
// was started. Presumes that any pending completions will quickly come off
201+
// the queue and will block until they do, so this must only be invoked when
202+
// the current call either failed (`OnOperationFailed`) or canceled.
203+
void FinishCall(const OnSuccess& callback);
200204

201205
// Blocks until all the completions issued by this stream come out from the
202206
// gRPC completion queue. Once they do, it is safe to delete this `GrpcStream`
@@ -227,8 +231,6 @@ class GrpcStream : public GrpcCall {
227231
internal::BufferedWriter buffered_writer_;
228232

229233
std::vector<GrpcCompletion*> completions_;
230-
231-
bool is_finishing_ = false;
232234
};
233235

234236
} // namespace remote

Firestore/core/test/firebase/firestore/remote/grpc_stream_test.cc

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ class GrpcStreamTest : public testing::Test {
149149
return {states};
150150
}
151151

152+
void UnexpectedType(GrpcCompletion* completion) {
153+
ADD_FAILURE() << "Unexpected completion type "
154+
<< static_cast<int>(completion->type());
155+
}
156+
152157
AsyncQueue worker_queue;
153158

154159
std::unique_ptr<ConnectivityMonitor> connectivity_monitor;
@@ -239,8 +244,7 @@ TEST_F(GrpcStreamTest, CanAddSeveralWrites) {
239244
completion->Complete(true);
240245
break;
241246
default:
242-
ADD_FAILURE() << "Unexpected completion type "
243-
<< static_cast<int>(completion->type());
247+
UnexpectedType(completion);
244248
break;
245249
}
246250

@@ -316,28 +320,31 @@ TEST_F(GrpcStreamTest, ErrorOnWrite) {
316320
});
317321

318322
bool failed_write = false;
319-
ForceFinish([&](GrpcCompletion* completion) {
323+
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
320324
switch (completion->type()) {
321325
case Type::Read:
322-
completion->Complete(true);
323-
break;
326+
// After a write is failed, fail the read too.
327+
completion->Complete(!failed_write);
328+
return false;
324329

325330
case Type::Write:
326331
failed_write = true;
327332
completion->Complete(false);
328-
break;
333+
return false;
334+
335+
case Type::Finish:
336+
EXPECT_TRUE(failed_write);
337+
*completion->status() = grpc::Status{grpc::ABORTED, ""};
338+
completion->Complete(true);
339+
return true;
329340

330341
default:
331-
ADD_FAILURE() << "Unexpected completion type "
332-
<< static_cast<int>(completion->type());
333-
break;
342+
UnexpectedType(completion);
343+
return false;
334344
}
335-
336-
return failed_write;
337345
});
338-
339-
ForceFinish(
340-
{{Type::Read, Error}, {Type::Finish, grpc::Status{grpc::ABORTED, ""}}});
346+
future.wait();
347+
worker_queue.EnqueueBlocking([] {});
341348

342349
EXPECT_EQ(observed_states().back(), "OnStreamFinish(Aborted)");
343350
}
@@ -351,25 +358,27 @@ TEST_F(GrpcStreamTest, ErrorWithPendingWrites) {
351358
});
352359

353360
bool failed_write = false;
354-
ForceFinish([&](GrpcCompletion* completion) {
361+
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
355362
switch (completion->type()) {
356363
case Type::Read:
357-
completion->Complete(true);
358-
break;
364+
completion->Complete(!failed_write);
365+
return false;
359366
case Type::Write:
360367
failed_write = true;
361368
completion->Complete(false);
362-
break;
369+
return false;
370+
case Type::Finish:
371+
EXPECT_TRUE(failed_write);
372+
*completion->status() = grpc::Status{grpc::UNAVAILABLE, ""};
373+
completion->Complete(true);
374+
return true;
363375
default:
364-
ADD_FAILURE() << "Unexpected completion type "
365-
<< static_cast<int>(completion->type());
366-
break;
376+
UnexpectedType(completion);
377+
return false;
367378
}
368-
369-
return failed_write;
370379
});
371-
ForceFinish({{Type::Read, Error},
372-
{Type::Finish, grpc::Status{grpc::UNAVAILABLE, ""}}});
380+
future.wait();
381+
worker_queue.EnqueueBlocking([] {});
373382

374383
EXPECT_EQ(observed_states().back(), "OnStreamFinish(Unavailable)");
375384
}

Firestore/core/test/firebase/firestore/remote/grpc_streaming_reader_test.cc

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -214,31 +214,33 @@ TEST_F(GrpcStreamingReaderTest, ErrorOnWrite) {
214214
StartReader();
215215

216216
bool failed_write = false;
217-
// Callback is used because it's indeterminate whether one or two read
218-
// operations will have a chance to succeed.
219-
ForceFinish([&](GrpcCompletion* completion) {
217+
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
220218
switch (completion->type()) {
221219
case Type::Read:
222-
completion->Complete(true);
223-
break;
220+
// After a write is failed, fail the read too.
221+
completion->Complete(!failed_write);
222+
return false;
224223

225224
case Type::Write:
226225
failed_write = true;
227226
completion->Complete(false);
228-
break;
227+
return false;
228+
229+
case Type::Finish:
230+
EXPECT_TRUE(failed_write);
231+
*completion->status() = grpc::Status{grpc::RESOURCE_EXHAUSTED, ""};
232+
completion->Complete(true);
233+
return true;
229234

230235
default:
231236
ADD_FAILURE() << "Unexpected completion type "
232237
<< static_cast<int>(completion->type());
233-
break;
238+
return false;
234239
}
235-
236-
return failed_write;
237240
});
241+
future.wait();
242+
worker_queue.EnqueueBlocking([] {});
238243

239-
ForceFinish(
240-
{{Type::Read, Error},
241-
{Type::Finish, grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, ""}}});
242244
ASSERT_TRUE(status.has_value());
243245
EXPECT_EQ(status.value().code(), FirestoreErrorCode::ResourceExhausted);
244246
EXPECT_TRUE(responses.empty());

Firestore/core/test/firebase/firestore/remote/stream_test.mm

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -461,31 +461,32 @@ void StartStream() {
461461
worker_queue.EnqueueBlocking([&] { firestore_stream->WriteEmptyBuffer(); });
462462

463463
bool failed_write = false;
464-
// Callback is used because it's indeterminate whether one or two read
465-
// operations will have a chance to succeed.
466-
ForceFinish([&](GrpcCompletion* completion) {
464+
auto future = tester.ForceFinishAsync([&](GrpcCompletion* completion) {
467465
switch (completion->type()) {
468466
case Type::Read:
469-
completion->Complete(true);
470-
break;
467+
// After a write is failed, fail the read too.
468+
completion->Complete(!failed_write);
469+
return false;
471470

472471
case Type::Write:
473472
failed_write = true;
474473
completion->Complete(false);
475-
break;
474+
return false;
475+
476+
case Type::Finish:
477+
EXPECT_TRUE(failed_write);
478+
*completion->status() = grpc::Status{grpc::UNAUTHENTICATED, ""};
479+
completion->Complete(true);
480+
return true;
476481

477482
default:
478483
ADD_FAILURE() << "Unexpected completion type "
479484
<< static_cast<int>(completion->type());
480-
break;
485+
return false;
481486
}
482-
483-
return failed_write;
484487
});
485-
486-
ForceFinish(
487-
{{Type::Read, Error},
488-
{Type::Finish, grpc::Status{grpc::StatusCode::UNAUTHENTICATED, ""}}});
488+
future.wait();
489+
worker_queue.EnqueueBlocking([] {});
489490

490491
worker_queue.EnqueueBlocking([&] {
491492
EXPECT_FALSE(firestore_stream->IsStarted());

0 commit comments

Comments
 (0)