Skip to content

Commit 8f70698

Browse files
committed
feat: reimplementing hole punch logic
* Related #527 [ci skip]
1 parent 3a12344 commit 8f70698

File tree

5 files changed

+122
-30
lines changed

5 files changed

+122
-30
lines changed

src/nodes/NodeConnectionManager.ts

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import type { RPCStream } from '../rpc/types';
2121
import type { TLSConfig } from '../network/types';
2222
import type { QuicConfig } from './types';
2323
import type { ServerCrypto, events as QuicEvents } from '@matrixai/quic';
24+
import type { PromiseCancellable } from '@matrixai/async-cancellable';
2425
import { withF } from '@matrixai/resources';
2526
import Logger from '@matrixai/logger';
2627
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
2728
import { IdInternal } from '@matrixai/id';
2829
import { Lock, LockBox } from '@matrixai/async-locks';
2930
import { Timer } from '@matrixai/timer';
3031
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
31-
import { PromiseCancellable } from '@matrixai/async-cancellable';
3232
import { QUICServer } from '@matrixai/quic';
3333
import NodeConnection from './NodeConnection';
3434
import * as nodesUtils from './utils';
@@ -598,7 +598,7 @@ class NodeConnectionManager {
598598
this.logger.debug(
599599
`establishing single connection for address ${address.host}:${address.port}`,
600600
);
601-
const iceProm = this.initiateIce();
601+
const iceProm = this.initiateHolePunch(nodeIds, ctx);
602602
const connection =
603603
await NodeConnection.createNodeConnection<AgentClientManifest>(
604604
{
@@ -624,7 +624,7 @@ class NodeConnectionManager {
624624
throw e;
625625
})
626626
.finally(async () => {
627-
iceProm.cancel();
627+
iceProm.cancel('Connection was established');
628628
await iceProm;
629629
});
630630
// 2. if established then add to result map
@@ -1425,11 +1425,50 @@ class NodeConnectionManager {
14251425
this.nodesBackoffMap.delete(nodeId.toString());
14261426
}
14271427

1428-
protected initiateIce(): PromiseCancellable<void> {
1429-
// TODO: this is a placeholder for ICE operation
1430-
return new PromiseCancellable<void>((resolve) => {
1431-
resolve();
1432-
});
1428+
/**
1429+
* This attempts the NAT hole punch procedure. It will return a
1430+
* `PromiseCancellable` that will resolve once the procedure times out, is
1431+
* cancelled or the other end responds.
1432+
*
1433+
* This is pretty simple, it will contact all known seed nodes and get them to
1434+
* relay a punch signal message.
1435+
*
1436+
* Note: Avoid using a large set of target nodes, It could trigger a large
1437+
* amount of pings to a single target.
1438+
*/
1439+
protected initiateHolePunch(
1440+
targetNodeIds: Array<NodeId>,
1441+
ctx?: Partial<ContextTimedInput>,
1442+
): PromiseCancellable<void>;
1443+
@timedCancellable(true)
1444+
protected async initiateHolePunch(
1445+
targetNodeIds: Array<NodeId>,
1446+
@context ctx: ContextTimed,
1447+
): Promise<void> {
1448+
const seedNodes = this.getSeedNodes();
1449+
const allProms: Array<Promise<Array<void>>> = [];
1450+
for (const targetNodeId of targetNodeIds) {
1451+
if (!this.isSeedNode(targetNodeId)) {
1452+
const holePunchProms = seedNodes.map((seedNodeId) => {
1453+
return (
1454+
this.sendSignalingMessage(
1455+
seedNodeId,
1456+
this.keyRing.getNodeId(),
1457+
targetNodeId,
1458+
undefined,
1459+
ctx,
1460+
)
1461+
// Ignore results
1462+
.then(
1463+
() => {},
1464+
() => {},
1465+
)
1466+
);
1467+
});
1468+
allProms.push(Promise.all(holePunchProms));
1469+
}
1470+
}
1471+
await Promise.all(allProms).catch();
14331472
}
14341473
}
14351474

src/rpc/RPCClient.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,15 +257,12 @@ class RPCClient<M extends ClientManifest> {
257257
const abortController = new AbortController();
258258
const signal = abortController.signal;
259259
// A promise that will reject if there is an abort signal or timeout
260-
const abortRaceProm = promise<never>();
261260
// Prevent unhandled rejection when we're done with the promise
262-
abortRaceProm.p.catch(() => {});
263261
let abortHandler: () => void;
264262
if (ctx.signal != null) {
265263
// Propagate signal events
266264
abortHandler = () => {
267265
abortController.abort(ctx.signal?.reason);
268-
abortRaceProm.rejectP(ctx.signal?.reason);
269266
};
270267
if (ctx.signal.aborted) abortHandler();
271268
ctx.signal.addEventListener('abort', abortHandler);
@@ -288,21 +285,25 @@ class RPCClient<M extends ClientManifest> {
288285
void timer.then(
289286
() => {
290287
abortController.abort(timeoutError);
291-
abortRaceProm.rejectP(timeoutError);
292288
},
293289
() => {}, // Ignore cancellation error
294290
);
295291
// Hooking up agnostic stream side
296292
let rpcStream: RPCStream<Uint8Array, Uint8Array>;
297293
try {
298-
rpcStream = await Promise.race([
299-
this.streamFactory({ signal, timer }),
300-
abortRaceProm.p,
301-
]);
294+
rpcStream = await this.streamFactory({ signal, timer });
302295
} catch (e) {
303296
cleanUp();
304297
throw e;
305298
}
299+
const cancelStream = () => {
300+
rpcStream.cancel(signal.reason);
301+
};
302+
if (signal.aborted) {
303+
cancelStream();
304+
} else {
305+
signal.addEventListener('abort', cancelStream);
306+
}
306307
// Setting up event for stream timeout
307308
void timer.then(
308309
() => {
@@ -345,6 +346,7 @@ class RPCClient<M extends ClientManifest> {
345346
.catch(() => {}),
346347
]).finally(() => {
347348
cleanUp();
349+
signal.removeEventListener('abort', cancelStream);
348350
});
349351

350352
// Returning interface

src/rpc/RPCServer.ts

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { ReadableStreamDefaultReadResult } from 'stream/web';
12
import type {
23
ClientHandlerImplementation,
34
DuplexHandlerImplementation,
@@ -442,7 +443,6 @@ class RPCServer extends EventTarget {
442443
});
443444
};
444445
abortController.signal.addEventListener('abort', handleAbort);
445-
446446
const prom = (async () => {
447447
const headTransformStream = rpcUtilsMiddleware.binaryToJsonMessageStream(
448448
rpcUtils.parseJSONRPCRequest,
@@ -469,30 +469,80 @@ class RPCServer extends EventTarget {
469469
await inputStream.cancel(reason);
470470
await rpcStream.writable.abort(reason);
471471
await inputStreamEndProm;
472+
timer.cancel(cleanupReason);
473+
graceTimer?.cancel(cleanupReason);
474+
await timer.catch(() => {});
475+
await graceTimer?.catch(() => {});
472476
};
473477
// Read a single empty value to consume the first message
474478
const reader = headTransformStream.readable.getReader();
475479
// Allows timing out when waiting for the first message
476-
const headerMessage = await Promise.race([
477-
reader.read(),
478-
timer.then(
479-
() => undefined,
480-
() => {},
481-
),
482-
]);
480+
let headerMessage:
481+
| ReadableStreamDefaultReadResult<JSONRPCRequest>
482+
| undefined
483+
| void;
484+
try {
485+
headerMessage = await Promise.race([
486+
reader.read(),
487+
timer.then(
488+
() => undefined,
489+
() => {},
490+
),
491+
]);
492+
} catch (e) {
493+
const newErr = new rpcErrors.ErrorRPCHandlerFailed(
494+
'Stream failed waiting for header',
495+
{ cause: e },
496+
);
497+
await inputStreamEndProm;
498+
timer.cancel(cleanupReason);
499+
graceTimer?.cancel(cleanupReason);
500+
await timer.catch(() => {});
501+
await graceTimer?.catch(() => {});
502+
this.dispatchEvent(
503+
new rpcEvents.RPCErrorEvent({
504+
detail: new rpcErrors.ErrorRPCOutputStreamError(
505+
'Stream failed waiting for header',
506+
{
507+
cause: newErr,
508+
},
509+
),
510+
}),
511+
);
512+
return;
513+
}
483514
// Downgrade back to the raw stream
484515
await reader.cancel();
485516
// There are 2 conditions where we just end here
486517
// 1. The timeout timer resolves before the first message
487518
// 2. the stream ends before the first message
488519
if (headerMessage == null) {
489-
await cleanUp(
490-
new rpcErrors.ErrorRPCHandlerFailed('Timed out waiting for header'),
520+
const newErr = new rpcErrors.ErrorRPCHandlerFailed(
521+
'Timed out waiting for header',
522+
);
523+
await cleanUp(newErr);
524+
this.dispatchEvent(
525+
new rpcEvents.RPCErrorEvent({
526+
detail: new rpcErrors.ErrorRPCOutputStreamError(
527+
'Timed out waiting for header',
528+
{
529+
cause: newErr,
530+
},
531+
),
532+
}),
491533
);
492534
return;
493535
}
494536
if (headerMessage.done) {
495-
await cleanUp(new rpcErrors.ErrorRPCHandlerFailed('Missing header'));
537+
const newErr = new rpcErrors.ErrorRPCHandlerFailed('Missing header');
538+
await cleanUp(newErr);
539+
this.dispatchEvent(
540+
new rpcEvents.RPCErrorEvent({
541+
detail: new rpcErrors.ErrorRPCOutputStreamError('Missing header', {
542+
cause: newErr,
543+
}),
544+
}),
545+
);
496546
return;
497547
}
498548
const method = headerMessage.value.method;
@@ -514,6 +564,7 @@ class RPCServer extends EventTarget {
514564
// Otherwise refresh
515565
timer.refresh();
516566
}
567+
this.logger.info(`Handling stream with method (${method})`);
517568
const outputStream = handler(
518569
[headerMessage.value, inputStream],
519570
rpcStream.cancel,
@@ -524,6 +575,7 @@ class RPCServer extends EventTarget {
524575
.pipeTo(rpcStream.writable)
525576
.catch(() => {}); // Ignore any errors, we only care that it finished
526577
await Promise.allSettled([inputStreamEndProm, outputStreamEndProm]);
578+
this.logger.info(`Handled stream with method (${method})`);
527579
// Cleaning up abort and timer
528580
timer.cancel(cleanupReason);
529581
abortController.signal.removeEventListener('abort', handleAbort);

tests/nodes/NodeConnectionManager.seednodes.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import Sigchain from '@/sigchain/Sigchain';
2121
import TaskManager from '@/tasks/TaskManager';
2222
import NodeManager from '@/nodes/NodeManager';
2323
import PolykeyAgent from '@/PolykeyAgent';
24-
import { sleep } from '@/utils';
2524
import * as testNodesUtils from './utils';
2625
import * as tlsTestUtils from '../utils/tls';
2726

@@ -314,7 +313,6 @@ describe(`${NodeConnectionManager.name} seednodes test`, () => {
314313
await remotePolykeyAgent1.nodeGraph.setNode(remoteNodeId2, remoteAddress2);
315314

316315
await nodeManager.syncNodeGraph(true, 100);
317-
await sleep(1000);
318316
expect(mockedRefreshBucket).toHaveBeenCalled();
319317

320318
await nodeConnectionManager.stop();

tests/nodes/NodeManager.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ describe(`${NodeManager.name} test`, () => {
382382
await nodeConnectionManager.withConnF(serverNodeId, async () => {
383383
// Do nothing
384384
});
385-
385+
// Wait for background logic to settle
386+
await sleep(100);
386387
const nodeData2 = await server.nodeGraph.getNode(expectedNodeId);
387388
expect(nodeData2).toBeDefined();
388389
expect(nodeData2?.address.host).toEqual(expectedHost);

0 commit comments

Comments
 (0)