1
1
import EventEmitter from "events" ;
2
2
import { RedisClientOptions } from "." ;
3
3
import RedisCommandsQueue from "./commands-queue" ;
4
- import RedisSocket , { RedisSocketOptions , RedisTcpSocketOptions } from "./socket" ;
4
+ import RedisSocket from "./socket" ;
5
5
import { RedisArgument } from "../.." ;
6
6
import { isIP } from "net" ;
7
7
import { lookup } from "dns/promises" ;
@@ -25,6 +25,11 @@ export interface SocketTimeoutUpdate {
25
25
timeout ?: number ;
26
26
}
27
27
28
+ export const dbgMaintenance = ( ...args : any [ ] ) => {
29
+ if ( ! process . env . DEBUG_MAINTENANCE ) return ;
30
+ return console . log ( '[MNT]' , ...args ) ;
31
+ }
32
+
28
33
export default class EnterpriseMaintenanceManager extends EventEmitter {
29
34
#commandsQueue: RedisCommandsQueue ;
30
35
#options: RedisClientOptions ;
@@ -47,16 +52,19 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
47
52
case PN . MOVING : {
48
53
const [ _ , afterMs , url ] = push ;
49
54
const [ host , port ] = url . toString ( ) . split ( ":" ) ;
55
+ dbgMaintenance ( 'Received MOVING:' , afterMs , host , Number ( port ) ) ;
50
56
this . #onMoving( afterMs , host , Number ( port ) ) ;
51
57
return true ;
52
58
}
53
59
case PN . MIGRATING :
54
60
case PN . FAILING_OVER : {
61
+ dbgMaintenance ( 'Received MIGRATING|FAILING_OVER' ) ;
55
62
this . #onMigrating( ) ;
56
63
return true ;
57
64
}
58
65
case PN . MIGRATED :
59
66
case PN . FAILED_OVER : {
67
+ dbgMaintenance ( 'Received MIGRATED|FAILED_OVER' ) ;
60
68
this . #onMigrated( ) ;
61
69
return true ;
62
70
}
@@ -83,6 +91,7 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
83
91
) : Promise < void > => {
84
92
// 1 [EVENT] MOVING PN received
85
93
// 2 [ACTION] Pause writing
94
+ dbgMaintenance ( 'Pausing writing of new commands to old socket' ) ;
86
95
this . emit ( MAINTENANCE_EVENTS . PAUSE_WRITING ) ;
87
96
this . #onMigrating( ) ;
88
97
@@ -93,13 +102,18 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
93
102
} ) ;
94
103
//todo
95
104
newSocket . setMaintenanceTimeout ( ) ;
105
+ dbgMaintenance ( `Connecting to new socket: ${ host } :${ port } ` ) ;
96
106
await newSocket . connect ( ) ;
107
+ dbgMaintenance ( `Connected to new socket` ) ;
97
108
// 3 [EVENT] New socket connected
98
109
110
+ dbgMaintenance ( `Wait for all in-flight commands to complete` ) ;
99
111
await this . #commandsQueue. waitForInflightCommandsToComplete ( ) ;
112
+ dbgMaintenance ( `In-flight commands completed` ) ;
100
113
// 4 [EVENT] In-flight commands completed
101
114
102
115
// 5 + 6
116
+ dbgMaintenance ( 'Resume writing' )
103
117
this . emit ( MAINTENANCE_EVENTS . RESUME_WRITING , newSocket ) ;
104
118
this . #onMigrated( ) ;
105
119
} ;
0 commit comments