@@ -150,14 +150,27 @@ def update(self, new_routing_table):
150
150
self .ttl = new_routing_table .ttl
151
151
152
152
153
- class RoutingConnectionPool (ConnectionPool ):
154
- """ Connection pool with routing table.
155
- """
153
+ class RoutingSession (BoltSession ):
156
154
157
155
call_get_servers = "CALL dbms.cluster.routing.getServers"
158
156
get_routing_table_param = "context"
159
157
call_get_routing_table = "CALL dbms.cluster.routing.getRoutingTable({%s})" % get_routing_table_param
160
158
159
+ def routing_info_procedure (self , routing_context ):
160
+ if ServerVersion .from_str (self ._connection .server .version ).at_least_version (3 , 2 ):
161
+ return self .call_get_routing_table , {self .get_routing_table_param : routing_context }
162
+ else :
163
+ return self .call_get_servers , {}
164
+
165
+ def __run__ (self , statement , routing_context ):
166
+ # the statement is ignored as it will be get routing table procedure call.
167
+ return self ._run (* self .routing_info_procedure (routing_context ))
168
+
169
+
170
+ class RoutingConnectionPool (ConnectionPool ):
171
+ """ Connection pool with routing table.
172
+ """
173
+
161
174
def __init__ (self , connector , initial_address , routing_context , * routers ):
162
175
super (RoutingConnectionPool , self ).__init__ (connector )
163
176
self .initial_address = initial_address
@@ -166,12 +179,6 @@ def __init__(self, connector, initial_address, routing_context, *routers):
166
179
self .missing_writer = False
167
180
self .refresh_lock = Lock ()
168
181
169
- def routing_info_procedure (self , connection ):
170
- if ServerVersion .from_str (connection .server .version ).at_least_version (3 , 2 ):
171
- return self .call_get_routing_table , {self .get_routing_table_param : self .routing_context }
172
- else :
173
- return self .call_get_servers , {}
174
-
175
182
def fetch_routing_info (self , address ):
176
183
""" Fetch raw routing info from a given router address.
177
184
@@ -182,15 +189,8 @@ def fetch_routing_info(self, address):
182
189
if routing support is broken
183
190
"""
184
191
try :
185
- connections = [None ]
186
-
187
- def connector (_ ):
188
- connection = self .acquire_direct (address )
189
- connections [0 ] = connection
190
- return connection
191
-
192
- with BoltSession (lambda _ : connector ) as session :
193
- return list (session .run (* self .routing_info_procedure (connections [0 ])))
192
+ with RoutingSession (lambda _ : self .acquire_direct (address )) as session :
193
+ return list (session .run ("ignored" , self .routing_context ))
194
194
except CypherError as error :
195
195
if error .code == "Neo.ClientError.Procedure.ProcedureNotFound" :
196
196
raise ServiceUnavailable ("Server {!r} does not support routing" .format (address ))
0 commit comments