@@ -734,6 +734,13 @@ cdef class Message:
734
734
buf.write_ub8(state | TNS_SESSION_STATE_EXPLICIT_BOUNDARY)
735
735
self .conn_impl._session_state_desired = 0
736
736
737
+ cdef int on_out_of_packets(self ) except - 1 :
738
+ """
739
+ Called when an OufOfPackets exception is raised indicating that further
740
+ packets are required to continue processing of this message.
741
+ """
742
+ pass
743
+
737
744
cdef int postprocess(self ) except - 1 :
738
745
pass
739
746
@@ -1307,6 +1314,7 @@ cdef class MessageWithData(Message):
1307
1314
self .cursor_impl._last_row_index = self .row_index - 1
1308
1315
self .cursor_impl._buffer_rowcount = self .row_index
1309
1316
self .bit_vector = NULL
1317
+ self .on_row_completed()
1310
1318
1311
1319
cdef int _process_row_header(self , ReadBuffer buf) except - 1 :
1312
1320
cdef uint32_t num_bytes
@@ -1511,6 +1519,39 @@ cdef class MessageWithData(Message):
1511
1519
continue
1512
1520
self ._write_bind_params_column(buf, var_impl, pos + offset)
1513
1521
1522
+ cdef int on_out_of_packets(self ) except - 1 :
1523
+ """
1524
+ Called when an OufOfPackets exception is raised indicating that further
1525
+ packets are required to continue processing of this message.
1526
+ """
1527
+ cdef ThinVarImpl var_impl
1528
+
1529
+ # when fetching Arrow data, if the column has already been processed
1530
+ # and no saved array already exists, the array is saved so that
1531
+ # subsequent processing will not append to the array further; once the
1532
+ # complete row has been processed, the saved arrays are restored and
1533
+ # processing continues
1534
+ if self .cursor_impl.fetching_arrow:
1535
+ for var_impl in self .cursor_impl.fetch_var_impls:
1536
+ if var_impl._saved_arrow_array is not None :
1537
+ continue
1538
+ elif var_impl._arrow_array.arrow_array.length > self .row_index:
1539
+ var_impl._saved_arrow_array = var_impl._arrow_array
1540
+ var_impl._arrow_array = None
1541
+ var_impl._create_arrow_array()
1542
+
1543
+ cdef int on_row_completed(self ) except - 1 :
1544
+ """
1545
+ Called when a row has been successfully completed. This allows for any
1546
+ saved Arrow arrays to be restored.
1547
+ """
1548
+ cdef ThinVarImpl var_impl
1549
+ if self .cursor_impl.fetching_arrow:
1550
+ for var_impl in self .cursor_impl.fetch_var_impls:
1551
+ if var_impl._saved_arrow_array is not None :
1552
+ var_impl._arrow_array = var_impl._saved_arrow_array
1553
+ var_impl._saved_arrow_array = None
1554
+
1514
1555
cdef int postprocess(self ) except - 1 :
1515
1556
"""
1516
1557
Run any variable out converter functions on all non-null values that
0 commit comments