@@ -516,9 +516,20 @@ async def execute_command(self, *args, **options):
516
516
command_name = args [0 ]
517
517
conn = self .connection or await pool .get_connection (command_name , ** options )
518
518
519
- return await asyncio .shield (
520
- self ._try_send_command_parse_response (conn , * args , ** options )
521
- )
519
+ if self .single_connection_client :
520
+ await self ._single_conn_lock .acquire ()
521
+ try :
522
+ return await conn .retry .call_with_retry (
523
+ lambda : self ._send_command_parse_response (
524
+ conn , command_name , * args , ** options
525
+ ),
526
+ lambda error : self ._disconnect_raise (conn , error ),
527
+ )
528
+ finally :
529
+ if self .single_connection_client :
530
+ self ._single_conn_lock .release ()
531
+ if not self .connection :
532
+ await pool .release (conn )
522
533
523
534
async def parse_response (
524
535
self , connection : Connection , command_name : Union [str , bytes ], ** options
@@ -757,18 +768,10 @@ async def _disconnect_raise_connect(self, conn, error):
757
768
is not a TimeoutError. Otherwise, try to reconnect
758
769
"""
759
770
await conn .disconnect ()
760
-
761
771
if not (conn .retry_on_timeout and isinstance (error , TimeoutError )):
762
772
raise error
763
773
await conn .connect ()
764
774
765
- async def _try_execute (self , conn , command , * arg , ** kwargs ):
766
- try :
767
- return await command (* arg , ** kwargs )
768
- except asyncio .CancelledError :
769
- await conn .disconnect ()
770
- raise
771
-
772
775
async def _execute (self , conn , command , * args , ** kwargs ):
773
776
"""
774
777
Connect manually upon disconnection. If the Redis server is down,
@@ -777,11 +780,9 @@ async def _execute(self, conn, command, *args, **kwargs):
777
780
called by the # connection to resubscribe us to any channels and
778
781
patterns we were previously listening to
779
782
"""
780
- return await asyncio .shield (
781
- conn .retry .call_with_retry (
782
- lambda : self ._try_execute (conn , command , * args , ** kwargs ),
783
- lambda error : self ._disconnect_raise_connect (conn , error ),
784
- )
783
+ return await conn .retry .call_with_retry (
784
+ lambda : command (* args , ** kwargs ),
785
+ lambda error : self ._disconnect_raise_connect (conn , error ),
785
786
)
786
787
787
788
async def parse_response (self , block : bool = True , timeout : float = 0 ):
@@ -799,7 +800,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
799
800
await conn .connect ()
800
801
801
802
read_timeout = None if block else timeout
802
- response = await self ._execute (conn , conn .read_response , timeout = read_timeout )
803
+ response = await self ._execute (
804
+ conn , conn .read_response , timeout = read_timeout , disconnect_on_error = False
805
+ )
803
806
804
807
if conn .health_check_interval and response == self .health_check_response :
805
808
# ignore the health check message as user might not expect it
@@ -1183,18 +1186,6 @@ async def _disconnect_reset_raise(self, conn, error):
1183
1186
await self .reset ()
1184
1187
raise
1185
1188
1186
- async def _try_send_command_parse_response (self , conn , * args , ** options ):
1187
- try :
1188
- return await conn .retry .call_with_retry (
1189
- lambda : self ._send_command_parse_response (
1190
- conn , args [0 ], * args , ** options
1191
- ),
1192
- lambda error : self ._disconnect_reset_raise (conn , error ),
1193
- )
1194
- except asyncio .CancelledError :
1195
- await conn .disconnect ()
1196
- raise
1197
-
1198
1189
async def immediate_execute_command (self , * args , ** options ):
1199
1190
"""
1200
1191
Execute a command immediately, but don't auto-retry on a
@@ -1210,8 +1201,12 @@ async def immediate_execute_command(self, *args, **options):
1210
1201
command_name , self .shard_hint
1211
1202
)
1212
1203
self .connection = conn
1213
- return await asyncio .shield (
1214
- self ._try_send_command_parse_response (conn , * args , ** options )
1204
+
1205
+ return await conn .retry .call_with_retry (
1206
+ lambda : self ._send_command_parse_response (
1207
+ conn , command_name , * args , ** options
1208
+ ),
1209
+ lambda error : self ._disconnect_reset_raise (conn , error ),
1215
1210
)
1216
1211
1217
1212
def pipeline_execute_command (self , * args , ** options ):
@@ -1379,19 +1374,6 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
1379
1374
await self .reset ()
1380
1375
raise
1381
1376
1382
- async def _try_execute (self , conn , execute , stack , raise_on_error ):
1383
- try :
1384
- return await conn .retry .call_with_retry (
1385
- lambda : execute (conn , stack , raise_on_error ),
1386
- lambda error : self ._disconnect_raise_reset (conn , error ),
1387
- )
1388
- except asyncio .CancelledError :
1389
- # not supposed to be possible, yet here we are
1390
- await conn .disconnect (nowait = True )
1391
- raise
1392
- finally :
1393
- await self .reset ()
1394
-
1395
1377
async def execute (self , raise_on_error : bool = True ):
1396
1378
"""Execute all the commands in the current pipeline"""
1397
1379
stack = self .command_stack
@@ -1413,11 +1395,10 @@ async def execute(self, raise_on_error: bool = True):
1413
1395
conn = cast (Connection , conn )
1414
1396
1415
1397
try :
1416
- return await asyncio .shield (
1417
- self ._try_execute (conn , execute , stack , raise_on_error )
1398
+ return await conn .retry .call_with_retry (
1399
+ lambda : execute (conn , stack , raise_on_error ),
1400
+ lambda error : self ._disconnect_raise_reset (conn , error ),
1418
1401
)
1419
- except RuntimeError :
1420
- await self .reset ()
1421
1402
finally :
1422
1403
await self .reset ()
1423
1404
0 commit comments