diff --git a/firebase-firestore/CHANGELOG.md b/firebase-firestore/CHANGELOG.md
index 4224745b3a4..d1c177a5a86 100644
--- a/firebase-firestore/CHANGELOG.md
+++ b/firebase-firestore/CHANGELOG.md
@@ -1,4 +1,6 @@
 # Unreleased
+- [fixed] Fixed an issue where Firestore would crash when handling write batches
+  larger than 2 MB in size (#208).
 
 # 18.0.0
 - [changed] The `timestampsInSnapshotsEnabled` setting is now enabled by
diff --git a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
index 6dc62a3b8ba..10544e735fc 100644
--- a/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
+++ b/firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java
@@ -32,7 +32,10 @@
 import com.google.firebase.firestore.FirebaseFirestoreException.Code;
 import com.google.firebase.firestore.testutil.EventAccumulator;
 import com.google.firebase.firestore.testutil.IntegrationTestUtil;
+import com.google.firebase.firestore.util.Util;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -253,4 +256,39 @@ public void testCanWriteTheSameDocumentMultipleTimes() {
     assertNotNull(when);
     assertEquals(map("a", 1L, "b", 2L, "when", when), serverSnap.getData());
   }
+
+  @Test
+  public void testCanWriteVeryLargeBatches() {
+    // On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being
+    // able to write very large values). This test verifies that the SQLiteMutationQueue properly
+    // works around this limitation.
+
+    // Create a map containing nearly 1 MB of data. Note that if you use 1024 below, this will
+    // create a document larger than 1 MB, which will be rejected by the backend as too large.
+    String a = Character.toString('a');
+    StringBuilder buf = new StringBuilder(1000);
+    for (int i = 0; i < 1000; i++) {
+      buf.append(a);
+    }
+    String kb = buf.toString();
+    Map<String, Object> values = new HashMap<>();
+    for (int j = 0; j < 1000; j++) {
+      values.put(Util.autoId(), kb);
+    }
+
+    DocumentReference doc = testDocument();
+    WriteBatch batch = doc.getFirestore().batch();
+
+    // Write a batch containing 3 copies of the data, creating a ~3 MB batch. Writing to the same
+    // document in a batch is allowed, and as long as the net size of the document stays under
+    // 1 MB, the batch is accepted.
+    batch.set(doc, values);
+    for (int i = 0; i < 2; i++) {
+      batch.update(doc, values);
+    }
+
+    waitFor(batch.commit());
+    DocumentSnapshot snap = waitFor(doc.get());
+    assertEquals(values, snap.getData());
+  }
 }
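Note on the test above: the sizes are chosen to straddle two separate limits, the backend's ~1 MB document cap and Android's ~2 MB per-row cursor read limit. A standalone sketch of that arithmetic (class and variable names here are illustrative, not SDK code):

```java
public final class BatchSizeMath {
  public static void main(String[] args) {
    int valueLength = 1000;   // characters per value; 1024 would push the document past 1 MB
    int entriesPerDoc = 1000; // auto-ID keys, each mapped to a ~1 KB string
    int copiesInBatch = 3;    // one set() plus two update() calls on the same document

    long docBytes = (long) valueLength * entriesPerDoc; // ~1 MB document payload
    long batchBytes = docBytes * copiesInBatch;         // ~3 MB serialized mutation batch

    System.out.printf("document ~%d bytes, batch ~%d bytes%n", docBytes, batchBytes);
    // The batch (~3 MB) exceeds the ~2 MB cursor read limit, so reading it back from SQLite
    // in one piece would fail, while the document itself stays under the backend's 1 MB cap.
  }
}
```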
diff --git a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
index adfd3c1de1d..9c41c101ab8 100644
--- a/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
+++ b/firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java
@@ -18,6 +18,7 @@
 import static com.google.firebase.firestore.util.Assert.fail;
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.database.Cursor;
 import android.database.sqlite.SQLiteStatement;
 import com.google.firebase.Timestamp;
 import com.google.firebase.firestore.auth.User;
@@ -27,6 +28,7 @@
 import com.google.firebase.firestore.model.mutation.Mutation;
 import com.google.firebase.firestore.model.mutation.MutationBatch;
 import com.google.firebase.firestore.remote.WriteStream;
+import com.google.firebase.firestore.util.Consumer;
 import com.google.firebase.firestore.util.Util;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -42,6 +44,18 @@
 /** A mutation queue for a specific user, backed by SQLite. */
 final class SQLiteMutationQueue implements MutationQueue {
 
+  /**
+   * On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being
+   * able to write very large values). All reads of the mutations column in the mutations table
+   * need to read in chunks with SUBSTR to avoid going over this limit.
+   *
+   * <p>The value here has to be 2 MB or smaller, while allowing for all possible other values
+   * that might be selected out along with the mutations column in any given result set. Nearly
+   * 1 MB is conservative, but allows all combinations of document paths and batch ids without
+   * needing to figure out if the row has gotten too large.
+   */
+  private static final int BLOB_MAX_INLINE_LENGTH = 1000000;
+
   private final SQLitePersistence db;
   private final LocalSerializer serializer;
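The constant above drives the read-side workaround: by slicing the blob with SUBSTR inside SQLite, each result row stays under the cursor limit. A minimal sketch of the chunked read pattern, written against plain JDBC/SQLite purely for illustration (the sqlite-jdbc driver, the table layout, and the helper name are assumptions; the SDK itself goes through SQLitePersistence and Android cursors):

```java
import java.io.ByteArrayOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public final class ChunkedBlobRead {
  private static final int CHUNK = 1000000; // mirrors BLOB_MAX_INLINE_LENGTH

  /** Reads a blob one CHUNK at a time via SUBSTR until a short (final) chunk comes back. */
  static byte[] readBlob(Connection conn, int batchId) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int start = 1; // SUBSTR is 1-based
    while (true) {
      try (PreparedStatement stmt =
          conn.prepareStatement(
              "SELECT SUBSTR(mutations, ?, ?) FROM mutations WHERE batch_id = ?")) {
        stmt.setInt(1, start);
        stmt.setInt(2, CHUNK);
        stmt.setInt(3, batchId);
        try (ResultSet rs = stmt.executeQuery()) {
          if (!rs.next()) break;
          byte[] chunk = rs.getBytes(1);
          if (chunk == null) break;        // no such row
          out.write(chunk);
          if (chunk.length < CHUNK) break; // short chunk means end of blob
          start += CHUNK;
        }
      }
    }
    return out.toByteArray();
  }
}
```

The key design point is that the truncation happens server-side in the SQL engine, so the oversized value never has to fit through a single cursor row.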
@@ -90,10 +104,7 @@ public void start() {
     int rows =
         db.query("SELECT last_stream_token FROM mutation_queues WHERE uid = ?")
             .binding(uid)
-            .first(
-                row -> {
-                  lastStreamToken = ByteString.copyFrom(row.getBlob(0));
-                });
+            .first(row -> lastStreamToken = ByteString.copyFrom(row.getBlob(0)));
 
     if (rows == 0) {
       // Ensure we write a default entry in mutation_queues since loadNextBatchIdAcrossAllUsers()
@@ -204,9 +215,9 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
   @Nullable
   @Override
   public MutationBatch lookupMutationBatch(int batchId) {
-    return db.query("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?")
-        .binding(uid, batchId)
-        .firstValue(row -> decodeMutationBatch(row.getBlob(0)));
+    return db.query("SELECT SUBSTR(mutations, 1, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
+        .firstValue(row -> decodeInlineMutationBatch(batchId, row.getBlob(0)));
   }
 
   @Nullable
@@ -215,19 +226,22 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
     int nextBatchId = batchId + 1;
 
     return db.query(
-            "SELECT mutations FROM mutations "
+            "SELECT batch_id, SUBSTR(mutations, 1, ?) FROM mutations "
                 + "WHERE uid = ? AND batch_id >= ? "
                 + "ORDER BY batch_id ASC LIMIT 1")
-        .binding(uid, nextBatchId)
-        .firstValue(row -> decodeMutationBatch(row.getBlob(0)));
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
+        .firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
   }
 
   @Override
   public List<MutationBatch> getAllMutationBatches() {
     List<MutationBatch> result = new ArrayList<>();
-    db.query("SELECT mutations FROM mutations WHERE uid = ? ORDER BY batch_id ASC")
-        .binding(uid)
-        .forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
+    db.query(
+            "SELECT batch_id, SUBSTR(mutations, 1, ?) "
+                + "FROM mutations "
+                + "WHERE uid = ? ORDER BY batch_id ASC")
+        .binding(BLOB_MAX_INLINE_LENGTH, uid)
+        .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
     return result;
   }
 
@@ -237,14 +251,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey
     List<MutationBatch> result = new ArrayList<>();
 
     db.query(
-            "SELECT m.mutations FROM document_mutations dm, mutations m "
+            "SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+                + "FROM document_mutations dm, mutations m "
                 + "WHERE dm.uid = ? "
                 + "AND dm.path = ? "
                 + "AND dm.uid = m.uid "
                 + "AND dm.batch_id = m.batch_id "
                 + "ORDER BY dm.batch_id")
-        .binding(uid, path)
-        .forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
+        .binding(BLOB_MAX_INLINE_LENGTH, uid, path)
+        .forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
     return result;
   }
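Each rewritten query above now selects batch_id alongside the truncated blob. A sketch of why that pairing matters (Row is a stand-in interface, not an SDK type):

```java
// Illustrative only: the decision the (batch_id, blob-prefix) pair enables.
interface Row {
  int getInt(int column);
  byte[] getBlob(int column);
}

final class DecodeDispatchSketch {
  static final int BLOB_MAX_INLINE_LENGTH = 1000000;

  static boolean needsChunkedRead(Row row) {
    byte[] firstChunk = row.getBlob(1); // SUBSTR(mutations, 1, BLOB_MAX_INLINE_LENGTH)
    // A chunk shorter than the cap means the whole blob was inlined. A chunk exactly at the
    // cap is ambiguous (the blob may be larger), so the decoder must go back to the row by
    // batch_id (column 0) and keep reading.
    return firstChunk.length >= BLOB_MAX_INLINE_LENGTH;
  }
}
```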
" + "AND dm.path IN (", - Arrays.asList(uid), + Arrays.asList(BLOB_MAX_INLINE_LENGTH, uid), args, ") " + "AND dm.uid = m.uid " @@ -279,7 +295,7 @@ public List getAllMutationBatchesAffectingDocumentKeys( int batchId = row.getInt(0); if (!uniqueBatchIds.contains(batchId)) { uniqueBatchIds.add(batchId); - result.add(decodeMutationBatch(row.getBlob(1))); + result.add(decodeInlineMutationBatch(batchId, row.getBlob(1))); } }); } @@ -321,14 +337,15 @@ public List getAllMutationBatchesAffectingQuery(Query query) { List result = new ArrayList<>(); db.query( - "SELECT dm.batch_id, dm.path, m.mutations FROM document_mutations dm, mutations m " + "SELECT dm.batch_id, dm.path, SUBSTR(m.mutations, 1, ?) " + + "FROM document_mutations dm, mutations m " + "WHERE dm.uid = ? " + "AND dm.path >= ? " + "AND dm.path < ? " + "AND dm.uid = m.uid " + "AND dm.batch_id = m.batch_id " + "ORDER BY dm.batch_id") - .binding(uid, prefixPath, prefixSuccessorPath) + .binding(BLOB_MAX_INLINE_LENGTH, uid, prefixPath, prefixSuccessorPath) .forEach( row -> { // Ensure unique batches only. This works because the batches come out in order so we @@ -350,7 +367,7 @@ public List getAllMutationBatchesAffectingQuery(Query query) { return; } - result.add(decodeMutationBatch(row.getBlob(2))); + result.add(decodeInlineMutationBatch(batchId, row.getBlob(2))); }); return result; @@ -399,12 +416,79 @@ public void performConsistencyCheck() { danglingMutationReferences); } - private MutationBatch decodeMutationBatch(byte[] bytes) { + /** + * Decodes mutation batch bytes obtained via substring. If the blob is smaller than + * BLOB_MAX_INLINE_LENGTH, executes additional queries to load the rest of the blob. + * + * @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value + * is too large. + * @param bytes The bytes of the first chunk of the mutation batch. Should be obtained via + * SUBSTR(mutations, 1, BLOB_MAX_INLINE_LENGTH). + */ + private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) { try { + if (bytes.length < BLOB_MAX_INLINE_LENGTH) { + return serializer.decodeMutationBatch( + com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes)); + } + + BlobAccumulator accumulator = new BlobAccumulator(bytes); + while (accumulator.more) { + // As we read in chunks the start of the next chunk should be the total accumulated length + // plus 1 (since SUBSTR() counts from 1). The second argument is not adjusted because it's + // the length of the chunk, not the end index. + int start = accumulator.numChunks() * BLOB_MAX_INLINE_LENGTH + 1; + + db.query("SELECT SUBSTR(mutations, ?, ?) FROM mutations WHERE uid = ? AND batch_id = ?") + .binding(start, BLOB_MAX_INLINE_LENGTH, uid, batchId) + .first(accumulator); + } + + ByteString blob = accumulator.result(); return serializer.decodeMutationBatch( - com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes)); + com.google.firebase.firestore.proto.WriteBatch.parseFrom(blob)); } catch (InvalidProtocolBufferException e) { throw fail("MutationBatch failed to parse: %s", e); } } + + /** + * Explicit consumer of blob chunks, accumulating the chunks and wrapping them in a single + * ByteString. Accepts a Cursor whose results include the blob in column 0. + * + *
+
+  /**
+   * Explicit consumer of blob chunks, accumulating the chunks and wrapping them in a single
+   * ByteString. Accepts a Cursor whose results include the blob in column 0.
+   *
+   * <p>(This is a named class here to allow decodeInlineMutationBatch to access the result of
+   * the accumulation.)
+   */
+  private static class BlobAccumulator implements Consumer<Cursor> {
+    private final ArrayList<ByteString> chunks = new ArrayList<>();
+    private boolean more = true;
+
+    BlobAccumulator(byte[] firstChunk) {
+      addChunk(firstChunk);
+    }
+
+    int numChunks() {
+      return chunks.size();
+    }
+
+    ByteString result() {
+      // Not actually a copy; this creates a balanced rope-like structure that reuses the given
+      // ByteStrings as a part of its representation.
+      return ByteString.copyFrom(chunks);
+    }
+
+    @Override
+    public void accept(Cursor row) {
+      byte[] bytes = row.getBlob(0);
+      addChunk(bytes);
+      if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
+        more = false;
+      }
+    }
+
+    private void addChunk(byte[] bytes) {
+      ByteString wrapped = ByteString.copyFrom(bytes);
+      chunks.add(wrapped);
+    }
+  }
 }
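BlobAccumulator.result() leans on protobuf's ByteString.copyFrom(Iterable), the same call the patch makes, which concatenates without flattening the inputs into one large array. A standalone sketch of that behavior:

```java
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

public final class RopeConcat {
  public static void main(String[] args) {
    List<ByteString> chunks = new ArrayList<>();
    chunks.add(ByteString.copyFromUtf8("hello "));
    chunks.add(ByteString.copyFromUtf8("large "));
    chunks.add(ByteString.copyFromUtf8("blob"));

    // copyFrom(Iterable<ByteString>) builds a balanced tree over the inputs rather than
    // copying them into one flat buffer, so reassembling N chunks stays cheap.
    ByteString whole = ByteString.copyFrom(chunks);
    System.out.println(whole.toStringUtf8()); // hello large blob
  }
}
```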
diff --git a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java
index ac3da7ec08c..ccbbc7f0e76 100644
--- a/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java
+++ b/firebase-firestore/src/test/java/com/google/firebase/firestore/local/MutationQueueTestCase.java
@@ -27,7 +27,6 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.firebase.Timestamp;
@@ -326,7 +325,6 @@ public void testAllMutationBatchesAffectingQuery_withCompoundBatches() {
   @Test
   public void testRemoveMutationBatches() {
     List<MutationBatch> batches = createBatches(10);
-    MutationBatch last = batches.get(batches.size() - 1);
 
     removeMutationBatches(batches.remove(0));
     assertEquals(9, batchCount());