Input buffer capacity exceeded when reading large values #201
Hi @liutec, thanks for bringing this issue to us. It should not be necessary to enlarge the capacity manually, as the buffer should grow automatically for big records. Could you share the error you got? Cheers,
Hello, sorry for the late reply, as well as for not including all the steps to reproduce this issue.

Neo4j server v3.0.5

import sys
import neo4j
from neo4j.v1 import GraphDatabase

print(sys.version_info)
print("neo4j-driver", neo4j.__version__)

neo4j_driver = GraphDatabase.driver(
    uri="bolt://xxx.xxx.xxx.xxx:7687",
    auth=("neo4j", "neo4j")
)

# Write a single large string property.
cypher = "MERGE (n:TestNode) SET n.data = $data"
params = {
    "data": "x" * 524111  # does not reproduce below 524111
}
with neo4j_driver.session() as tx:
    tx.run(cypher, params)
del params

# Read the value back; this is where the error is triggered.
cypher = "MATCH (n:TestNode) RETURN n.data AS data LIMIT 1"
with neo4j_driver.session() as tx:
    records = tx.run(cypher)
    for record in records:
        print(len(record['data']))
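An aside on the threshold above (an inference from the number alone, not verified against the driver source): 524111 sits just under 2**19 = 524288 bytes, which suggests a default input buffer capacity of 512 KiB, with the remaining bytes taken up by record framing and string-header overhead.

>>> 2 ** 19          # plausible default input buffer capacity (assumption)
524288
>>> 524288 - 524111  # headroom left for framing/encoding overhead (assumption)
177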
The problem is most likely here:
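The snippet being pointed at is not reproduced in the thread; for orientation, here is a rough sketch of the kind of fixed-capacity receive this likely refers to (an illustrative reconstruction using the attribute names from the fix below, not the driver's actual source):

def receive(self, socket, n):
    # Illustrative sketch only: with a fixed-size backing bytearray there is
    # nowhere to put data once new_extent exceeds len(self._data), so the
    # read either has to fail or the buffer has to grow.
    new_extent = self._extent + n
    if new_extent > len(self._data):
        raise IOError("Input buffer capacity exceeded")  # hypothetical error
    data_size = socket.recv_into(self._view[self._extent:new_extent])
    self._extent += data_size
    return data_size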
This should fix the issue, but for performance reasons I would still prefer to be able to set the buffer capacity:

def receive(self, socket, n):
    """
    Note: may modify buffer size, should error if frame exists
    """
    new_extent = self._extent + n
    overflow = new_extent - len(self._data)
    if overflow > 0:
        # Grow the backing buffer instead of erroring when the incoming
        # data would not fit.
        if self._recycle():
            return self.receive(socket, n)
        self._view = None
        new_data = bytearray(new_extent)
        new_data[:self._extent] = self._data
        self._data = new_data
        self._view = memoryview(self._data)
    data_size = socket.recv_into(self._view[self._extent:new_extent])
    new_extent = self._extent + data_size
    self._extent = new_extent
    return data_size
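An aside on the growth step above (a sketch under the assumption that the buffer is only ever resized here, not part of the original patch): growing geometrically rather than by exactly the overflow keeps the number of reallocations logarithmic in the message size, which avoids repeated small reallocations on large results.

# Sketch of an amortized-growth variant of the overflow branch above
# (assumes the same _extent/_data/_view attributes; not the official fix):
if overflow > 0:
    if self._recycle():
        return self.receive(socket, n)
    new_capacity = max(new_extent, 2 * len(self._data))  # grow by at least 2x
    self._view = None
    new_data = bytearray(new_capacity)
    new_data[:self._extent] = self._data
    self._data = new_data
    self._view = memoryview(self._data)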
Hi @liutec, thanks for the detailed information. I have an idea for a possible fix. Would you be able to try it out in your code? The changes are described as follows:

Please let us know whether the suggested change fixes your issue! Thanks again,
Hello, both changes fix the issue up to 6 MB (without hitting TransientError: dbms.memory.heap.max_size). After adding your changes, the following problems became apparent:

def write(self, b):
    new_data_start = 0
    new_data_size = len(b)
    while new_data_start < new_data_size:
        chunk_occupied = self._end - self._start
        chunk_remaining = self._max_chunk_size - chunk_occupied
        if chunk_remaining == 0:
            self.chunk()
            chunk_remaining = self._max_chunk_size
        chunk_write_size = min(chunk_remaining, new_data_size - new_data_start)
        new_end = self._end + chunk_write_size
        new_chunk_size = new_end - self._start
        self._data[self._end:new_end] = b[new_data_start:(new_data_start + chunk_write_size)]
        new_data_start += chunk_write_size
        self._end = new_end
        self._data[self._header:(self._header + 2)] = struct_pack(">H", new_chunk_size)

This seems to fix the problem for Python 2.7, and it drastically lowers memory consumption, but quite often the GC seems to have a hard time with the 8192-byte increments in ChunkedInputBuffer.receive, which leads to excessive delays and/or hangs.

data_size = self._end - self._start
if data_size > new_data_size:
    new_end = self._end + new_data_size
    self._data[self._end:new_end] = bytearray(data_size)

This dirty fix seems to eliminate the hangs, but with larger values it may be impossible to get contiguous memory blocks.

Unfortunately, for Python 3.5 the next issue is raised:
Thank you for your prompt response, and again, sorry for replying so late. Here are the tested changes: liutec@7b56264#diff-67ae6a6f455d60cc2bbf6c2ba4aa3ec1
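For context on what write() above is maintaining (a self-contained sketch of Bolt-style chunking, not the driver's code; 65535 is the largest size a 2-byte header can express): each message is transferred as chunks, each prefixed with a 2-byte big-endian length.

from struct import pack as struct_pack

def chunk_payload(payload, max_chunk_size=65535):
    # Split a message payload into Bolt-style chunks: a 2-byte big-endian
    # length header followed by at most max_chunk_size bytes of body.
    out = bytearray()
    for start in range(0, len(payload), max_chunk_size):
        body = payload[start:start + max_chunk_size]
        out += struct_pack(">H", len(body))
        out += body
    return bytes(out)

# e.g. chunk_payload(b"x" * 70000) yields a 65535-byte chunk followed by a 4465-byte chunk.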
Forgot to add the output for the remaining issue for Python 3:
Hello, I've also changed MessageFrame.read and removed the recursion. With this I no longer have any issues. The performance seems to have improved significantly, based on this test:

import sys
import neo4j
import time
from neo4j.v1 import GraphDatabase, TransientError

print(sys.version_info)
print("neo4j-driver", neo4j.__version__)

neo4j_driver = GraphDatabase.driver(
    uri="bolt://localhost:7687",
    auth=("neo4j", "neo4j")
)

for i in range(1, 10):
    try:
        start = time.time()
        data = "x" * (i * 2 ** 20)  # 1 MB, 2 MB, ... payloads
        cypher = "RETURN '{}' AS data".format(data)
        with neo4j_driver.session() as tx:
            records = tx.run(cypher)
            for record in records:
                if record['data'] != data:
                    print('ERROR')
        print('%d MB time = %.2f sec' % (i, time.time() - start))
    except TransientError as e:
        print(str(e))
        break

With your proposed changes and no recursion:

With just your proposed changes:
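The MessageFrame.read change itself is only visible through the linked commits; as a standalone illustration of the idea (a hypothetical function, not the driver's code), removing the recursion amounts to accumulating bytes across chunks in a loop instead of having the read call itself for the remainder:

def read_across_chunks(chunks, n):
    # Hypothetical illustration: collect up to n bytes by iterating over the
    # available chunks, instead of reading what fits in the current chunk and
    # recursing for the rest.
    out = bytearray()
    for chunk in chunks:
        if len(out) >= n:
            break
        out += chunk[:n - len(out)]
    return bytes(out)

# e.g. read_across_chunks([b"abc", b"defg", b"hi"], 6) == b"abcdef"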
@liutec

Zhen
When reading results with values larger than the ChunkedInputBuffer's default capacity, a non-descriptive error gets triggered. It would be nice to have the buffer capacities configurable, along with a specific error message.

I'm currently using this as a workaround: liutec@165946f

I realize it's not the best idea to store large values in Neo4j, but as long as it's not prohibited, they should be readable.
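A minimal sketch of what the requested knob could look like (the keyword argument and error text are hypothetical, not existing driver options):

from neo4j.v1 import GraphDatabase

# Hypothetical configuration surface: neither max_input_buffer_capacity nor
# the error message below exists in the driver at the time of this issue.
driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "neo4j"),
    max_input_buffer_capacity=8 * 1024 * 1024,  # hypothetical option
)
# On overflow, a descriptive error would be preferable to a generic one, e.g.:
# "Record requires N bytes but the input buffer capacity is limited to M bytes"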