
Commit 2fc4282

PYTHON-4146 Improve GridFS upload performance by batch writing chunks with insert_many (mongodb#1478)
1 parent c3458e9 commit 2fc4282
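
For context, the upload path this commit optimizes is the one exercised by GridFSBucket writes. A minimal usage sketch (the database name, file name, and sizes below are illustrative, and a local mongod is assumed); with this change the per-chunk insert_one round trips become buffered insert_many batches:

from gridfs import GridFSBucket
from pymongo import MongoClient

client = MongoClient()  # assumed local mongod
bucket = GridFSBucket(client.gridfs_demo)  # illustrative database name

# Each 255 KiB chunk previously triggered its own insert_one round trip;
# chunks are now buffered and flushed to the server in insert_many batches.
with bucket.open_upload_stream("big_file") as stream:
    for _ in range(400):
        stream.write(b"x" * 255 * 1024)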

3 files changed: 100 additions & 16 deletions

gridfs/grid_file.py

Lines changed: 47 additions & 14 deletions
@@ -21,21 +21,23 @@
 import os
 from typing import Any, Iterable, Mapping, NoReturn, Optional
 
-from bson.binary import Binary
 from bson.int64 import Int64
 from bson.objectid import ObjectId
 from gridfs.errors import CorruptGridFile, FileExists, NoFile
 from pymongo import ASCENDING
 from pymongo.client_session import ClientSession
 from pymongo.collection import Collection
+from pymongo.common import MAX_MESSAGE_SIZE
 from pymongo.cursor import Cursor
 from pymongo.errors import (
+    BulkWriteError,
     ConfigurationError,
     CursorNotFound,
     DuplicateKeyError,
     InvalidOperation,
     OperationFailure,
 )
+from pymongo.helpers import _check_write_command_response
 from pymongo.read_preferences import ReadPreference
 
 _SEEK_SET = os.SEEK_SET
@@ -48,6 +50,13 @@
 """Default chunk size, in bytes."""
 # Slightly under a power of 2, to work well with server's record allocations.
 DEFAULT_CHUNK_SIZE = 255 * 1024
+# The number of chunked bytes to buffer before calling insert_many.
+_UPLOAD_BUFFER_SIZE = MAX_MESSAGE_SIZE
+# The number of chunk documents to buffer before calling insert_many.
+_UPLOAD_BUFFER_CHUNKS = 100000
+# Rough BSON overhead of a chunk document not including the chunk data itself.
+# Essentially len(encode({"_id": ObjectId(), "files_id": ObjectId(), "n": 1, "data": ""}))
+_CHUNK_OVERHEAD = 60
 
 _C_INDEX: dict[str, Any] = {"files_id": ASCENDING, "n": ASCENDING}
 _F_INDEX: dict[str, Any] = {"filename": ASCENDING, "uploadDate": ASCENDING}
@@ -198,6 +207,8 @@ def __init__(
         object.__setattr__(self, "_chunk_number", 0)
         object.__setattr__(self, "_closed", False)
         object.__setattr__(self, "_ensured_index", False)
+        object.__setattr__(self, "_buffered_docs", [])
+        object.__setattr__(self, "_buffered_docs_size", 0)
 
     def __create_index(self, collection: Collection, index_key: Any, unique: bool) -> None:
         doc = collection.find_one(projection={"_id": 1}, session=self._session)
@@ -249,6 +260,8 @@ def closed(self) -> bool:
 
     _buffer: io.BytesIO
     _closed: bool
+    _buffered_docs: list[dict[str, Any]]
+    _buffered_docs_size: int
 
     def __getattr__(self, name: str) -> Any:
         if name in self._file:
@@ -268,32 +281,52 @@ def __setattr__(self, name: str, value: Any) -> None:
             if self._closed:
                 self._coll.files.update_one({"_id": self._file["_id"]}, {"$set": {name: value}})
 
-    def __flush_data(self, data: Any) -> None:
+    def __flush_data(self, data: Any, force: bool = False) -> None:
         """Flush `data` to a chunk."""
         self.__ensure_indexes()
-        if not data:
-            return
         assert len(data) <= self.chunk_size
-
-        chunk = {"files_id": self._file["_id"], "n": self._chunk_number, "data": Binary(data)}
-
-        try:
-            self._chunks.insert_one(chunk, session=self._session)
-        except DuplicateKeyError:
-            self._raise_file_exists(self._file["_id"])
+        if data:
+            self._buffered_docs.append(
+                {"files_id": self._file["_id"], "n": self._chunk_number, "data": data}
+            )
+            self._buffered_docs_size += len(data) + _CHUNK_OVERHEAD
+        if not self._buffered_docs:
+            return
+        # Limit to 100,000 chunks or 32MB (+1 chunk) of data.
+        if (
+            force
+            or self._buffered_docs_size >= _UPLOAD_BUFFER_SIZE
+            or len(self._buffered_docs) >= _UPLOAD_BUFFER_CHUNKS
+        ):
+            try:
+                self._chunks.insert_many(self._buffered_docs, session=self._session)
+            except BulkWriteError as exc:
+                # For backwards compatibility, raise an insert_one style exception.
+                write_errors = exc.details["writeErrors"]
+                for err in write_errors:
+                    if err.get("code") in (11000, 11001, 12582):  # Duplicate key errors
+                        self._raise_file_exists(self._file["_id"])
+                result = {"writeErrors": write_errors}
+                wces = exc.details["writeConcernErrors"]
+                if wces:
+                    result["writeConcernError"] = wces[-1]
+                _check_write_command_response(result)
+                raise
+            self._buffered_docs = []
+            self._buffered_docs_size = 0
         self._chunk_number += 1
         self._position += len(data)
 
-    def __flush_buffer(self) -> None:
+    def __flush_buffer(self, force: bool = False) -> None:
         """Flush the buffer contents out to a chunk."""
-        self.__flush_data(self._buffer.getvalue())
+        self.__flush_data(self._buffer.getvalue(), force=force)
        self._buffer.close()
        self._buffer = io.BytesIO()
 
     def __flush(self) -> Any:
         """Flush the file to the database."""
         try:
-            self.__flush_buffer()
+            self.__flush_buffer(force=True)
             # The GridFS spec says length SHOULD be an Int64.
             self._file["length"] = Int64(self._position)
             self._file["uploadDate"] = datetime.datetime.now(tz=datetime.timezone.utc)

test/test_gridfs.py

Lines changed: 13 additions & 2 deletions
@@ -21,6 +21,7 @@
 import threading
 import time
 from io import BytesIO
+from unittest.mock import patch
 
 sys.path[0:0] = [""]
 
@@ -30,7 +31,7 @@
 import gridfs
 from bson.binary import Binary
 from gridfs.errors import CorruptGridFile, FileExists, NoFile
-from gridfs.grid_file import GridOutCursor
+from gridfs.grid_file import DEFAULT_CHUNK_SIZE, GridOutCursor
 from pymongo.database import Database
 from pymongo.errors import (
     ConfigurationError,
@@ -344,8 +345,18 @@ def test_file_exists(self):
         one.write(b"some content")
         one.close()
 
+        # Attempt to upload a file with more chunks to the same _id.
+        with patch("gridfs.grid_file._UPLOAD_BUFFER_SIZE", DEFAULT_CHUNK_SIZE):
+            two = self.fs.new_file(_id=123)
+            self.assertRaises(FileExists, two.write, b"x" * DEFAULT_CHUNK_SIZE * 3)
+        # Original file is still readable (no extra chunks were uploaded).
+        self.assertEqual(self.fs.get(123).read(), b"some content")
+
         two = self.fs.new_file(_id=123)
-        self.assertRaises(FileExists, two.write, b"x" * 262146)
+        two.write(b"some content")
+        self.assertRaises(FileExists, two.close)
+        # Original file is still readable.
+        self.assertEqual(self.fs.get(123).read(), b"some content")
 
     def test_exists(self):
         oid = self.fs.put(b"hello")

test/test_gridfs_bucket.py

Lines changed: 40 additions & 0 deletions
@@ -18,9 +18,14 @@
 
 import datetime
 import itertools
+import sys
 import threading
 import time
 from io import BytesIO
+from unittest.mock import patch
+
+sys.path[0:0] = [""]
+
 from test import IntegrationTest, client_context, unittest
 from test.utils import joinall, one, rs_client, rs_or_single_client, single_client
 
@@ -34,6 +39,7 @@
     ConfigurationError,
     NotPrimaryError,
     ServerSelectionTimeoutError,
+    WriteConcernError,
 )
 from pymongo.mongo_client import MongoClient
 from pymongo.read_preferences import ReadPreference
@@ -276,6 +282,39 @@ def test_upload_from_stream_with_id(self):
         )
         self.assertEqual(b"custom id", self.fs.open_download_stream(oid).read())
 
+    @patch("gridfs.grid_file._UPLOAD_BUFFER_CHUNKS", 3)
+    @client_context.require_failCommand_fail_point
+    def test_upload_bulk_write_error(self):
+        # Test BulkWriteError from insert_many is converted to an insert_one style error.
+        expected_wce = {
+            "code": 100,
+            "codeName": "UnsatisfiableWriteConcern",
+            "errmsg": "Not enough data-bearing nodes",
+        }
+        cause_wce = {
+            "configureFailPoint": "failCommand",
+            "mode": {"times": 2},
+            "data": {"failCommands": ["insert"], "writeConcernError": expected_wce},
+        }
+        gin = self.fs.open_upload_stream("test_file", chunk_size_bytes=1)
+        with self.fail_point(cause_wce):
+            # Assert we raise WriteConcernError, not BulkWriteError.
+            with self.assertRaises(WriteConcernError):
+                gin.write(b"hello world")
+        # 3 chunks were uploaded.
+        self.assertEqual(3, self.db.fs.chunks.count_documents({"files_id": gin._id}))
+        gin.abort()
+
+    @patch("gridfs.grid_file._UPLOAD_BUFFER_CHUNKS", 10)
+    def test_upload_batching(self):
+        with self.fs.open_upload_stream("test_file", chunk_size_bytes=1) as gin:
+            gin.write(b"s" * (10 - 1))
+            # No chunks were uploaded yet.
+            self.assertEqual(0, self.db.fs.chunks.count_documents({"files_id": gin._id}))
+            gin.write(b"s")
+            # All chunks were uploaded since we hit the _UPLOAD_BUFFER_CHUNKS limit.
+            self.assertEqual(10, self.db.fs.chunks.count_documents({"files_id": gin._id}))
+
     def test_open_upload_stream(self):
         gin = self.fs.open_upload_stream("from_stream")
         gin.write(b"from stream")
@@ -362,6 +401,7 @@ def test_rename(self):
         self.assertRaises(NoFile, self.fs.open_download_stream_by_name, "first_name")
         self.assertEqual(b"testing", self.fs.open_download_stream_by_name("second_name").read())
 
+    @patch("gridfs.grid_file._UPLOAD_BUFFER_SIZE", 5)
     def test_abort(self):
         gin = self.fs.open_upload_stream("test_filename", chunk_size_bytes=5)
         gin.write(b"test1")
