Skip to content

Commit b0faed3

Browse files
committed
Cursor API
1 parent e57aef2 commit b0faed3

File tree

5 files changed

+253
-147
lines changed

5 files changed

+253
-147
lines changed

docs/source/index.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Session API
2626

2727
.. autofunction:: neo4j.v1.record
2828

29-
.. autoclass:: neo4j.v1.Result
29+
.. autoclass:: neo4j.v1.ResultCursor
3030
:members:
3131

3232
.. autoclass:: neo4j.v1.ResultSummary

neo4j/v1/session.py

+134-44
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def recycle(self, session):
133133
pool.appendleft(session)
134134

135135

136-
class Result(list):
136+
class ResultCursor(object):
137137
""" A handler for the result of Cypher statement execution.
138138
"""
139139

@@ -143,60 +143,152 @@ class Result(list):
143143
#: Dictionary of parameters passed with the statement.
144144
parameters = None
145145

146-
def __init__(self, session, statement, parameters):
147-
super(Result, self).__init__()
148-
self.session = session
146+
def __init__(self, connection, statement, parameters):
147+
super(ResultCursor, self).__init__()
149148
self.statement = statement
150149
self.parameters = parameters
151150
self.keys = None
152-
self.more = True
153-
self.summary = None
154-
self.bench_test = None
151+
self._connection = connection
152+
self._current = None
153+
self._next = deque()
154+
self._position = -1
155+
self._summary = None
156+
self._bench_test = None
157+
158+
def is_open(self):
159+
""" Return ``True`` if this cursor is still open, ``False`` otherwise.
160+
"""
161+
return bool(self._connection)
155162

156-
def on_header(self, metadata):
157-
""" Called on receipt of the result header.
163+
def close(self):
164+
""" Consume the remainder of this result and detach the connection
165+
from this cursor.
158166
"""
159-
self.keys = metadata["fields"]
160-
if self.bench_test:
161-
self.bench_test.start_recv = perf_counter()
167+
if self._connection and not self._connection.closed:
168+
self._consume()
169+
self._connection = None
162170

163-
def on_record(self, values):
164-
""" Called on receipt of each result record.
171+
def next(self):
172+
""" Advance to the next record, if available, and return a boolean
173+
to indicate whether or not the cursor has moved.
165174
"""
166-
self.append(Record(self.keys, tuple(map(hydrated, values))))
175+
if self._next:
176+
values = self._next.popleft()
177+
self._current = Record(self.keys, tuple(map(hydrated, values)))
178+
self._position += 1
179+
return True
180+
elif self._summary:
181+
return False
182+
else:
183+
self._connection.fetch_next()
184+
return self.next()
167185

168-
def on_footer(self, metadata):
169-
""" Called on receipt of the result footer.
186+
def record(self):
187+
""" Return the current record.
170188
"""
171-
self.more = False
172-
self.summary = ResultSummary(self.statement, self.parameters, **metadata)
173-
if self.bench_test:
174-
self.bench_test.end_recv = perf_counter()
189+
return self._current
175190

176-
def on_failure(self, metadata):
177-
""" Called on execution failure.
191+
def position(self):
192+
""" Return the current cursor position.
178193
"""
179-
raise CypherError(metadata)
194+
return self._position
180195

181-
def consume(self):
182-
""" Consume the remainder of this result, triggering all appropriate
183-
callback functions.
196+
def at_end(self):
197+
""" Return ``True`` if at the end of the record stream, ``False``
198+
otherwise.
184199
"""
185-
fetch_next = self.session.connection.fetch_next
186-
while self.more:
187-
fetch_next()
200+
if self._next:
201+
return False
202+
elif self._summary:
203+
return True
204+
else:
205+
self._connection.fetch_next()
206+
return self.at_end()
207+
208+
def records(self):
209+
""" Yield all subsequent records.
210+
"""
211+
while self.next():
212+
yield self.record()
213+
214+
def __getitem__(self, item):
215+
current = self._current
216+
if current is None:
217+
raise TypeError("No current record")
218+
return current[item]
219+
220+
def get(self, item, default=None):
221+
current = self._current
222+
if current is None:
223+
raise TypeError("No current record")
224+
try:
225+
return current[item]
226+
except (IndexError, KeyError):
227+
return default
188228

189229
def summarize(self):
190230
""" Consume the remainder of this result and produce a summary.
191231
192232
:rtype: ResultSummary
193233
"""
194-
self.consume()
195-
return self.summary
234+
self._consume()
235+
return self._summary
236+
237+
def skip(self, records):
238+
""" Move the cursor forward by a number of records.
239+
240+
:arg records: the number of records to step over
241+
"""
242+
if records < 0:
243+
raise ValueError("Cannot skip a negative number of records")
244+
skipped = 0
245+
while skipped < records and self.next():
246+
skipped += 1
247+
return skipped
248+
249+
def first(self):
250+
""" Attempt to navigate to the first record in this stream, returning
251+
``True`` if this is possible, ``False`` otherwise.
252+
"""
253+
if self._position < 0:
254+
return self.next()
255+
else:
256+
return self._position == 0
257+
258+
def single(self):
259+
""" Return ``True`` if able to navigate to first and only record.
260+
"""
261+
return self.first() and self.at_end()
262+
263+
def _consume(self):
264+
# Consume the remainder of this result, triggering all appropriate callback functions.
265+
fetch_next = self._connection.fetch_next
266+
while self._summary is None:
267+
fetch_next()
268+
269+
def _on_header(self, metadata):
270+
# Called on receipt of the result header.
271+
self.keys = metadata["fields"]
272+
if self._bench_test:
273+
self._bench_test.start_recv = perf_counter()
274+
275+
def _on_record(self, values):
276+
# Called on receipt of each result record.
277+
self._next.append(values)
278+
279+
def _on_footer(self, metadata):
280+
# Called on receipt of the result footer.
281+
self._summary = ResultSummary(self.statement, self.parameters, **metadata)
282+
if self._bench_test:
283+
self._bench_test.end_recv = perf_counter()
284+
285+
def _on_failure(self, metadata):
286+
# Called on execution failure.
287+
raise CypherError(metadata)
196288

197289

198290
class ResultSummary(object):
199-
""" A summary of execution returned with a :class:`.Result` object.
291+
""" A summary of execution returned with a :class:`.ResultCursor` object.
200292
"""
201293

202294
#: The statement that was executed to produce this result.
@@ -391,7 +483,7 @@ def run(self, statement, parameters=None):
391483
:param statement: Cypher statement to execute
392484
:param parameters: dictionary of parameters
393485
:return: Cypher result
394-
:rtype: :class:`.Result`
486+
:rtype: :class:`.ResultCursor`
395487
"""
396488

397489
# Ensure the statement is a Unicode value
@@ -411,30 +503,28 @@ def run(self, statement, parameters=None):
411503
t = BenchTest()
412504
t.init = perf_counter()
413505

414-
result = Result(self, statement, parameters)
415-
result.bench_test = t
506+
cursor = ResultCursor(self.connection, statement, parameters)
507+
cursor._bench_test = t
416508

417509
run_response = Response(self.connection)
418-
run_response.on_success = result.on_header
419-
run_response.on_failure = result.on_failure
510+
run_response.on_success = cursor._on_header
511+
run_response.on_failure = cursor._on_failure
420512

421513
pull_all_response = Response(self.connection)
422-
pull_all_response.on_record = result.on_record
423-
pull_all_response.on_success = result.on_footer
424-
pull_all_response.on_failure = result.on_failure
514+
pull_all_response.on_record = cursor._on_record
515+
pull_all_response.on_success = cursor._on_footer
516+
pull_all_response.on_failure = cursor._on_failure
425517

426518
self.connection.append(RUN, (statement, parameters), response=run_response)
427519
self.connection.append(PULL_ALL, response=pull_all_response)
428520
t.start_send = perf_counter()
429521
self.connection.send()
430522
t.end_send = perf_counter()
431523

432-
result.consume()
433-
434524
t.done = perf_counter()
435525
self.bench_tests.append(t)
436526

437-
return result
527+
return cursor
438528

439529
def close(self):
440530
""" If still usable, return this session to the driver pool it came from.

test/tck/tck_util.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99

1010
def send_string(text):
1111
session = driver.session()
12-
result = session.run(text)
12+
cursor = session.run(text)
1313
session.close()
14-
return result
14+
return list(cursor.records())
1515

1616

1717
def send_parameters(statement, parameters):
1818
session = driver.session()
19-
result = session.run(statement, parameters)
19+
cursor = session.run(statement, parameters)
2020
session.close()
21-
return result
21+
return list(cursor.records())
2222

2323

2424
def get_bolt_value(type, value):

test/test_examples.py

+11-9
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_minimum_snippet(self):
3737
session = driver.session()
3838
session.run("CREATE (neo:Person {name:'Neo', age:23})")
3939

40-
for record in session.run("MATCH (p:Person) WHERE p.name = 'Neo' RETURN p.age"):
40+
for record in session.run("MATCH (p:Person) WHERE p.name = 'Neo' RETURN p.age").records():
4141
print("Neo is {0} years old.".format(record["p.age"]))
4242
session.close()
4343
#end::minimum-snippet[]
@@ -46,8 +46,8 @@ def test_statement_with_parameters(self):
4646
driver = GraphDatabase.driver("bolt://localhost")
4747
session = driver.session()
4848
#tag::statement[]
49-
result = session.run("CREATE (p:Person { name: {name} })", {"name": "The One"})
50-
ones_created = result.summary.statistics.nodes_created
49+
cursor = session.run("CREATE (p:Person { name: {name} })", {"name": "The One"})
50+
ones_created = cursor.summarize().statistics.nodes_created
5151
print("There were {0} the ones created.".format(ones_created))
5252
#end::statement[]
5353
assert ones_created == 1
@@ -57,8 +57,8 @@ def test_statement_without_parameters(self):
5757
driver = GraphDatabase.driver("bolt://localhost")
5858
session = driver.session()
5959
#tag::statement-without-parameters[]
60-
result = session.run("CREATE (p:Person { name: 'The One' })")
61-
ones_created = result.summary.statistics.nodes_created
60+
cursor = session.run("CREATE (p:Person { name: 'The One' })")
61+
ones_created = cursor.summarize().statistics.nodes_created
6262
print("There were {0} the ones created.".format(ones_created))
6363
#end::statement-without-parameters[]
6464
assert ones_created == 1
@@ -72,8 +72,9 @@ def test_commit_a_transaction(self):
7272
tx.run("CREATE (p:Person { name: 'The One' })")
7373
tx.commit()
7474
#end::transaction-commit[]
75-
res = session.run("MATCH (p:Person { name: 'The One' }) RETURN count(p)")
76-
assert res[0]["count(p)"] == 1
75+
cursor = session.run("MATCH (p:Person { name: 'The One' }) RETURN count(p)")
76+
assert cursor.single()
77+
assert cursor.record()["count(p)"] == 1
7778
session.close()
7879

7980
def test_rollback_a_transaction(self):
@@ -84,8 +85,9 @@ def test_rollback_a_transaction(self):
8485
tx.run("CREATE (p:Person { name: 'The One' })")
8586
tx.rollback()
8687
#end::transaction-rollback[]
87-
res = session.run("MATCH (p:Person { name: 'The One' }) RETURN count(p)")
88-
assert res[0]["count(p)"] == 0
88+
cursor = session.run("MATCH (p:Person { name: 'The One' }) RETURN count(p)")
89+
assert cursor.single()
90+
assert cursor.record()["count(p)"] == 0
8991
session.close()
9092

9193
def test_require_encryption(self):

0 commit comments

Comments
 (0)