Skip to content

Commit 880cea8

Browse files
committed
wip: almost fixed now
Just need to check for stream state change after connection receives a packet. Ideally we only check finishing streams. * Related #10 [ci skip]
1 parent 76c144f commit 880cea8

File tree

5 files changed

+109
-42
lines changed

5 files changed

+109
-42
lines changed

src/QUICClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class QUICClient extends EventTarget {
147147
},
148148
config: quicConfig,
149149
logger: logger.getChild(
150-
`${QUICConnection.name} ${scid.toString().slice(32)}`,
150+
`${QUICConnection.name} ${scid.toString().slice(32)}-${Math.floor(Math.random() * 100)}`,
151151
),
152152
});
153153
connection.addEventListener('error', handleConnectionError, { once: true });

src/QUICConnection.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ class QUICConnection extends EventTarget {
343343
try {
344344
this.logger.debug(`Did a recv ${data.byteLength}`);
345345
this.conn.recv(data, recvInfo);
346+
this.logger.info(`RECEIVED ${data.byteLength} of data`)
346347
} catch (e) {
347348
this.logger.error(`recv error ${e.message}`);
348349
// Depending on the exception, the `this.conn.recv`
@@ -385,14 +386,18 @@ class QUICConnection extends EventTarget {
385386
!this.conn.isDraining() &&
386387
(this.conn.isInEarlyData() || this.conn.isEstablished())
387388
) {
389+
this.logger.info('JKASDFHKSJDHFJKSDHFJK');
390+
QUICStream.logStreamState(0 as StreamId, this.conn, this.logger);
388391
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
389392
let quicStream = this.streamMap.get(streamId);
393+
this.logger.info(`Checking stream readable ${streamId}`);
394+
QUICStream.logStreamState(streamId, this.conn, this.logger);
390395
if (quicStream == null) {
391396
// The creation will set itself to the stream map
392397
quicStream = await QUICStream.createQUICStream({
393398
streamId,
394399
connection: this,
395-
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
400+
logger: this.logger.getChild(`${QUICStream.name} ${streamId}-${Math.floor(Math.random() * 100)}`),
396401
});
397402
this.dispatchEvent(
398403
new events.QUICConnectionStreamEvent({ detail: quicStream }),
@@ -401,13 +406,15 @@ class QUICConnection extends EventTarget {
401406
quicStream.read();
402407
}
403408
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
409+
this.logger.info(`Checking stream writable ${streamId}`);
410+
QUICStream.logStreamState(streamId, this.conn, this.logger);
404411
let quicStream = this.streamMap.get(streamId);
405412
if (quicStream == null) {
406413
// The creation will set itself to the stream map
407414
quicStream = await QUICStream.createQUICStream({
408415
streamId,
409416
connection: this,
410-
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
417+
logger: this.logger.getChild(`${QUICStream.name} ${streamId}-${Math.floor(Math.random() * 100)}`),
411418
});
412419
this.dispatchEvent(
413420
new events.QUICConnectionStreamEvent({ detail: quicStream }),
@@ -530,6 +537,7 @@ class QUICConnection extends EventTarget {
530537
sendInfo.to.port,
531538
sendInfo.to.host,
532539
);
540+
this.logger.info(`SENT ${sendLength} of data`)
533541
} catch (e) {
534542
this.logger.error(`send error ${e.message}`);
535543
this.dispatchEvent(
@@ -601,7 +609,7 @@ class QUICConnection extends EventTarget {
601609
const quicStream = await QUICStream.createQUICStream({
602610
streamId: streamId!,
603611
connection: this,
604-
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}`),
612+
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}-${Math.floor(Math.random() * 100)}`),
605613
});
606614
const writer = quicStream.writable.getWriter();
607615

src/QUICServer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class QUICServer extends EventTarget {
265265
remoteInfo,
266266
config: this.config,
267267
logger: this.logger.getChild(
268-
`${QUICConnection.name} ${scid.toString().slice(32)}`,
268+
`${QUICConnection.name} ${scid.toString().slice(32)}-${Math.floor(Math.random() * 100)}`,
269269
),
270270
});
271271

src/QUICStream.ts

Lines changed: 91 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ class QUICStream
4848
protected _recvClosed: boolean = false;
4949
protected _recvPaused: boolean = false;
5050
protected resolveWritableP?: () => void;
51-
public readonly finishedP: Promise<void>;
52-
protected resolveFinishedP: () => void;
51+
// This resolves when `streamSend` would result in a `StreamStopped(u64)` error indicating sending has ended
52+
protected sendFinishedProm = utils.promise<void>()
53+
// This resolves when `streamRecv` results in a `StreamReset(u64)` or a fin flag indicating receiving has ended
54+
protected recvFinishedProm = utils.promise<void>()
5355

5456
/**
5557
* For `reasonToCode`, return 0 means "unknown reason"
@@ -106,12 +108,6 @@ class QUICStream
106108
this.streamMap = connection.streamMap;
107109
this.reasonToCode = reasonToCode;
108110
this.codeToReason = codeToReason;
109-
const {
110-
p: finishedP,
111-
resolveP: resolveFinishedP
112-
} = utils.promise<void>();
113-
this.finishedP = finishedP;
114-
this.resolveFinishedP = resolveFinishedP;
115111

116112
// Try the BYOB later, it seems more performant
117113

@@ -218,12 +214,17 @@ class QUICStream
218214
await this.closeSend(true, e);
219215
}
220216
}
221-
await this.streamSend(new Uint8Array(0), true);
217+
// await this.streamSend(new Uint8Array(0), true).catch(e => console.error(e));
218+
QUICStream.logStreamState(this.streamId, this.conn, this.logger);
219+
this.logger.info('Waiting for FINISH')
220+
await Promise.all([
221+
this.sendFinishedProm.p.then(() => console.log('send finished')),
222+
this.recvFinishedProm.p.then(() => console.log('recv finished')),
223+
])
224+
this.logger.info('DONE waiting for FINISH')
225+
QUICStream.logStreamState(this.streamId, this.conn, this.logger);
222226
this.streamMap.delete(this.streamId);
223-
this.logger.info(`finished ${this.conn.streamFinished(this.streamId)}`);
224227
// We need to wait for the connection to finish before fully destroying
225-
this.logger.info('Waiting for finish')
226-
await this.finishedP;
227228
this.dispatchEvent(new events.QUICStreamDestroyEvent());
228229
this.logger.info(`Destroyed ${this.constructor.name}`);
229230
}
@@ -234,13 +235,34 @@ class QUICStream
234235
*/
235236
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
236237
public read(): void {
237-
// TODO: check if the readable has errored out?
238-
this.logger.info(`finished read ${this.conn.streamFinished(this.streamId)}`);
238+
// After reading it's possible the writer had a state change.
239+
try {
240+
this.conn.streamWritable(this.streamId, 0)
241+
this.logger.info('writable still good')
242+
} catch (e) {
243+
this.logger.info(e.message);
244+
// const reason = await this.processSendStreamError(e, 'send');
245+
// If the writable has ended, we need to close the writable.
246+
this.logger.info('writable has ended')
247+
this.sendFinishedProm.resolveP();
248+
if (!this._sendClosed) {
249+
const err = Error('reason');
250+
this.writableController.error(err)
251+
void this.closeSend(true, err);
252+
}
253+
}
239254
if (this.conn.streamFinished(this.streamId)) {
240-
this.resolveFinishedP();
255+
// Stream has finished
256+
this.recvFinishedProm.resolveP();
257+
if (!this._recvClosed) {
258+
const err = Error('TMP ERROR');
259+
this.readableController.error(err)
260+
void this.closeRecv(true, err);
261+
}
241262
}
242263
if (this._recvPaused) {
243264
// Do nothing if we are paused
265+
this.logger.info('Skipping read, paused');
244266
return;
245267
}
246268
void this.streamRecv();
@@ -252,18 +274,28 @@ class QUICStream
252274
*/
253275
@ready(new errors.ErrorQUICStreamDestroyed(), false, ['destroying'])
254276
public write(): void {
255-
this.logger.info(`finished write ${this.conn.streamFinished(this.streamId)}`);
256-
if (this.conn.streamFinished(this.streamId)) {
257-
this.resolveFinishedP();
258-
}
259277
try {
260278
this.conn.streamWritable(this.streamId, 0)
261279
} catch (e) {
262280
this.logger.info(e.message);
263281
// const reason = await this.processSendStreamError(e, 'send');
264282
// If the writable has ended, we need to close the writable.
265-
this.writableController.error(Error('TMP ERROR'))
266-
void this.closeSend();
283+
this.logger.info('writable has ended')
284+
this.sendFinishedProm.resolveP();
285+
if (!this._sendClosed) {
286+
const err = Error('TMP ERROR');
287+
this.writableController.error(err)
288+
void this.closeSend(true, err);
289+
}
290+
}
291+
if (this.conn.streamFinished(this.streamId)) {
292+
// Stream has finished
293+
this.recvFinishedProm.resolveP();
294+
if (!this._recvClosed) {
295+
const err = Error('TMP ERROR');
296+
this.readableController.error(err)
297+
void this.closeRecv(true, err);
298+
}
267299
}
268300
if (this.resolveWritableP != null) {
269301
this.resolveWritableP();
@@ -273,6 +305,7 @@ class QUICStream
273305
protected async streamRecv(): Promise<void> {
274306
const buf = Buffer.alloc(1024);
275307
let recvLength: number, fin: boolean;
308+
this.logger.info('trying receiving');
276309
try {
277310
[recvLength, fin] = this.conn.streamRecv(this.streamId, buf);
278311
} catch (e) {
@@ -285,18 +318,18 @@ class QUICStream
285318
// or through an exception here where the stream reports an error
286319
// Since we don't call this method unless it is readable
287320
// This should never be reported... (this branch should be dead code)
288-
this.logger.debug('Stream reported: done');
321+
this.logger.info('Stream reported: done');
289322
return;
290323
} else {
291-
this.logger.debug('Stream reported: error');
292-
const match = e.message.match(/StreamReset\((.+)\)/);
293-
if (match != null) {
324+
this.logger.info('Stream reported: error');
325+
// Signal receiving has ended
326+
this.recvFinishedProm.resolveP();
327+
const reason = await this.processSendStreamError(e, 'recv');
328+
if (reason != null) {
294329
// If it is `StreamReset(u64)` error, then the peer has closed
295330
// the stream, and we are receiving the error code
296-
const code = parseInt(match[1]);
297-
const reason = await this.codeToReason('recv', code);
298331
this.readableController.error(reason);
299-
await this.closeRecv();
332+
await this.closeRecv(true, reason);
300333
} else {
301334
// If it is not a `StreamReset(u64)`, then something else broke
302335
// and we need to propagate the error up and down the stream
@@ -320,6 +353,8 @@ class QUICStream
320353
this.logger.info('Stream reported: fin');
321354
if (!this._recvClosed) this.readableController.close();
322355
await this.closeRecv();
356+
// Signal receiving has ended
357+
this.recvFinishedProm.resolveP();
323358
return;
324359
}
325360
// Now we pause receiving if the queue is full
@@ -351,6 +386,8 @@ class QUICStream
351386
// This ensures that we are always blocked below.
352387
sentLength = -1;
353388
} else {
389+
// Signal sending has ended
390+
this.sendFinishedProm.resolveP();
354391
// We may receive a `StreamStopped(u64)` exception
355392
// meaning the peer has signalled for us to stop writing
356393
// If this occurs, we need to go back to the writable stream
@@ -360,7 +397,7 @@ class QUICStream
360397
const reason = await this.processSendStreamError(e, 'send');
361398
if (reason != null) {
362399
// We have to close the send side (but the stream is already closed)
363-
await this.closeSend();
400+
await this.closeSend(true, e);
364401
// Throws the exception back to the writer
365402
throw reason;
366403
} else {
@@ -410,13 +447,13 @@ class QUICStream
410447
if (e.message !== 'Done') throw e;
411448
}
412449
await this.connection.send();
450+
QUICStream.logStreamState(this.streamId, this.conn, this.logger);
413451
if (this[status] !== 'destroying' && this._recvClosed && this._sendClosed) {
414452
// Only destroy if we are not already destroying
415453
// and that both recv and send is closed
416-
await this.destroy();
454+
void this.destroy();
417455
}
418456
this.logger.info(`Closed Recv`);
419-
this.logger.info(`finished ${this.conn.streamFinished(this.streamId)}`);
420457
}
421458

422459
/**
@@ -448,10 +485,9 @@ class QUICStream
448485
if (this[status] !== 'destroying' && this._recvClosed && this._sendClosed) {
449486
// Only destroy if we are not already destroying
450487
// and that both recv and send is closed
451-
await this.destroy();
488+
void this.destroy();
452489
}
453490
this.logger.info(`Closed Send`);
454-
this.logger.info(`finished ${this.conn.streamFinished(this.streamId)}`);
455491
}
456492

457493
protected async processSendStreamError(e: Error, type: 'recv' | 'send' ): Promise<any | null> {
@@ -464,6 +500,28 @@ class QUICStream
464500
}
465501
return null;
466502
}
503+
504+
public static logStreamState(streamId: StreamId, conn: Connection, logger: Logger) {
505+
let message = 'STATE:';
506+
try {
507+
conn.streamWritable(streamId, 0);
508+
message += 'W'
509+
} catch (e) {
510+
message += '!W'
511+
}
512+
try {
513+
if (conn.streamReadable(streamId)) message += 'R';
514+
else message += '!R';
515+
} catch (e) {
516+
message += '!R'
517+
}
518+
if (conn.streamFinished(streamId)) {
519+
message += 'F'
520+
}else {
521+
message += '!F'
522+
}
523+
logger.info(message);
524+
}
467525
}
468526

469527
export default QUICStream;

tests/QUICStream.test.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,19 +207,20 @@ describe(QUICStream.name, () => {
207207
console.log('a');
208208
await Promise.race([
209209
streamEndedProm.p,
210-
sleep(5000).then(() => {
211-
throw Error("Creation timed out");
210+
sleep(1000).then(() => {
211+
throw Error("Ending timed out");
212212
}),
213213
])
214214
console.log('a');
215215
expect(streamCreatedCount).toBe(streamsNum);
216216
expect(streamEndedCount).toBe(streamsNum);
217217
console.log('TEST DONE')
218218
} catch (e) {
219-
console.error(e);
219+
console.log(e.message);
220+
throw e;
220221
} finally {
221222
console.log('Clean up');
222-
await sleep(5000);
223+
await sleep(1000);
223224
console.log('cleaning client');
224225
await client?.destroy({ force: true });
225226
console.log('cleaning server');

0 commit comments

Comments
 (0)