From 96937264940df896bb43e44610e9b3595a2eeab3 Mon Sep 17 00:00:00 2001
From: Marek Gilbert
Date: Fri, 25 Jan 2019 16:03:51 -0800
Subject: [PATCH 1/9] Handle reading large mutation batches

---
 .../firebase/firestore/WriteBatchTest.java    | 27 ++++++
 .../firestore/local/SQLiteMutationQueue.java  | 82 +++++++++++++++----
 2 files changed, 91 insertions(+), 18 deletions(-)

diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
index 6dc62a3b8ba..49b1a454e7a 100644
--- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
+++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
@@ -32,7 +32,10 @@
 import com.google.firebase.firestore.FirebaseFirestoreException.Code;
 import com.google.firebase.firestore.testutil.EventAccumulator;
 import com.google.firebase.firestore.testutil.IntegrationTestUtil;
+import com.google.firebase.firestore.util.Util;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -253,4 +256,28 @@ public void testCanWriteTheSameDocumentMultipleTimes() {
     assertNotNull(when);
     assertEquals(map("a", 1L, "b", 2L, "when", when), serverSnap.getData());
   }
+
+  @Test
+  public void testCanWriteVeryLargeBatches() {
+    String a = Character.toString('a');
+    StringBuilder buf = new StringBuilder(1000);
+    for (int i = 0; i < 1000; i++) {
+      buf.append(a);
+    }
+    String kb = buf.toString();
+    Map<String, Object> values = new HashMap<>();
+    for (int j = 0; j < 1000; j++) {
+      values.put(Util.autoId(), kb);
+    }
+
+    DocumentReference doc = testDocument();
+    WriteBatch batch = doc.getFirestore().batch();
+
+    batch.set(doc, values);
+    for (int i = 0; i < 2; i++) {
+      batch.update(doc, values);
+    }
+
+    waitFor(batch.commit());
+  }
 }
diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index adfd3c1de1d..866245859e4 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -19,6 +19,7 @@
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
 import android.database.sqlite.SQLiteStatement;
+import android.os.ParcelFileDescriptor;
 import com.google.firebase.Timestamp;
 import com.google.firebase.firestore.auth.User;
 import com.google.firebase.firestore.core.Query;
@@ -31,6 +32,7 @@
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -42,6 +44,8 @@
 /** A mutation queue for a specific user, backed by SQLite. */
 final class SQLiteMutationQueue implements MutationQueue {
 
+  private static final int BLOB_MAX_INLINE_LENGTH = 1000000;
+
   private final SQLitePersistence db;
 
   private final LocalSerializer serializer;
@@ -204,9 +208,12 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
   @Nullable
   @Override
   public MutationBatch lookupMutationBatch(int batchId) {
-    return db.query("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?")
-        .binding(uid, batchId)
-        .firstValue(row -> decodeMutationBatch(row.getBlob(0)));
+    return db.query(
+            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM mutations m "
+                + "WHERE uid = ? AND batch_id = ?")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
+        .firstValue(row -> decodeMutationBatchRow(row.getInt(0), row.getBlob(1)));
   }
 
   @Nullable
@@ -215,19 +222,23 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
     return db.query(
-            "SELECT mutations FROM mutations "
+            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM mutations m "
                 + "WHERE uid = ? AND batch_id >= ? "
                 + "ORDER BY batch_id ASC LIMIT 1")
-        .binding(uid, nextBatchId)
-        .firstValue(row -> decodeMutationBatch(row.getBlob(0)));
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
+        .firstValue(row -> decodeMutationBatchRow(row.getInt(0), row.getBlob(1)));
   }
 
   @Override
   public List<MutationBatch> getAllMutationBatches() {
     List<MutationBatch> result = new ArrayList<>();
-    db.query("SELECT mutations FROM mutations WHERE uid = ? ORDER BY batch_id ASC")
-        .binding(uid)
-        .forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
+    db.query(
+            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM mutations m "
+                + "WHERE uid = ? ORDER BY batch_id ASC")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid)
+        .forEach(row -> result.add(decodeMutationBatchRow(row.getInt(0), row.getBlob(1))));
     return result;
   }
 
@@ -237,14 +248,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey
     List<MutationBatch> result = new ArrayList<>();
 
     db.query(
-            "SELECT m.mutations FROM document_mutations dm, mutations m "
+            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path = ? "
                 + "AND dm.uid = m.uid "
                 + "AND dm.batch_id = m.batch_id "
                 + "ORDER BY dm.batch_id")
-        .binding(uid, path)
-        .forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, path)
+        .forEach(row -> result.add(decodeMutationBatchRow(row.getInt(0), row.getBlob(1))));
 
     return result;
   }
@@ -259,10 +271,11 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
     SQLitePersistence.LongQuery longQuery =
         new SQLitePersistence.LongQuery(
             db,
-            "SELECT DISTINCT dm.batch_id, m.mutations FROM document_mutations dm, mutations m "
+            "SELECT DISTINCT dm.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path IN (",
-            Arrays.asList(uid),
+            Arrays.asList(BLOB_MAX_INLINE_LENGTH, uid),
             args,
             ") "
                 + "AND dm.uid = m.uid "
@@ -279,7 +292,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
               int batchId = row.getInt(0);
               if (!uniqueBatchIds.contains(batchId)) {
                 uniqueBatchIds.add(batchId);
-                result.add(decodeMutationBatch(row.getBlob(1)));
+                result.add(decodeMutationBatchRow(batchId, row.getBlob(1)));
               }
             });
       }
@@ -321,14 +334,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
     List<MutationBatch> result = new ArrayList<>();
 
     db.query(
-            "SELECT dm.batch_id, dm.path, m.mutations FROM document_mutations dm, mutations m "
+            "SELECT dm.batch_id, dm.path, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path >= ? "
                 + "AND dm.path < ? "
                 + "AND dm.uid = m.uid "
                 + "AND dm.batch_id = m.batch_id "
                 + "ORDER BY dm.batch_id")
-        .binding(uid, prefixPath, prefixSuccessorPath)
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, prefixPath, prefixSuccessorPath)
         .forEach(
             row -> {
              // Ensure unique batches only. This works because the batches come out in order so we
@@ -350,7 +364,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
                 return;
               }
 
-              result.add(decodeMutationBatch(row.getBlob(2)));
+              result.add(decodeMutationBatchRow(batchId, row.getBlob(2)));
             });
 
     return result;
@@ -399,6 +413,38 @@ public void performConsistencyCheck() {
         danglingMutationReferences);
   }
 
+  /**
+   * Decodes a mutation batch row containing a batch id and a substring of a blob. If the blob is
+   * too large, executes another query to load the blob directly.
+   *
+   * @param batchId The batch ID of the row containing the blob
+   * @param bytes The bytes represented
+   * @return
+   */
+  private MutationBatch decodeMutationBatchRow(int batchId, byte[] bytes) {
+    if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
+      return decodeMutationBatch(bytes);
+    }
+
+    SQLiteStatement loader =
+        db.prepare("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?");
+    loader.bindString(1, uid);
+    loader.bindLong(2, batchId);
+
+    ParcelFileDescriptor blobFile = loader.simpleQueryForBlobFileDescriptor();
+    hardAssert(blobFile != null, "Blob exists so descriptor should not be null");
+
+    try (ParcelFileDescriptor.AutoCloseInputStream stream =
+        new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) {
+      return serializer.decodeMutationBatch(
+          com.google.firebase.firestore.proto.WriteBatch.parseFrom(stream));
+    } catch (InvalidProtocolBufferException e) {
+      throw fail("MutationBatch failed to parse: %s", e);
+    } catch (IOException e) {
+      throw fail("Failed to read blob for uid=%s, batch_id=%d: %s", uid, batchId, e);
+    }
+  }
+
   private MutationBatch decodeMutationBatch(byte[] bytes) {
     try {
       return serializer.decodeMutationBatch(

From dd75bcd007b383db4ccd5ba6872422dbf7b72ccc Mon Sep 17 00:00:00 2001
From: Marek Gilbert
Date: Fri, 25 Jan 2019 16:21:11 -0800
Subject: [PATCH 2/9] Rework lookupMutationBatch and
 getNextMutationBatchAfterBatchId to load directly

---
 .../firestore/local/SQLiteMutationQueue.java | 80 ++++++++++---------
 1 file changed, 43 insertions(+), 37 deletions(-)

diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index 866245859e4..c1735421911 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -18,6 +18,7 @@
 import static com.google.firebase.firestore.util.Assert.fail;
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.database.sqlite.SQLiteDoneException;
 import android.database.sqlite.SQLiteStatement;
 import android.os.ParcelFileDescriptor;
 import com.google.firebase.Timestamp;
@@ -208,12 +209,12 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
   @Nullable
   @Override
   public MutationBatch lookupMutationBatch(int batchId) {
-    return db.query(
-            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
-                + "FROM mutations m "
-                + "WHERE uid = ? AND batch_id = ?")
-        .binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
-        .firstValue(row -> decodeMutationBatchRow(row.getInt(0), row.getBlob(1)));
+    SQLiteStatement query =
+        db.prepare("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?");
+    query.bindString(1, uid);
+    query.bindLong(2, batchId);
+
+    return executeSimpleMutationBatchLookup(query);
   }
 
   @Nullable
   @Override
   public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
-    return db.query(
-            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
-                + "FROM mutations m "
-                + "WHERE uid = ? AND batch_id >= ? "
-                + "ORDER BY batch_id ASC LIMIT 1")
-        .binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
-        .firstValue(row -> decodeMutationBatchRow(row.getInt(0), row.getBlob(1)));
+    SQLiteStatement query =
+        db.prepare("SELECT mutations FROM mutations "
+            + "WHERE uid = ? AND batch_id >= ? "
+            + "ORDER BY batch_id ASC LIMIT 1");
+    query.bindString(1, uid);
+    query.bindLong(2, nextBatchId);
+
+    return executeSimpleMutationBatchLookup(query);
+  }
+
+  @Nullable
+  private MutationBatch executeSimpleMutationBatchLookup(SQLiteStatement query) {
+    ParcelFileDescriptor blobFile;
+    try {
+      blobFile = query.simpleQueryForBlobFileDescriptor();
+    } catch (SQLiteDoneException e) {
+      return null;
+    }
+
+    try (ParcelFileDescriptor.AutoCloseInputStream stream =
+        new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) {
+      return serializer.decodeMutationBatch(
+          com.google.firebase.firestore.proto.WriteBatch.parseFrom(stream));
+    } catch (InvalidProtocolBufferException e) {
+      throw fail("MutationBatch failed to parse: %s", e);
+    } catch (IOException e) {
+      throw fail("MutationBatch failed to load: %s", e);
+    }
   }
 
   @Override
@@ -238,7 +260,7 @@ public List<MutationBatch> getAllMutationBatches() {
     List<MutationBatch> result = new ArrayList<>();
     db.query(
             "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
                 + "FROM mutations m "
                 + "WHERE uid = ? ORDER BY batch_id ASC")
         .binding(BLOB_MAX_INLINE_LENGTH, uid)
-        .forEach(row -> result.add(decodeMutationBatchRow(row.getInt(0), row.getBlob(1))));
+        .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
     return result;
   }
@@ -256,7 +278,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey
                 + "AND dm.batch_id = m.batch_id "
                 + "ORDER BY dm.batch_id")
         .binding(BLOB_MAX_INLINE_LENGTH, uid, path)
-        .forEach(row -> result.add(decodeMutationBatchRow(row.getInt(0), row.getBlob(1))));
+        .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
 
     return result;
   }
@@ -292,7 +314,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
               int batchId = row.getInt(0);
               if (!uniqueBatchIds.contains(batchId)) {
                 uniqueBatchIds.add(batchId);
-                result.add(decodeMutationBatchRow(batchId, row.getBlob(1)));
+                result.add(decodeInlineMutationBatch(batchId, row.getBlob(1)));
               }
             });
       }
@@ -364,7 +386,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
                 return;
               }
 
-              result.add(decodeMutationBatchRow(batchId, row.getBlob(2)));
+              result.add(decodeInlineMutationBatch(batchId, row.getBlob(2)));
             });
 
     return result;
@@ -417,32 +439,16 @@ public void performConsistencyCheck() {
   /**
    * Decodes a mutation batch row containing a batch id and a substring of a blob. If the blob is
    * too large, executes another query to load the blob directly.
    *
-   * @param batchId The batch ID of the row containing the blob
+   * @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value
+   * is too large.
    * @param bytes The bytes represented
-   * @return
    */
-  private MutationBatch decodeMutationBatchRow(int batchId, byte[] bytes) {
+  private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) {
     if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
       return decodeMutationBatch(bytes);
     }
 
-    SQLiteStatement loader =
-        db.prepare("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?");
-    loader.bindString(1, uid);
-    loader.bindLong(2, batchId);
-
-    ParcelFileDescriptor blobFile = loader.simpleQueryForBlobFileDescriptor();
-    hardAssert(blobFile != null, "Blob exists so descriptor should not be null");
-
-    try (ParcelFileDescriptor.AutoCloseInputStream stream =
-        new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) {
-      return serializer.decodeMutationBatch(
-          com.google.firebase.firestore.proto.WriteBatch.parseFrom(stream));
-    } catch (InvalidProtocolBufferException e) {
-      throw fail("MutationBatch failed to parse: %s", e);
-    } catch (IOException e) {
-      throw fail("Failed to read blob for uid=%s, batch_id=%d: %s", uid, batchId, e);
-    }
+    return lookupMutationBatch(batchId);
   }
 
   private MutationBatch decodeMutationBatch(byte[] bytes) {

From dd8611007563fbf10c9383d4f4937528dd6d1742 Mon Sep 17 00:00:00 2001
From: Marek Gilbert
Date: Fri, 25 Jan 2019 16:21:28 -0800
Subject: [PATCH 3/9] format

---
 .../firebase/firestore/local/SQLiteMutationQueue.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index c1735421911..241a7369c54 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -223,9 +223,10 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
     SQLiteStatement query =
-        db.prepare("SELECT mutations FROM mutations "
-            + "WHERE uid = ? AND batch_id >= ? "
-            + "ORDER BY batch_id ASC LIMIT 1");
+        db.prepare(
+            "SELECT mutations FROM mutations "
+                + "WHERE uid = ? AND batch_id >= ? "
+                + "ORDER BY batch_id ASC LIMIT 1");
     query.bindString(1, uid);
     query.bindLong(2, nextBatchId);
 
@@ -242,7 +243,7 @@ private MutationBatch executeSimpleMutationBatchLookup(SQLiteStatement query) {
     }
 
     try (ParcelFileDescriptor.AutoCloseInputStream stream =
-        new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) {
+            new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) {
       return serializer.decodeMutationBatch(
           com.google.firebase.firestore.proto.WriteBatch.parseFrom(stream));
     } catch (InvalidProtocolBufferException e) {
@@ -440,7 +441,7 @@ public void performConsistencyCheck() {
    * too large, executes another query to load the blob directly.
    *
    * @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value
-   * is too large.
+   *     is too large.
    * @param bytes The bytes represented
    */
   private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) {

From 9bd99148ec8e2ec4d2460035c2e15da3f03db38a Mon Sep 17 00:00:00 2001
From: Marek Gilbert
Date: Sun, 27 Jan 2019 14:27:23 -0800
Subject: [PATCH 4/9] Rework large blob handling to load in chunks

Unfortunately, SQLiteStatement.simpleQueryForBlobFileDescriptor is
unreliable, sometimes returning null even when the row exists and the
blob has a value (!!).
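
Instead, read the blob in bounded chunks: every query selects at most
BLOB_MAX_INLINE_LENGTH bytes of the mutations column via SUBSTR, and
whenever a chunk comes back full (and so may have been truncated) a
follow-up query loads the next chunk, until a short chunk marks the end
of the blob. The per-chunk query looks roughly like this (named
parameters here are illustrative; the code below binds positional ?s):

    -- SUBSTR counts from 1, so chunk N (0-based) starts at
    -- N * BLOB_MAX_INLINE_LENGTH + 1
    SELECT SUBSTR(mutations, :start, :maxInlineLength)
    FROM mutations WHERE uid = :uid AND batch_id = :batchId

Reading stops as soon as a chunk is shorter than BLOB_MAX_INLINE_LENGTH,
which also covers the common case where the whole batch fits in the
first chunk and no follow-up query runs at all.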
---
 .../firestore/local/SQLiteMutationQueue.java | 124 ++++++++++--------
 1 file changed, 71 insertions(+), 53 deletions(-)

diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index 241a7369c54..0914e3e0d28 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -18,9 +18,8 @@
 import static com.google.firebase.firestore.util.Assert.fail;
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
-import android.database.sqlite.SQLiteDoneException;
+import android.database.Cursor;
 import android.database.sqlite.SQLiteStatement;
-import android.os.ParcelFileDescriptor;
 import com.google.firebase.Timestamp;
 import com.google.firebase.firestore.auth.User;
 import com.google.firebase.firestore.core.Query;
@@ -29,11 +28,11 @@
 import com.google.firebase.firestore.model.mutation.Mutation;
 import com.google.firebase.firestore.model.mutation.MutationBatch;
 import com.google.firebase.firestore.remote.WriteStream;
+import com.google.firebase.firestore.util.Consumer;
 import com.google.firebase.firestore.util.Util;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -95,10 +94,7 @@ public void start() {
     int rows =
         db.query("SELECT last_stream_token FROM mutation_queues WHERE uid = ?")
             .binding(uid)
-            .first(
-                row -> {
-                  lastStreamToken = ByteString.copyFrom(row.getBlob(0));
-                });
+            .first(row -> lastStreamToken = ByteString.copyFrom(row.getBlob(0)));
 
     if (rows == 0) {
       // Ensure we write a default entry in mutation_queues since loadNextBatchIdAcrossAllUsers()
@@ -209,12 +205,9 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
   @Nullable
   @Override
   public MutationBatch lookupMutationBatch(int batchId) {
-    SQLiteStatement query =
-        db.prepare("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?");
-    query.bindString(1, uid);
-    query.bindLong(2, batchId);
-
-    return executeSimpleMutationBatchLookup(query);
+    return db.query("SELECT SUBSTR(mutations, 1, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
+        .firstValue(row -> decodeInlineMutationBatch(batchId, row.getBlob(0)));
   }
 
   @Nullable
   @Override
   public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
-    SQLiteStatement query =
-        db.prepare(
-            "SELECT mutations FROM mutations "
-                + "WHERE uid = ? AND batch_id >= ? "
" - + "ORDER BY batch_id ASC LIMIT 1"); - query.bindString(1, uid); - query.bindLong(2, nextBatchId); - - return executeSimpleMutationBatchLookup(query); - } - - @Nullable - private MutationBatch executeSimpleMutationBatchLookup(SQLiteStatement query) { - ParcelFileDescriptor blobFile; - try { - blobFile = query.simpleQueryForBlobFileDescriptor(); - } catch (SQLiteDoneException e) { - return null; - } - - try (ParcelFileDescriptor.AutoCloseInputStream stream = - new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) { - return serializer.decodeMutationBatch( - com.google.firebase.firestore.proto.WriteBatch.parseFrom(stream)); - } catch (InvalidProtocolBufferException e) { - throw fail("MutationBatch failed to parse: %s", e); - } catch (IOException e) { - throw fail("MutationBatch failed to load: %s", e); - } + return db.query( + "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) FROM mutations m " + + "WHERE m.uid = ? AND m.batch_id >= ? " + + "ORDER BY m.batch_id ASC LIMIT 1") + .binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId) + .firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))); } @Override @@ -437,27 +407,75 @@ public void performConsistencyCheck() { } /** - * Decodes a mutation batch row containing a batch id and a substring of a blob. If the blob is - * too large, executes another query to load the blob directly. + * Decodes mutation batch bytes obtained via substring. If the blob is too smaller than + * BLOB_MAX_INLINE_LENGTH, executes additional queries to load the rest of the blob. * * @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value * is too large. - * @param bytes The bytes represented + * @param bytes The bytes of the first chunk of the mutation batch. Should be obtained via + * SUBSTR(mutations, 1, BLOB_MAX_INLINE_LENGTH). */ private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) { - if (bytes.length < BLOB_MAX_INLINE_LENGTH) { - return decodeMutationBatch(bytes); - } + try { + if (bytes.length < BLOB_MAX_INLINE_LENGTH) { + return serializer.decodeMutationBatch( + com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes)); + } - return lookupMutationBatch(batchId); - } + BlobAccumulator accumulator = new BlobAccumulator(bytes); + while (accumulator.more) { + int start = accumulator.numChunks() * BLOB_MAX_INLINE_LENGTH + 1; - private MutationBatch decodeMutationBatch(byte[] bytes) { - try { + db.query("SELECT SUBSTR(mutations, ?, ?) FROM mutations WHERE uid = ? AND batch_id = ?") + .binding(start, BLOB_MAX_INLINE_LENGTH, uid, batchId) + .first(accumulator); + } + + ByteString blob = accumulator.result(); return serializer.decodeMutationBatch( - com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes)); + com.google.firebase.firestore.proto.WriteBatch.parseFrom(blob)); } catch (InvalidProtocolBufferException e) { throw fail("MutationBatch failed to parse: %s", e); } } + + /** + * Explicit consumer of blob chunks, accumulating the chunks and wrapping them in a single + * ByteString. Accepts a Cursor whose results include the blob in column 0. + * + *

(This is a named class here to allow decodeInlineMutationBlock to access the result of the + * accumulation.) + */ + private static class BlobAccumulator implements Consumer { + private final ArrayList chunks = new ArrayList<>(); + private boolean more = true; + + BlobAccumulator(byte[] firstChunk) { + addChunk(firstChunk); + } + + int numChunks() { + return chunks.size(); + } + + ByteString result() { + // Not actually a copy; this creates a balanced rope-like structure that reuses the given + // ByteStrings as a part of its representation. + return ByteString.copyFrom(chunks); + } + + @Override + public void accept(Cursor row) { + byte[] bytes = row.getBlob(0); + addChunk(bytes); + if (bytes.length < BLOB_MAX_INLINE_LENGTH) { + more = false; + } + } + + private void addChunk(byte[] bytes) { + ByteString wrapped = ByteString.copyFrom(bytes); + chunks.add(wrapped); + } + } } From fdb1cebe32c2de649ba224739cd61a4ea8a6f324 Mon Sep 17 00:00:00 2001 From: Marek Gilbert Date: Sun, 27 Jan 2019 14:31:21 -0800 Subject: [PATCH 5/9] cleanup --- .../google/firebase/firestore/local/MutationQueueTestCase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java index ac3da7ec08c..ccbbc7f0e76 100644 --- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java +++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.firebase.Timestamp; @@ -326,7 +325,6 @@ public void testAllMutationBatchesAffectingQuery_withCompoundBatches() { @Test public void testRemoveMutationBatches() { List batches = createBatches(10); - MutationBatch last = batches.get(batches.size() - 1); removeMutationBatches(batches.remove(0)); assertEquals(9, batchCount()); From 2e3c7652568253bce4215231d81d78f3682ba486 Mon Sep 17 00:00:00 2001 From: Marek Gilbert Date: Sun, 27 Jan 2019 14:39:44 -0800 Subject: [PATCH 6/9] Assert document round-trips correctly --- .../java/com/google/firebase/firestore/WriteBatchTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java index 49b1a454e7a..29003624a78 100644 --- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java +++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java @@ -279,5 +279,7 @@ public void testCanWriteVeryLargeBatches() { } waitFor(batch.commit()); + DocumentSnapshot snap = waitFor(doc.get()); + assertEquals(values, snap.getData()); } } From 6aecf2e4ecef057ee29ad5fba75d37159d866dc0 Mon Sep 17 00:00:00 2001 From: Marek Gilbert Date: Mon, 28 Jan 2019 13:28:06 -0800 Subject: [PATCH 7/9] Review feedback --- .../firebase/firestore/WriteBatchTest.java | 9 ++++++++ .../firestore/local/SQLiteMutationQueue.java | 22 ++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java 
index 29003624a78..10544e735fc 100644
--- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
+++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
@@ -259,6 +259,12 @@ public void testCanWriteTheSameDocumentMultipleTimes() {
 
   @Test
   public void testCanWriteVeryLargeBatches() {
+    // On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being
+    // able to write very large values). This test verifies that the SQLiteMutationQueue properly
+    // works around this limitation.
+
+    // Create a map containing nearly 1 MB of data. Note that if you use 1024 below this will create
+    // a document larger than 1 MB, which will be rejected by the backend as too large.
     String a = Character.toString('a');
     StringBuilder buf = new StringBuilder(1000);
     for (int i = 0; i < 1000; i++) {
       buf.append(a);
     }
     String kb = buf.toString();
@@ -273,6 +279,9 @@ public void testCanWriteVeryLargeBatches() {
     DocumentReference doc = testDocument();
     WriteBatch batch = doc.getFirestore().batch();
 
+    // Write a batch containing 3 copies of the data, creating a ~3 MB batch. Writing to the same
+    // document in a batch is allowed, and so long as the net size of the document is under 1 MB
+    // the batch is accepted.
     batch.set(doc, values);
     for (int i = 0; i < 2; i++) {
       batch.update(doc, values);
     }
diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index 0914e3e0d28..ebd54e46a76 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -44,6 +44,16 @@
 /** A mutation queue for a specific user, backed by SQLite. */
 final class SQLiteMutationQueue implements MutationQueue {
 
+  /**
+   * On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being
+   * able to write very large values). All reads of the mutations column in the mutations table
+   * need to read in chunks with SUBSTR to avoid going over this limit.
+   *
+   * <p>The value here has to be 2 MB or smaller, while allowing for all possible other values that
+   * might be selected out along with the mutations column in any given result set. Nearly 1 MB is
+   * conservative, but allows all combinations of document paths and batch ids without needing to
+   * figure out if the row has gotten too large.
+   */
   private static final int BLOB_MAX_INLINE_LENGTH = 1000000;
 
   private final SQLitePersistence db;
@@ -216,9 +226,9 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
     return db.query(
-            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) FROM mutations m "
-                + "WHERE m.uid = ? AND m.batch_id >= ? "
-                + "ORDER BY m.batch_id ASC LIMIT 1")
+            "SELECT batch_id, SUBSTR(mutations, 1, ?) FROM mutations "
+                + "WHERE uid = ? AND batch_id >= ? "
+                + "ORDER BY batch_id ASC LIMIT 1")
         .binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
         .firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
   }
@@ -227,8 +237,8 @@ public List<MutationBatch> getAllMutationBatches() {
     List<MutationBatch> result = new ArrayList<>();
     db.query(
-            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
-                + "FROM mutations m "
+            "SELECT batch_id, SUBSTR(mutations, 1, ?) "
+                + "FROM mutations "
                 + "WHERE uid = ? ORDER BY batch_id ASC")
         .binding(BLOB_MAX_INLINE_LENGTH, uid)
         .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
     return result;
@@ -407,7 +417,7 @@ public void performConsistencyCheck() {
   }
 
   /**
-   * Decodes mutation batch bytes obtained via substring. If the blob is too smaller than
+   * Decodes mutation batch bytes obtained via substring. If the blob is not smaller than
    * BLOB_MAX_INLINE_LENGTH, executes additional queries to load the rest of the blob.
    *
    * @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value

From 5859e555c8578c7929c4c2131f7edb6149bf7eb0 Mon Sep 17 00:00:00 2001
From: Marek Gilbert
Date: Mon, 28 Jan 2019 13:37:04 -0800
Subject: [PATCH 8/9] Review feedback

---
 .../google/firebase/firestore/local/SQLiteMutationQueue.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index ebd54e46a76..9c41c101ab8 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -434,6 +434,9 @@ private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) {
 
       BlobAccumulator accumulator = new BlobAccumulator(bytes);
       while (accumulator.more) {
+        // As we read in chunks, the start of the next chunk should be the total accumulated
+        // length plus 1 (since SUBSTR() counts from 1). The second argument is not adjusted
+        // because it's the length of the chunk, not the end index.
         int start = accumulator.numChunks() * BLOB_MAX_INLINE_LENGTH + 1;
 
         db.query("SELECT SUBSTR(mutations, ?, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
             .binding(start, BLOB_MAX_INLINE_LENGTH, uid, batchId)
             .first(accumulator);
       }
 
       ByteString blob = accumulator.result();

From 9ef85f040f5d4bc821ce2ffda92b576bc0f73428 Mon Sep 17 00:00:00 2001
From: Marek Gilbert
Date: Mon, 28 Jan 2019 14:59:34 -0800
Subject: [PATCH 9/9] Changelog

---
 firebase-firestore/CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/firebase-firestore/CHANGELOG.md b/firebase-firestore/CHANGELOG.md
index 4224745b3a4..d1c177a5a86 100644
--- a/firebase-firestore/CHANGELOG.md
+++ b/firebase-firestore/CHANGELOG.md
@@ -1,4 +1,6 @@
 # Unreleased
+- [fixed] Fixed an issue where Firestore would crash when handling write batches
+  larger than 2 MB in size (#208).
 
 # 18.0.0
 - [changed] The `timestampsInSnapshotsEnabled` setting is now enabled by