@@ -17,6 +17,30 @@ module.exports = function (dependencies) {
17
17
HTTP2_METHOD_POST
18
18
} = http2 . constants ;
19
19
20
+ const safeCloseSession = ( session , callback ) => {
21
+ if ( session && ! session . destroyed && ! session . _nodeApnIsDestroying ) {
22
+ session . _nodeApnIsDestroying = true ;
23
+ const startDestroying = ( ) => {
24
+ if ( ! session . destroyed ) {
25
+ session . _nodeApnIsDestroying = true ;
26
+ session . destroy ( ) ;
27
+ }
28
+ if ( callback ) {
29
+ callback ( ) ;
30
+ }
31
+ } ;
32
+ if ( session . closed ) {
33
+ startDestroying ( ) ;
34
+ } else {
35
+ session . close ( startDestroying ) ;
36
+ }
37
+ } else {
38
+ if ( callback ) {
39
+ callback ( ) ;
40
+ }
41
+ }
42
+ } ;
43
+
20
44
function Client ( options ) {
21
45
this . config = config ( options ) ;
22
46
this . healthCheckInterval = setInterval ( ( ) => {
@@ -31,115 +55,146 @@ module.exports = function (dependencies) {
31
55
} , this . config . heartBeat ) . unref ( ) ;
32
56
}
33
57
34
- Client . prototype . write = function write ( notification , device , count ) {
35
- // Connect session
36
- if ( ! this . session || this . session . destroyed ) {
37
- this . session = http2 . connect ( `https://${ this . config . address } ` , this . config ) ;
58
+ Client . prototype . _createSession = function ( ) {
59
+ const url = this . _mockOverrideUrl || `https://${ this . config . address } ` ;
60
+ // Get the reference to the current session so that
61
+ // we don't unintentionally destroy a different session on an async callback
62
+ const session = http2 . connect ( url , this . config ) ;
38
63
39
- this . session . on ( "socketError" , ( error ) => {
40
- if ( logger . enabled ) {
41
- logger ( `Socket error: ${ error } ` ) ;
42
- }
43
- if ( this . session && ! this . session . destroyed ) {
44
- this . session . destroy ( ) ;
45
- }
64
+ session . on ( "socketError" , ( error ) => {
65
+ if ( logger . enabled ) {
66
+ logger ( `Socket error: ${ error } ` ) ;
67
+ }
68
+ safeCloseSession ( session ) ;
69
+ } ) ;
70
+ session . on ( "error" , ( error ) => {
71
+ if ( logger . enabled ) {
72
+ logger ( `Session error: ${ error } ` ) ;
73
+ }
74
+ safeCloseSession ( session ) ;
75
+ } ) ;
76
+
77
+ session . on ( "goaway" , ( errorCode , lastStreamId , opaqueData ) => {
78
+ logger ( `GOAWAY received: (errorCode ${ errorCode } , lastStreamId: ${ lastStreamId } , opaqueData: ${ opaqueData } )` ) ;
79
+ // gracefully stop accepting new streams
80
+ // This may be redundant, since nodejs http2 client is supposed to shut down automatically on receiving a goaway frame
81
+ safeCloseSession ( session ) ;
82
+ } ) ;
83
+
84
+ if ( logger . enabled ) {
85
+ session . on ( "connect" , ( ) => {
86
+ logger ( "Session connected" ) ;
46
87
} ) ;
47
- this . session . on ( "error" , ( error ) => {
48
- if ( logger . enabled ) {
49
- logger ( `Session error: ${ error } ` ) ;
50
- }
51
- if ( this . session && ! this . session . destroyed ) {
52
- this . session . destroy ( ) ;
53
- }
88
+ session . on ( "close" , ( ) => {
89
+ logger ( "Session closed" ) ;
54
90
} ) ;
91
+ session . on ( "frameError" , ( frameType , errorCode , streamId ) => {
92
+ logger ( `Frame error: (frameType: ${ frameType } , errorCode ${ errorCode } , streamId: ${ streamId } )` ) ;
93
+ } ) ;
94
+ }
95
+ return session ;
96
+ } ;
55
97
56
- this . session . on ( "goaway" , ( errorCode , lastStreamId , opaqueData ) => {
57
- logger ( `GOAWAY received: (errorCode ${ errorCode } , lastStreamId: ${ lastStreamId } , opaqueData: ${ opaqueData } )` ) ;
58
- // gracefully stop accepting new streams
59
- const session = this . session ;
60
- this . session = undefined ;
61
- if ( session && ! session . destroyed ) {
62
- session . close ( ( ) => {
63
- session . destroy ( ) ;
64
- } ) ;
65
- }
66
- } ) ;
67
-
68
- if ( logger . enabled ) {
69
- this . session . on ( "connect" , ( ) => {
70
- logger ( "Session connected" ) ;
71
- } ) ;
72
- this . session . on ( "close" , ( ) => {
73
- logger ( "Session closed" ) ;
74
- } ) ;
75
- this . session . on ( "frameError" , ( frameType , errorCode , streamId ) => {
76
- logger ( `Frame error: (frameType: ${ frameType } , errorCode ${ errorCode } , streamId: ${ streamId } )` ) ;
77
- } ) ;
98
+ /**
99
+ * @param {Notification } notification the notification data to send through APNs
100
+ * @param {string } device the device token
101
+ * @param {number } [count] the number of retries that have occurred so far
102
+ * @returns {Promise<{device:string, error?: VError}> } object with device, optional error.
103
+ */
104
+ Client . prototype . write = function write ( notification , device , count = 0 ) {
105
+ return new Promise ( ( resolve ) => {
106
+ // Connect session
107
+ if ( ! this . session || this . session . destroyed || this . session . _nodeApnIsDestroying ) {
108
+ logger ( 'creating a new APNs session' ) ;
109
+ this . session = this . _createSession ( ) ;
78
110
}
79
- }
80
111
81
- let tokenGeneration = null ;
82
- let status = null ;
83
- let responseData = "" ;
84
- let retryCount = count || 0 ;
85
-
86
- const headers = extend ( {
87
- [ HTTP2_HEADER_SCHEME ] : "https" ,
88
- [ HTTP2_HEADER_METHOD ] : HTTP2_METHOD_POST ,
89
- [ HTTP2_HEADER_AUTHORITY ] : this . config . address ,
90
- [ HTTP2_HEADER_PATH ] : `/3/device/${ device } ` ,
91
- } , notification . headers ) ;
92
-
93
- if ( this . config . token ) {
94
- if ( this . config . token . isExpired ( 3300 ) ) {
95
- this . config . token . regenerate ( this . config . token . generation ) ;
112
+ let tokenGeneration = null ;
113
+ let status = null ;
114
+ let responseData = "" ;
115
+ let retryCount = count || 0 ;
116
+
117
+ const headers = extend ( {
118
+ [ HTTP2_HEADER_SCHEME ] : "https" ,
119
+ [ HTTP2_HEADER_METHOD ] : HTTP2_METHOD_POST ,
120
+ [ HTTP2_HEADER_AUTHORITY ] : this . config . address ,
121
+ [ HTTP2_HEADER_PATH ] : `/3/device/${ device } ` ,
122
+ } , notification . headers ) ;
123
+
124
+ if ( this . config . token ) {
125
+ if ( this . config . token . isExpired ( 3300 ) ) {
126
+ this . config . token . regenerate ( this . config . token . generation ) ;
127
+ }
128
+ headers . authorization = `bearer ${ this . config . token . current } ` ;
129
+ tokenGeneration = this . config . token . generation ;
96
130
}
97
- headers . authorization = `bearer ${ this . config . token . current } ` ;
98
- tokenGeneration = this . config . token . generation ;
99
- }
131
+ const currentSession = this . session ;
100
132
101
- const request = this . session . request ( headers )
133
+ const request = currentSession . request ( headers )
134
+ const timeout = this . config . timeout || 10000 ;
102
135
103
- request . setEncoding ( "utf8" ) ;
136
+ request . setTimeout ( timeout , ( ) => {
137
+ const errorMessage = `Forcibly closing connection to APNs after reaching the request timeout of ${ timeout } milliseconds` ;
138
+ // The first call to resolve will be what the promise resolves to.
139
+ resolve ( { device, error : new VError ( errorMessage ) } ) ;
140
+ if ( currentSession !== this . session ) {
141
+ return ;
142
+ }
143
+ if ( currentSession . destroyed ) {
144
+ return ;
145
+ }
146
+ logger ( errorMessage ) ;
147
+ safeCloseSession ( currentSession ) ;
148
+ this . session = null ;
149
+ } ) ;
104
150
105
- request . on ( "response" , ( headers ) => {
106
- status = headers [ HTTP2_HEADER_STATUS ] ;
107
- } ) ;
151
+ request . setEncoding ( "utf8" ) ;
108
152
109
- request . on ( "data " , ( data ) => {
110
- responseData += data ;
111
- } ) ;
153
+ request . on ( "response " , ( headers ) => {
154
+ status = headers [ HTTP2_HEADER_STATUS ] ;
155
+ } ) ;
112
156
113
- request . write ( notification . body ) ;
157
+ request . on ( "data" , ( data ) => {
158
+ responseData += data ;
159
+ } ) ;
160
+
161
+ request . write ( notification . body ) ;
114
162
115
- return new Promise ( resolve => {
116
163
request . on ( "end" , ( ) => {
117
- if ( logger . enabled ) {
118
- logger ( `Request ended with status ${ status } and responseData: ${ responseData } ` ) ;
119
- }
164
+ try {
165
+ if ( logger . enabled ) {
166
+ logger ( `Request ended with status ${ status } and responseData: ${ responseData } ` ) ;
167
+ }
120
168
121
- if ( status === 200 ) {
122
- resolve ( { device } ) ;
123
- } else if ( responseData !== "" ) {
124
- const response = JSON . parse ( responseData ) ;
125
-
126
- if ( status === 403 && response . reason === "ExpiredProviderToken" && retryCount < 2 ) {
127
- this . config . token . regenerate ( tokenGeneration ) ;
128
- resolve ( this . write ( notification , device , retryCount + 1 ) ) ;
129
- return ;
130
- } else if ( status === 500 && response . reason === "InternalServerError" ) {
131
- this . session . destroy ( ) ;
132
- let error = new VError ( "Error 500, stream ended unexpectedly" ) ;
169
+ if ( status === 200 ) {
170
+ resolve ( { device } ) ;
171
+ } else if ( responseData !== "" ) {
172
+ const response = JSON . parse ( responseData ) ;
173
+
174
+ if ( status === 403 && response . reason === "ExpiredProviderToken" && retryCount < 2 ) {
175
+ this . config . token . regenerate ( tokenGeneration ) ;
176
+ resolve ( this . write ( notification , device , retryCount + 1 ) ) ;
177
+ return ;
178
+ } else if ( status === 500 && response . reason === "InternalServerError" ) {
179
+ let error = new VError ( "Error 500, stream ended unexpectedly" ) ;
180
+ resolve ( { device, error } ) ;
181
+
182
+ safeCloseSession ( currentSession ) ;
183
+ this . session = null ;
184
+ return ;
185
+ }
186
+
187
+ resolve ( { device, status, response } ) ;
188
+ } else {
189
+ let error = new VError ( "stream ended unexpectedly" ) ;
133
190
resolve ( { device, error } ) ;
134
- return ;
135
191
}
136
-
137
- resolve ( { device, status, response } ) ;
138
- } else {
139
- let error = new VError ( "stream ended unexpectedly" ) ;
192
+ } catch ( e ) {
193
+ const error = new VError ( e , 'Unexpected error processing APNs response' ) ;
194
+ logger ( `Unexpected error processing APNs response: ${ e . message } ` ) ;
140
195
resolve ( { device, error } ) ;
141
196
}
142
- } )
197
+ } ) ;
143
198
144
199
request . on ( "error" , ( error ) => {
145
200
if ( logger . enabled ) {
@@ -159,16 +214,17 @@ module.exports = function (dependencies) {
159
214
} ;
160
215
161
216
Client . prototype . shutdown = function shutdown ( callback ) {
217
+ logger ( 'Called client.shutdown()' ) ;
162
218
if ( this . healthCheckInterval ) {
163
219
clearInterval ( this . healthCheckInterval ) ;
164
220
}
165
- if ( this . session && ! this . session . destroyed ) {
166
- this . session . close ( ( ) => {
167
- this . session . destroy ( ) ;
168
- if ( callback ) {
169
- callback ( ) ;
170
- }
171
- } ) ;
221
+ if ( this . session ) {
222
+ safeCloseSession ( this . session , callback ) ;
223
+ this . session = null ;
224
+ } else {
225
+ if ( callback ) {
226
+ callback ( ) ;
227
+ }
172
228
}
173
229
} ;
174
230
0 commit comments