@@ -18,6 +18,7 @@
 import static com.google.firebase.firestore.util.Assert.fail;
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.database.Cursor;
 import android.database.sqlite.SQLiteStatement;
 import com.google.firebase.Timestamp;
 import com.google.firebase.firestore.auth.User;
@@ -27,6 +28,7 @@
 import com.google.firebase.firestore.model.mutation.Mutation;
 import com.google.firebase.firestore.model.mutation.MutationBatch;
 import com.google.firebase.firestore.remote.WriteStream;
+import com.google.firebase.firestore.util.Consumer;
 import com.google.firebase.firestore.util.Util;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -42,6 +44,18 @@
 /** A mutation queue for a specific user, backed by SQLite. */
 final class SQLiteMutationQueue implements MutationQueue {
 
+  /**
+   * On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being
+   * able to write very large values). All reads of the mutations column in the mutations table
+   * need to read in chunks with SUBSTR to avoid going over this limit.
+   *
+   * <p>The value here has to be 2 MB or smaller, while allowing for all possible other values
+   * that might be selected out along with the mutations column in any given result set. Nearly
+   * 1 MB is conservative, but allows all combinations of document paths and batch ids without
+   * needing to figure out if the row has gotten too large.
+   */
+  private static final int BLOB_MAX_INLINE_LENGTH = 1000000;
+
   private final SQLitePersistence db;
   private final LocalSerializer serializer;
 
@@ -90,10 +104,7 @@ public void start() {
     int rows =
         db.query("SELECT last_stream_token FROM mutation_queues WHERE uid = ?")
             .binding(uid)
-            .first(
-                row -> {
-                  lastStreamToken = ByteString.copyFrom(row.getBlob(0));
-                });
+            .first(row -> lastStreamToken = ByteString.copyFrom(row.getBlob(0)));
 
     if (rows == 0) {
       // Ensure we write a default entry in mutation_queues since loadNextBatchIdAcrossAllUsers()
@@ -204,9 +215,9 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
   @Nullable
   @Override
   public MutationBatch lookupMutationBatch(int batchId) {
-    return db.query("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?")
-        .binding(uid, batchId)
-        .firstValue(row -> decodeMutationBatch(row.getBlob(0)));
+    return db.query("SELECT SUBSTR(mutations, 1, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
+        .firstValue(row -> decodeInlineMutationBatch(batchId, row.getBlob(0)));
   }
 
   @Nullable
@@ -215,19 +226,22 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
     return db.query(
-            "SELECT mutations FROM mutations "
+            "SELECT batch_id, SUBSTR(mutations, 1, ?) FROM mutations "
                 + "WHERE uid = ? AND batch_id >= ? "
                 + "ORDER BY batch_id ASC LIMIT 1")
-        .binding(uid, nextBatchId)
-        .firstValue(row -> decodeMutationBatch(row.getBlob(0)));
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
+        .firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
   }
 
   @Override
   public List<MutationBatch> getAllMutationBatches() {
     List<MutationBatch> result = new ArrayList<>();
-    db.query("SELECT mutations FROM mutations WHERE uid = ? ORDER BY batch_id ASC")
-        .binding(uid)
-        .forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
+    db.query(
+            "SELECT batch_id, SUBSTR(mutations, 1, ?) "
+                + "FROM mutations "
+                + "WHERE uid = ? ORDER BY batch_id ASC")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid)
+        .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
     return result;
   }
 
@@ -237,14 +251,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey
 
     List<MutationBatch> result = new ArrayList<>();
     db.query(
-            "SELECT m.mutations FROM document_mutations dm, mutations m "
+            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path = ? "
                 + "AND dm.uid = m.uid "
                 + "AND dm.batch_id = m.batch_id "
                 + "ORDER BY dm.batch_id")
-        .binding(uid, path)
-        .forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, path)
+        .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
     return result;
   }
 
@@ -259,10 +274,11 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
     SQLitePersistence.LongQuery longQuery =
         new SQLitePersistence.LongQuery(
             db,
-            "SELECT DISTINCT dm.batch_id, m.mutations FROM document_mutations dm, mutations m "
+            "SELECT DISTINCT dm.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path IN (",
-            Arrays.asList(uid),
+            Arrays.asList(BLOB_MAX_INLINE_LENGTH, uid),
             args,
             ") "
                 + "AND dm.uid = m.uid "
@@ -279,7 +295,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
                 int batchId = row.getInt(0);
                 if (!uniqueBatchIds.contains(batchId)) {
                   uniqueBatchIds.add(batchId);
-                  result.add(decodeMutationBatch(row.getBlob(1)));
+                  result.add(decodeInlineMutationBatch(batchId, row.getBlob(1)));
                 }
               });
     }
@@ -321,14 +337,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
 
     List<MutationBatch> result = new ArrayList<>();
     db.query(
-            "SELECT dm.batch_id, dm.path, m.mutations FROM document_mutations dm, mutations m "
+            "SELECT dm.batch_id, dm.path, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path >= ? "
                 + "AND dm.path < ? "
                 + "AND dm.uid = m.uid "
                 + "AND dm.batch_id = m.batch_id "
                 + "ORDER BY dm.batch_id")
-        .binding(uid, prefixPath, prefixSuccessorPath)
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, prefixPath, prefixSuccessorPath)
         .forEach(
             row -> {
               // Ensure unique batches only. This works because the batches come out in order so we
@@ -350,7 +367,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
                 return;
               }
 
-              result.add(decodeMutationBatch(row.getBlob(2)));
+              result.add(decodeInlineMutationBatch(batchId, row.getBlob(2)));
             });
 
     return result;
@@ -399,12 +416,79 @@ public void performConsistencyCheck() {
           danglingMutationReferences);
   }
 
-  private MutationBatch decodeMutationBatch(byte[] bytes) {
+  /**
+   * Decodes mutation batch bytes obtained via substring. If the blob is smaller than
+   * BLOB_MAX_INLINE_LENGTH, executes additional queries to load the rest of the blob.
+   *
+   * @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value
+   *     is too large.
+   * @param bytes The bytes of the first chunk of the mutation batch. Should be obtained via
+   *     SUBSTR(mutations, 1, BLOB_MAX_INLINE_LENGTH).
+   */
+  private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) {
     try {
+      if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
+        return serializer.decodeMutationBatch(
+            com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes));
+      }
+
+      BlobAccumulator accumulator = new BlobAccumulator(bytes);
+      while (accumulator.more) {
+        // As we read in chunks the start of the next chunk should be the total accumulated length
+        // plus 1 (since SUBSTR() counts from 1). The second argument is not adjusted because it's
+        // the length of the chunk, not the end index.
+        int start = accumulator.numChunks() * BLOB_MAX_INLINE_LENGTH + 1;
+
+        db.query("SELECT SUBSTR(mutations, ?, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
+            .binding(start, BLOB_MAX_INLINE_LENGTH, uid, batchId)
+            .first(accumulator);
+      }
+
+      ByteString blob = accumulator.result();
       return serializer.decodeMutationBatch(
-          com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes));
+          com.google.firebase.firestore.proto.WriteBatch.parseFrom(blob));
     } catch (InvalidProtocolBufferException e) {
       throw fail("MutationBatch failed to parse: %s", e);
     }
   }
+
+  /**
+   * Explicit consumer of blob chunks, accumulating the chunks and wrapping them in a single
+   * ByteString. Accepts a Cursor whose results include the blob in column 0.
+   *
+   * <p>(This is a named class here to allow decodeInlineMutationBatch to access the result of the
+   * accumulation.)
+   */
+  private static class BlobAccumulator implements Consumer<Cursor> {
+    private final ArrayList<ByteString> chunks = new ArrayList<>();
+    private boolean more = true;
+
+    BlobAccumulator(byte[] firstChunk) {
+      addChunk(firstChunk);
+    }
+
+    int numChunks() {
+      return chunks.size();
+    }
+
+    ByteString result() {
+      // Not actually a copy; this creates a balanced rope-like structure that reuses the given
+      // ByteStrings as a part of its representation.
+      return ByteString.copyFrom(chunks);
+    }
+
+    @Override
+    public void accept(Cursor row) {
+      byte[] bytes = row.getBlob(0);
+      addChunk(bytes);
+      if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
+        more = false;
+      }
+    }
+
+    private void addChunk(byte[] bytes) {
+      ByteString wrapped = ByteString.copyFrom(bytes);
+      chunks.add(wrapped);
+    }
+  }
 }
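
For illustration, here is a minimal standalone sketch of the SUBSTR chunking idea used above: read a blob that may exceed the ~2 MB Android Cursor row limit in fixed-size pieces and stitch the pieces back together. The table and column names mirror the `mutations` table from the diff, but the class, method, and database handle are hypothetical; the real code goes through the `SQLitePersistence` query wrapper and the `BlobAccumulator` consumer rather than raw `SQLiteDatabase` calls.

```java
import android.database.Cursor;
import android.database.sqlite.SQLiteDatabase;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

final class ChunkedBlobReadSketch {
  // Mirrors BLOB_MAX_INLINE_LENGTH above: comfortably under Android's ~2 MB cursor row limit.
  private static final int CHUNK_SIZE = 1000000;

  /** Reads the full mutations blob for one batch, CHUNK_SIZE bytes at a time. */
  static ByteString readMutationsBlob(SQLiteDatabase db, String uid, int batchId) {
    List<ByteString> chunks = new ArrayList<>();
    int start = 1; // SUBSTR() is 1-based, so the first chunk starts at index 1.
    while (true) {
      // start, CHUNK_SIZE, and batchId are trusted ints, so they are inlined rather than bound.
      String sql =
          "SELECT SUBSTR(mutations, " + start + ", " + CHUNK_SIZE + ") "
              + "FROM mutations WHERE uid = ? AND batch_id = " + batchId;
      try (Cursor cursor = db.rawQuery(sql, new String[] {uid})) {
        if (!cursor.moveToFirst()) {
          break; // no such row
        }
        byte[] bytes = cursor.getBlob(0);
        if (bytes == null || bytes.length == 0) {
          break; // blob length was an exact multiple of CHUNK_SIZE; nothing left to read
        }
        chunks.add(ByteString.copyFrom(bytes));
        if (bytes.length < CHUNK_SIZE) {
          break; // a short chunk means we just read the tail of the blob
        }
      }
      start += CHUNK_SIZE; // the next chunk begins right after the bytes read so far
    }
    return ByteString.copyFrom(chunks);
  }
}
```

Reading stops as soon as a chunk comes back shorter than the chunk size, mirroring the `more` flag in `BlobAccumulator`, and `ByteString.copyFrom(Iterable)` concatenates the chunks as a rope-like structure rather than copying them into one large array.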