Handle large write batches #215

Merged 9 commits on Jan 29, 2019
2 changes: 2 additions & 0 deletions firebase-firestore/CHANGELOG.md
@@ -1,4 +1,6 @@
# Unreleased
- [fixed] Fixed an issue where Firestore would crash if handling write batches
larger than 2 MB in size (#208).

# 18.0.0
- [changed] The `timestampsInSnapshotsEnabled` setting is now enabled by
@@ -32,7 +32,10 @@
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
import com.google.firebase.firestore.testutil.EventAccumulator;
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
import com.google.firebase.firestore.util.Util;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -253,4 +256,39 @@ public void testCanWriteTheSameDocumentMultipleTimes() {
assertNotNull(when);
assertEquals(map("a", 1L, "b", 2L, "when", when), serverSnap.getData());
}

@Test
public void testCanWriteVeryLargeBatches() {
// On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being able
// to write very large values). This test verifies that the SQLiteMutationQueue properly works
// around this limitation.

// Create a map containing nearly 1 MB of data. Note that if you use 1024 below, this will create
// a document larger than 1 MB, which will be rejected by the backend as too large.
String a = Character.toString('a');
StringBuilder buf = new StringBuilder(1000);
for (int i = 0; i < 1000; i++) {
buf.append(a);
}
String kb = buf.toString();
Map<String, Object> values = new HashMap<>();
for (int j = 0; j < 1000; j++) {
values.put(Util.autoId(), kb);
}

DocumentReference doc = testDocument();
WriteBatch batch = doc.getFirestore().batch();

// Write a batch containing 3 copies of the data, creating a ~3 MB batch. Writing to the same
// document multiple times in a single batch is allowed, and as long as the net size of the
// document stays under 1 MB the batch is accepted.
batch.set(doc, values);
for (int i = 0; i < 2; i++) {
batch.update(doc, values);
}

waitFor(batch.commit());
DocumentSnapshot snap = waitFor(doc.get());
assertEquals(values, snap.getData());
}
}
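The limitation described in the test comments can be illustrated with a plain SQLiteDatabase. The sketch below is not part of this change; the demo class, database name, and table are hypothetical, and the exact exception varies by Android version, but it shows the failure mode that the chunked reads in SQLiteMutationQueue are designed to avoid.

import android.content.Context;
import android.database.Cursor;
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteException;

// Hypothetical, self-contained demo of the Cursor limitation described above: SQLite
// happily stores a 3 MB blob, but reading it back as a single column value overflows
// the ~2 MB CursorWindow.
final class CursorWindowLimitDemo {
  static void run(Context context) {
    SQLiteDatabase db =
        context.openOrCreateDatabase("cursor-limit-demo.db", Context.MODE_PRIVATE, null);
    try {
      db.execSQL("CREATE TABLE IF NOT EXISTS blobs (id INTEGER PRIMARY KEY, data BLOB)");

      // Writing a value much larger than 2 MB succeeds.
      byte[] big = new byte[3 * 1024 * 1024];
      db.execSQL("INSERT OR REPLACE INTO blobs (id, data) VALUES (1, ?)", new Object[] {big});

      // Reading it back in one piece does not: the row cannot fit into the CursorWindow,
      // which on recent Android versions typically surfaces as SQLiteBlobTooBigException.
      Cursor c = db.rawQuery("SELECT data FROM blobs WHERE id = 1", null);
      try {
        // Either the cursor move or the blob read is expected to fail for a 3 MB value.
        c.moveToFirst();
        c.getBlob(0);
      } catch (SQLiteException | IllegalStateException e) {
        // This is the kind of crash reported in #208; SQLiteMutationQueue now avoids it
        // by reading large blobs in SUBSTR() chunks (see the next file).
      } finally {
        c.close();
      }
    } finally {
      db.close();
    }
  }
}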
@@ -18,6 +18,7 @@
import static com.google.firebase.firestore.util.Assert.fail;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import android.database.Cursor;
import android.database.sqlite.SQLiteStatement;
import com.google.firebase.Timestamp;
import com.google.firebase.firestore.auth.User;
@@ -27,6 +28,7 @@
import com.google.firebase.firestore.model.mutation.Mutation;
import com.google.firebase.firestore.model.mutation.MutationBatch;
import com.google.firebase.firestore.remote.WriteStream;
import com.google.firebase.firestore.util.Consumer;
import com.google.firebase.firestore.util.Util;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -42,6 +44,18 @@
/** A mutation queue for a specific user, backed by SQLite. */
final class SQLiteMutationQueue implements MutationQueue {

/**
* On Android, SQLite Cursors are limited to reading no more than 2 MB per row (despite being able to
* write very large values). All reads of the mutations column in the mutations table need to read
* in chunks with SUBSTR to avoid going over this limit.
*
* <p>The value here has to be 2 MB or smaller, while allowing for all possible other values that
* might be selected out along with the mutations column in any given result set. Nearly 1 MB is
* conservative, but allows all combinations of document paths and batch ids without needing to
* figure out if the row has gotten too large.
*/
private static final int BLOB_MAX_INLINE_LENGTH = 1000000;

private final SQLitePersistence db;
private final LocalSerializer serializer;

@@ -90,10 +104,7 @@ public void start() {
int rows =
db.query("SELECT last_stream_token FROM mutation_queues WHERE uid = ?")
.binding(uid)
.first(
row -> {
lastStreamToken = ByteString.copyFrom(row.getBlob(0));
});
.first(row -> lastStreamToken = ByteString.copyFrom(row.getBlob(0)));

if (rows == 0) {
// Ensure we write a default entry in mutation_queues since loadNextBatchIdAcrossAllUsers()
@@ -204,9 +215,9 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
@Nullable
@Override
public MutationBatch lookupMutationBatch(int batchId) {
return db.query("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?")
.binding(uid, batchId)
.firstValue(row -> decodeMutationBatch(row.getBlob(0)));
return db.query("SELECT SUBSTR(mutations, 1, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
.binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
.firstValue(row -> decodeInlineMutationBatch(batchId, row.getBlob(0)));
}

@Nullable
@@ -215,19 +226,22 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
int nextBatchId = batchId + 1;

return db.query(
"SELECT mutations FROM mutations "
"SELECT batch_id, SUBSTR(mutations, 1, ?) FROM mutations "
+ "WHERE uid = ? AND batch_id >= ? "
+ "ORDER BY batch_id ASC LIMIT 1")
.binding(uid, nextBatchId)
.firstValue(row -> decodeMutationBatch(row.getBlob(0)));
.binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
.firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
}

@Override
public List<MutationBatch> getAllMutationBatches() {
List<MutationBatch> result = new ArrayList<>();
db.query("SELECT mutations FROM mutations WHERE uid = ? ORDER BY batch_id ASC")
.binding(uid)
.forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
db.query(
"SELECT batch_id, SUBSTR(mutations, 1, ?) "
+ "FROM mutations "
+ "WHERE uid = ? ORDER BY batch_id ASC")
.binding(BLOB_MAX_INLINE_LENGTH, uid)
.forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
return result;
}

@@ -237,14 +251,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey

List<MutationBatch> result = new ArrayList<>();
db.query(
"SELECT m.mutations FROM document_mutations dm, mutations m "
"SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
+ "FROM document_mutations dm, mutations m "
+ "WHERE dm.uid = ? "
+ "AND dm.path = ? "
+ "AND dm.uid = m.uid "
+ "AND dm.batch_id = m.batch_id "
+ "ORDER BY dm.batch_id")
.binding(uid, path)
.forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
.binding(BLOB_MAX_INLINE_LENGTH, uid, path)
.forEach(row -> result.add(decodeInlineMutationBatch(row.getInt(0), row.getBlob(1))));
return result;
}

@@ -259,10 +274,11 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
SQLitePersistence.LongQuery longQuery =
new SQLitePersistence.LongQuery(
db,
"SELECT DISTINCT dm.batch_id, m.mutations FROM document_mutations dm, mutations m "
"SELECT DISTINCT dm.batch_id, SUBSTR(m.mutations, 1, ?) "
+ "FROM document_mutations dm, mutations m "
+ "WHERE dm.uid = ? "
+ "AND dm.path IN (",
Arrays.asList(uid),
Arrays.asList(BLOB_MAX_INLINE_LENGTH, uid),
args,
") "
+ "AND dm.uid = m.uid "
Expand All @@ -279,7 +295,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
int batchId = row.getInt(0);
if (!uniqueBatchIds.contains(batchId)) {
uniqueBatchIds.add(batchId);
result.add(decodeMutationBatch(row.getBlob(1)));
result.add(decodeInlineMutationBatch(batchId, row.getBlob(1)));
}
});
}
@@ -321,14 +337,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {

List<MutationBatch> result = new ArrayList<>();
db.query(
"SELECT dm.batch_id, dm.path, m.mutations FROM document_mutations dm, mutations m "
"SELECT dm.batch_id, dm.path, SUBSTR(m.mutations, 1, ?) "
+ "FROM document_mutations dm, mutations m "
+ "WHERE dm.uid = ? "
+ "AND dm.path >= ? "
+ "AND dm.path < ? "
+ "AND dm.uid = m.uid "
+ "AND dm.batch_id = m.batch_id "
+ "ORDER BY dm.batch_id")
.binding(uid, prefixPath, prefixSuccessorPath)
.binding(BLOB_MAX_INLINE_LENGTH, uid, prefixPath, prefixSuccessorPath)
.forEach(
row -> {
// Ensure unique batches only. This works because the batches come out in order so we
Expand All @@ -350,7 +367,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
return;
}

result.add(decodeMutationBatch(row.getBlob(2)));
result.add(decodeInlineMutationBatch(batchId, row.getBlob(2)));
});

return result;
@@ -399,12 +416,79 @@ public void performConsistencyCheck() {
danglingMutationReferences);
}

private MutationBatch decodeMutationBatch(byte[] bytes) {
/**
* Decodes mutation batch bytes obtained via substring. If the first chunk is BLOB_MAX_INLINE_LENGTH
* bytes long (so the stored value may be larger), executes additional queries to load the rest of the blob.
*
* @param batchId The batch ID of the row containing the bytes, for fallback lookup if the value
* is too large.
* @param bytes The bytes of the first chunk of the mutation batch. Should be obtained via
* SUBSTR(mutations, 1, BLOB_MAX_INLINE_LENGTH).
*/
private MutationBatch decodeInlineMutationBatch(int batchId, byte[] bytes) {
try {
if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
return serializer.decodeMutationBatch(
com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes));
}

BlobAccumulator accumulator = new BlobAccumulator(bytes);
while (accumulator.more) {
// As we read in chunks, the start of the next chunk should be the total accumulated length
// plus 1 (since SUBSTR() counts from 1). The second argument is not adjusted because it's
// the length of the chunk, not the end index.
int start = accumulator.numChunks() * BLOB_MAX_INLINE_LENGTH + 1;

db.query("SELECT SUBSTR(mutations, ?, ?) FROM mutations WHERE uid = ? AND batch_id = ?")
.binding(start, BLOB_MAX_INLINE_LENGTH, uid, batchId)
.first(accumulator);
}

ByteString blob = accumulator.result();
return serializer.decodeMutationBatch(
com.google.firebase.firestore.proto.WriteBatch.parseFrom(bytes));
com.google.firebase.firestore.proto.WriteBatch.parseFrom(blob));
} catch (InvalidProtocolBufferException e) {
throw fail("MutationBatch failed to parse: %s", e);
}
}

/**
* Explicit consumer of blob chunks, accumulating the chunks and wrapping them in a single
* ByteString. Accepts a Cursor whose results include the blob in column 0.
*
* <p>(This is a named class here to allow decodeInlineMutationBatch to access the result of the
* accumulation.)
*/
private static class BlobAccumulator implements Consumer<Cursor> {
private final ArrayList<ByteString> chunks = new ArrayList<>();
private boolean more = true;

BlobAccumulator(byte[] firstChunk) {
addChunk(firstChunk);
}

int numChunks() {
return chunks.size();
}

ByteString result() {
// Not actually a copy; this creates a balanced rope-like structure that reuses the given
// ByteStrings as a part of its representation.
return ByteString.copyFrom(chunks);
}

@Override
public void accept(Cursor row) {
byte[] bytes = row.getBlob(0);
addChunk(bytes);
if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
more = false;
}
}

private void addChunk(byte[] bytes) {
ByteString wrapped = ByteString.copyFrom(bytes);
chunks.add(wrapped);
}
}
}
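As a standalone illustration of the chunked-read strategy implemented by decodeInlineMutationBatch above, here is a minimal sketch against a plain SQLiteDatabase. The ChunkedBlobReader class and its readMutationsBlob method are hypothetical and not part of this change; the table and column names mirror the mutations schema queried above, and CHUNK_SIZE plays the role of BLOB_MAX_INLINE_LENGTH.

import android.database.Cursor;
import android.database.sqlite.SQLiteDatabase;
import java.io.ByteArrayOutputStream;

// Hypothetical sketch: read a possibly >2 MB blob by issuing repeated SUBSTR() queries,
// each returning at most CHUNK_SIZE bytes, and concatenating the chunks.
final class ChunkedBlobReader {
  // Kept comfortably under the ~2 MB CursorWindow limit, like BLOB_MAX_INLINE_LENGTH.
  private static final int CHUNK_SIZE = 1_000_000;

  static byte[] readMutationsBlob(SQLiteDatabase db, String uid, int batchId) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int start = 1; // SUBSTR() positions are 1-based
    while (true) {
      // start and CHUNK_SIZE are internal integers (not user input), so they are inlined
      // rather than bound, keeping the bind arguments as plain strings.
      String sql =
          "SELECT SUBSTR(mutations, " + start + ", " + CHUNK_SIZE + ") "
              + "FROM mutations WHERE uid = ? AND batch_id = ?";
      Cursor c = db.rawQuery(sql, new String[] {uid, Integer.toString(batchId)});
      try {
        if (!c.moveToFirst()) {
          break; // no such row
        }
        byte[] chunk = c.getBlob(0);
        out.write(chunk, 0, chunk.length);
        if (chunk.length < CHUNK_SIZE) {
          break; // a short chunk means the end of the blob has been reached
        }
        // The next chunk starts right after the bytes read so far; for example the second
        // chunk starts at 1,000,001 because SUBSTR() counts from 1.
        start += CHUNK_SIZE;
      } finally {
        c.close();
      }
    }
    return out.toByteArray();
  }
}

For a hypothetical 2.5 MB mutation blob this reads three chunks of 1,000,000, 1,000,000, and roughly 500,000 bytes; the short final chunk ends the loop, matching the accumulator logic above.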
@@ -27,7 +27,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.firebase.Timestamp;
@@ -326,7 +325,6 @@ public void testAllMutationBatchesAffectingQuery_withCompoundBatches() {
@Test
public void testRemoveMutationBatches() {
List<MutationBatch> batches = createBatches(10);
MutationBatch last = batches.get(batches.size() - 1);

removeMutationBatches(batches.remove(0));
assertEquals(9, batchCount());