Skip to content

Commit e2501f8

Browse files
authored
C++ migration: implement WriteStream (#1807)
This is a fairly close port of `FSTWriteStream`. Most of the comments are copied from the Web client.
1 parent 76ba6e9 commit e2501f8

File tree

4 files changed

+424
-14
lines changed

4 files changed

+424
-14
lines changed

Firestore/core/src/firebase/firestore/remote/stream_objc_bridge.h

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ namespace bridge {
4646
bool IsLoggingEnabled();
4747

4848
/**
49-
* This file contains operations in `WatchStream` that are still delegated to
49+
* This file contains operations in remote/ folder that are still delegated to
5050
* Objective-C: proto parsing and delegates.
5151
*
5252
* The principle is that the C++ implementation can only take Objective-C
5353
* objects as parameters or return them, but never instantiate them or call any
5454
* methods on them -- if that is necessary, it's delegated to one of the bridge
55-
* classes. This allows easily identifying which parts of `WatchStream` still
56-
* rely on not-yet-ported code.
55+
* classes. This allows easily identifying which parts of remote/ still rely on
56+
* not-yet-ported code.
5757
*/
5858

5959
/**
@@ -68,7 +68,7 @@ class WatchStreamSerializer {
6868

6969
GCFSListenRequest* CreateWatchRequest(FSTQueryData* query) const;
7070
GCFSListenRequest* CreateUnwatchRequest(model::TargetId target_id) const;
71-
grpc::ByteBuffer ToByteBuffer(GCFSListenRequest* request) const;
71+
static grpc::ByteBuffer ToByteBuffer(GCFSListenRequest* request);
7272

7373
/**
7474
* If parsing fails, will return nil and write information on the error to
@@ -89,8 +89,52 @@ class WatchStreamSerializer {
8989
};
9090

9191
/**
92-
* A C++ bridge that invokes methods on an `FSTWatchStreamDelegate`.
92+
* A C++ bridge to `FSTSerializerBeta` that allows creating
93+
* `GCFSWriteRequest`s and parsing `GCFSWriteResponse`s.
9394
*/
95+
class WriteStreamSerializer {
96+
public:
97+
explicit WriteStreamSerializer(FSTSerializerBeta* serializer)
98+
: serializer_{serializer} {
99+
}
100+
101+
void UpdateLastStreamToken(GCFSWriteResponse* proto);
102+
void SetLastStreamToken(NSData* token) {
103+
last_stream_token_ = token;
104+
}
105+
NSData* GetLastStreamToken() const {
106+
return last_stream_token_;
107+
}
108+
109+
GCFSWriteRequest* CreateHandshake() const;
110+
GCFSWriteRequest* CreateWriteMutationsRequest(
111+
NSArray<FSTMutation*>* mutations) const;
112+
GCFSWriteRequest* CreateEmptyMutationsList() {
113+
return CreateWriteMutationsRequest(@[]);
114+
}
115+
static grpc::ByteBuffer ToByteBuffer(GCFSWriteRequest* request);
116+
117+
/**
118+
* If parsing fails, will return nil and write information on the error to
119+
* `out_status`. Otherwise, returns the parsed proto and sets `out_status` to
120+
* ok.
121+
*/
122+
GCFSWriteResponse* ParseResponse(const grpc::ByteBuffer& message,
123+
util::Status* out_status) const;
124+
model::SnapshotVersion ToCommitVersion(GCFSWriteResponse* proto) const;
125+
NSArray<FSTMutationResult*>* ToMutationResults(
126+
GCFSWriteResponse* proto) const;
127+
128+
/** Creates a pretty-printed description of the proto for debugging. */
129+
static NSString* Describe(GCFSWriteRequest* request);
130+
static NSString* Describe(GCFSWriteResponse* request);
131+
132+
private:
133+
FSTSerializerBeta* serializer_;
134+
NSData* last_stream_token_;
135+
};
136+
137+
/** A C++ bridge that invokes methods on an `FSTWatchStreamDelegate`. */
94138
class WatchStreamDelegate {
95139
public:
96140
explicit WatchStreamDelegate(id<FSTWatchStreamDelegate> delegate)
@@ -106,6 +150,23 @@ class WatchStreamDelegate {
106150
id<FSTWatchStreamDelegate> delegate_;
107151
};
108152

153+
/** A C++ bridge that invokes methods on an `FSTWriteStreamDelegate`. */
154+
class WriteStreamDelegate {
155+
public:
156+
explicit WriteStreamDelegate(id<FSTWriteStreamDelegate> delegate)
157+
: delegate_{delegate} {
158+
}
159+
160+
void NotifyDelegateOnOpen();
161+
void NotifyDelegateOnHandshakeComplete();
162+
void NotifyDelegateOnCommit(const model::SnapshotVersion& commit_version,
163+
NSArray<FSTMutationResult*>* results);
164+
void NotifyDelegateOnClose(const util::Status& status);
165+
166+
private:
167+
id<FSTWriteStreamDelegate> delegate_;
168+
};
169+
109170
} // namespace bridge
110171
} // namespace remote
111172
} // namespace firestore

Firestore/core/src/firebase/firestore/remote/stream_objc_bridge.mm

Lines changed: 97 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
return grpc::ByteBuffer{&slice, 1};
8181
}
8282

83+
// Note: `StatusOr` cannot be used with ARC-managed objects.
8384
template <typename Proto>
8485
Proto* ToProto(const grpc::ByteBuffer& message, Status* out_status) {
8586
NSError* error = nil;
@@ -106,6 +107,8 @@ bool IsLoggingEnabled() {
106107
return [FIRFirestore isLoggingEnabled];
107108
}
108109

110+
// WatchStreamSerializer
111+
109112
GCFSListenRequest* WatchStreamSerializer::CreateWatchRequest(
110113
FSTQueryData* query) const {
111114
GCFSListenRequest* request = [GCFSListenRequest message];
@@ -124,16 +127,13 @@ bool IsLoggingEnabled() {
124127
}
125128

126129
grpc::ByteBuffer WatchStreamSerializer::ToByteBuffer(
127-
GCFSListenRequest* request) const {
130+
GCFSListenRequest* request) {
128131
return ConvertToByteBuffer([request data]);
129132
}
130133

131-
NSString* WatchStreamSerializer::Describe(GCFSListenRequest* request) {
132-
return [request description];
133-
}
134-
135-
NSString* WatchStreamSerializer::Describe(GCFSListenResponse* response) {
136-
return [response description];
134+
GCFSListenResponse* WatchStreamSerializer::ParseResponse(
135+
const grpc::ByteBuffer& message, Status* out_status) const {
136+
return ToProto<GCFSListenResponse>(message, out_status);
137137
}
138138

139139
FSTWatchChange* WatchStreamSerializer::ToWatchChange(
@@ -146,11 +146,78 @@ bool IsLoggingEnabled() {
146146
return [serializer_ versionFromListenResponse:proto];
147147
}
148148

149-
GCFSListenResponse* WatchStreamSerializer::ParseResponse(
149+
NSString* WatchStreamSerializer::Describe(GCFSListenRequest* request) {
150+
return [request description];
151+
}
152+
153+
NSString* WatchStreamSerializer::Describe(GCFSListenResponse* response) {
154+
return [response description];
155+
}
156+
157+
// WriteStreamSerializer
158+
159+
void WriteStreamSerializer::UpdateLastStreamToken(GCFSWriteResponse* proto) {
160+
last_stream_token_ = proto.streamToken;
161+
}
162+
163+
GCFSWriteRequest* WriteStreamSerializer::CreateHandshake() const {
164+
// The initial request cannot contain mutations, but must contain a projectID.
165+
GCFSWriteRequest* request = [GCFSWriteRequest message];
166+
request.database = [serializer_ encodedDatabaseID];
167+
return request;
168+
}
169+
170+
GCFSWriteRequest* WriteStreamSerializer::CreateWriteMutationsRequest(
171+
NSArray<FSTMutation*>* mutations) const {
172+
NSMutableArray<GCFSWrite*>* protos =
173+
[NSMutableArray arrayWithCapacity:mutations.count];
174+
for (FSTMutation* mutation in mutations) {
175+
[protos addObject:[serializer_ encodedMutation:mutation]];
176+
};
177+
178+
GCFSWriteRequest* request = [GCFSWriteRequest message];
179+
request.writesArray = protos;
180+
request.streamToken = last_stream_token_;
181+
182+
return request;
183+
}
184+
185+
grpc::ByteBuffer WriteStreamSerializer::ToByteBuffer(
186+
GCFSWriteRequest* request) {
187+
return ConvertToByteBuffer([request data]);
188+
}
189+
190+
GCFSWriteResponse* WriteStreamSerializer::ParseResponse(
150191
const grpc::ByteBuffer& message, Status* out_status) const {
151-
return ToProto<GCFSListenResponse>(message, out_status);
192+
return ToProto<GCFSWriteResponse>(message, out_status);
193+
}
194+
195+
model::SnapshotVersion WriteStreamSerializer::ToCommitVersion(
196+
GCFSWriteResponse* proto) const {
197+
return [serializer_ decodedVersion:proto.commitTime];
198+
}
199+
200+
NSArray<FSTMutationResult*>* WriteStreamSerializer::ToMutationResults(
201+
GCFSWriteResponse* response) const {
202+
NSMutableArray<GCFSWriteResult*>* responses = response.writeResultsArray;
203+
NSMutableArray<FSTMutationResult*>* results =
204+
[NSMutableArray arrayWithCapacity:responses.count];
205+
for (GCFSWriteResult* proto in responses) {
206+
[results addObject:[serializer_ decodedMutationResult:proto]];
207+
};
208+
return results;
209+
}
210+
211+
NSString* WriteStreamSerializer::Describe(GCFSWriteRequest* request) {
212+
return [request description];
152213
}
153214

215+
NSString* WriteStreamSerializer::Describe(GCFSWriteResponse* response) {
216+
return [response description];
217+
}
218+
219+
// WatchStreamDelegate
220+
154221
void WatchStreamDelegate::NotifyDelegateOnOpen() {
155222
[delegate_ watchStreamDidOpen];
156223
}
@@ -164,6 +231,27 @@ bool IsLoggingEnabled() {
164231
[delegate_ watchStreamWasInterruptedWithError:MakeNSError(status)];
165232
}
166233

234+
// WriteStreamDelegate
235+
236+
void WriteStreamDelegate::NotifyDelegateOnOpen() {
237+
[delegate_ writeStreamDidOpen];
238+
}
239+
240+
void WriteStreamDelegate::NotifyDelegateOnHandshakeComplete() {
241+
[delegate_ writeStreamDidCompleteHandshake];
242+
}
243+
244+
void WriteStreamDelegate::NotifyDelegateOnCommit(
245+
const SnapshotVersion& commit_version,
246+
NSArray<FSTMutationResult*>* results) {
247+
[delegate_ writeStreamDidReceiveResponseWithVersion:commit_version
248+
mutationResults:results];
249+
}
250+
251+
void WriteStreamDelegate::NotifyDelegateOnClose(const Status& status) {
252+
[delegate_ writeStreamWasInterruptedWithError:MakeNSError(status)];
253+
}
254+
167255
} // namespace bridge
168256
} // namespace remote
169257
} // namespace firestore
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2018 Google
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_WRITE_STREAM_H_
18+
#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_WRITE_STREAM_H_
19+
20+
#if !defined(__OBJC__)
21+
#error "This header only supports Objective-C++"
22+
#endif // !defined(__OBJC__)
23+
24+
#import <Foundation/Foundation.h>
25+
#include <memory>
26+
#include <string>
27+
28+
#include "Firestore/core/src/firebase/firestore/remote/datastore.h"
29+
#include "Firestore/core/src/firebase/firestore/remote/stream.h"
30+
#include "Firestore/core/src/firebase/firestore/remote/stream_objc_bridge.h"
31+
#include "Firestore/core/src/firebase/firestore/util/async_queue.h"
32+
#include "Firestore/core/src/firebase/firestore/util/status.h"
33+
#include "absl/strings/string_view.h"
34+
#include "grpcpp/support/byte_buffer.h"
35+
36+
#import "Firestore/Source/Core/FSTTypes.h"
37+
#import "Firestore/Source/Model/FSTMutation.h"
38+
#import "Firestore/Source/Remote/FSTSerializerBeta.h"
39+
40+
namespace firebase {
41+
namespace firestore {
42+
namespace remote {
43+
44+
/**
45+
* A Stream that implements the Write RPC.
46+
*
47+
* The Write RPC requires the caller to maintain special stream token
48+
* state in-between calls, to help the server understand which responses the
49+
* client has processed by the time the next request is made. Every response
50+
* will contain a stream token; this value must be passed to the next
51+
* request.
52+
*
53+
* After calling `Start` on this stream, the next request must be a handshake,
54+
* containing whatever stream token is on hand. Once a response to this
55+
* request is received, all pending mutations may be submitted. When
56+
* submitting multiple batches of mutations at the same time, it's
57+
* okay to use the same stream token for the calls to `WriteMutations`.
58+
*/
59+
class WriteStream : public Stream {
60+
public:
61+
WriteStream(util::AsyncQueue* async_queue,
62+
auth::CredentialsProvider* credentials_provider,
63+
FSTSerializerBeta* serializer,
64+
Datastore* datastore,
65+
id<FSTWriteStreamDelegate> delegate);
66+
67+
void SetLastStreamToken(NSData* token);
68+
/**
69+
* The last received stream token from the server, used to acknowledge which
70+
* responses the client has processed. Stream tokens are opaque checkpoint
71+
* markers whose only real value is their inclusion in the next request.
72+
*
73+
* `WriteStream` manages propagating this value from responses to the
74+
* next request.
75+
*/
76+
NSData* GetLastStreamToken() const;
77+
78+
/**
79+
* Tracks whether or not a handshake has been successfully exchanged and
80+
* the stream is ready to accept mutations.
81+
*/
82+
bool handshake_complete() const {
83+
return handshake_complete_;
84+
}
85+
86+
/**
87+
* Sends an initial stream token to the server, performing the handshake
88+
* required to make the StreamingWrite RPC work.
89+
*/
90+
void WriteHandshake();
91+
92+
/** Sends a group of mutations to the Firestore backend to apply. */
93+
void WriteMutations(NSArray<FSTMutation*>* mutations);
94+
95+
private:
96+
std::unique_ptr<GrpcStream> CreateGrpcStream(
97+
Datastore* datastore, const absl::string_view token) override;
98+
void TearDown(GrpcStream* call) override;
99+
100+
void NotifyStreamOpen() override;
101+
util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override;
102+
void NotifyStreamClose(const util::Status& status) override;
103+
104+
std::string GetDebugName() const override {
105+
return "WriteStream";
106+
}
107+
108+
bridge::WriteStreamSerializer serializer_bridge_;
109+
bridge::WriteStreamDelegate delegate_bridge_;
110+
bool handshake_complete_ = false;
111+
};
112+
113+
} // namespace remote
114+
} // namespace firestore
115+
} // namespace firebase
116+
117+
#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_WRITE_STREAM_H_

0 commit comments

Comments
 (0)