Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

Commit af7b8e2

Browse files
fix: port listener to ES6 class syntax (#214)
Co-authored-by: achingbrain <[email protected]>
1 parent 493219c commit af7b8e2

File tree

2 files changed

+89
-97
lines changed

2 files changed

+89
-97
lines changed

src/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ import * as mafmt from '@multiformats/mafmt'
33
import errCode from 'err-code'
44
import { logger } from '@libp2p/logger'
55
import { toMultiaddrConnection } from './socket-to-conn.js'
6-
import { createListener } from './listener.js'
6+
import { TCPListener } from './listener.js'
77
import { multiaddrToNetConfig } from './utils.js'
88
import { AbortError } from '@libp2p/interfaces/errors'
99
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
10-
import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport'
10+
import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport'
1111
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
1212
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
1313
import type { Connection } from '@libp2p/interface-connection'
@@ -155,8 +155,8 @@ export class TCP implements Transport {
155155
* anytime a new incoming Connection has been successfully upgraded via
156156
* `upgrader.upgradeInbound`.
157157
*/
158-
createListener (options: TCPCreateListenerOptions) {
159-
return createListener({
158+
createListener (options: TCPCreateListenerOptions): Listener {
159+
return new TCPListener({
160160
...options,
161161
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
162162
socketCloseTimeout: this.opts.socketCloseTimeout
@@ -166,7 +166,7 @@ export class TCP implements Transport {
166166
/**
167167
* Takes a list of `Multiaddr`s and returns only valid TCP addresses
168168
*/
169-
filter (multiaddrs: Multiaddr[]) {
169+
filter (multiaddrs: Multiaddr[]): Multiaddr[] {
170170
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
171171

172172
return multiaddrs.filter(ma => {

src/listener.ts

Lines changed: 84 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,12 @@ import {
88
} from './utils.js'
99
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
1010
import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection'
11-
import type { Upgrader, Listener } from '@libp2p/interface-transport'
12-
import type { Server } from 'net'
11+
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
1312
import type { Multiaddr } from '@multiformats/multiaddr'
1413
import type { TCPCreateListenerOptions } from './index.js'
1514

1615
const log = logger('libp2p:tcp:listener')
1716

18-
interface ServerWithMultiaddrConnections extends Server {
19-
__connections: MultiaddrConnection[]
20-
}
21-
2217
/**
2318
* Attempts to close the given maConn. If a failure occurs, it will be logged
2419
*/
@@ -37,20 +32,29 @@ interface Context extends TCPCreateListenerOptions {
3732
socketCloseTimeout?: number
3833
}
3934

40-
/**
41-
* Create listener
42-
*/
43-
export function createListener (context: Context) {
44-
const {
45-
handler, upgrader, socketInactivityTimeout, socketCloseTimeout
46-
} = context
35+
type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
36+
37+
export class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
38+
private readonly server: net.Server
39+
/** Keep track of open connections to destroy in case of timeout */
40+
private readonly connections = new Set<MultiaddrConnection>()
4741

48-
context.keepAlive = context.keepAlive ?? true
42+
private status: Status = { started: false }
4943

50-
let peerId: string | null
51-
let listeningAddr: Multiaddr
44+
constructor (private readonly context: Context) {
45+
super()
46+
47+
context.keepAlive = context.keepAlive ?? true
48+
49+
this.server = net.createServer(context, this.onSocket.bind(this))
50+
51+
this.server
52+
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
53+
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
54+
.on('close', () => this.dispatchEvent(new CustomEvent('close')))
55+
}
5256

53-
const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(context, socket => {
57+
private onSocket (socket: net.Socket) {
5458
// Avoid uncaught errors caused by unstable connections
5559
socket.on('error', err => {
5660
log('socket error', err)
@@ -59,9 +63,9 @@ export function createListener (context: Context) {
5963
let maConn: MultiaddrConnection
6064
try {
6165
maConn = toMultiaddrConnection(socket, {
62-
listeningAddr,
63-
socketInactivityTimeout,
64-
socketCloseTimeout
66+
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
67+
socketInactivityTimeout: this.context.socketInactivityTimeout,
68+
socketCloseTimeout: this.context.socketCloseTimeout
6569
})
6670
} catch (err) {
6771
log.error('inbound connection failed', err)
@@ -70,16 +74,20 @@ export function createListener (context: Context) {
7074

7175
log('new inbound connection %s', maConn.remoteAddr)
7276
try {
73-
upgrader.upgradeInbound(maConn)
77+
this.context.upgrader.upgradeInbound(maConn)
7478
.then((conn) => {
7579
log('inbound connection %s upgraded', maConn.remoteAddr)
76-
trackConn(server, maConn, socket)
80+
this.connections.add(maConn)
7781

78-
if (handler != null) {
79-
handler(conn)
82+
socket.once('close', () => {
83+
this.connections.delete(maConn)
84+
})
85+
86+
if (this.context.handler != null) {
87+
this.context.handler(conn)
8088
}
8189

82-
listener.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
90+
this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
8391
})
8492
.catch(async err => {
8593
log.error('inbound connection failed', err)
@@ -97,85 +105,69 @@ export function createListener (context: Context) {
97105
log.error('closing inbound connection failed', err)
98106
})
99107
}
100-
}),
101-
// Keep track of open connections to destroy in case of timeout
102-
{ __connections: [] })
108+
}
103109

104-
const listener: Listener = Object.assign(new EventEmitter(), {
105-
getAddrs: () => {
106-
let addrs: Multiaddr[] = []
107-
const address = server.address()
110+
getAddrs () {
111+
if (!this.status.started) {
112+
return []
113+
}
108114

109-
if (address == null) {
110-
return []
111-
}
115+
let addrs: Multiaddr[] = []
116+
const address = this.server.address()
117+
const { listeningAddr, peerId } = this.status
112118

113-
if (typeof address === 'string') {
114-
addrs = [listeningAddr]
115-
} else {
116-
try {
117-
// Because TCP will only return the IPv6 version
118-
// we need to capture from the passed multiaddr
119-
if (listeningAddr.toString().startsWith('/ip4')) {
120-
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
121-
} else if (address.family === 'IPv6') {
122-
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
123-
}
124-
} catch (err) {
125-
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
119+
if (address == null) {
120+
return []
121+
}
122+
123+
if (typeof address === 'string') {
124+
addrs = [listeningAddr]
125+
} else {
126+
try {
127+
// Because TCP will only return the IPv6 version
128+
// we need to capture from the passed multiaddr
129+
if (listeningAddr.toString().startsWith('/ip4')) {
130+
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
131+
} else if (address.family === 'IPv6') {
132+
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
126133
}
134+
} catch (err) {
135+
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
127136
}
137+
}
128138

129-
return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
130-
},
131-
listen: async (ma: Multiaddr) => {
132-
listeningAddr = ma
133-
peerId = ma.getPeerId()
134-
135-
if (peerId == null) {
136-
listeningAddr = ma.decapsulateCode(CODE_P2P)
137-
}
139+
return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
140+
}
138141

139-
return await new Promise<void>((resolve, reject) => {
140-
const options = multiaddrToNetConfig(listeningAddr)
141-
server.listen(options, (err?: any) => {
142-
if (err != null) {
143-
return reject(err)
144-
}
145-
log('Listening on %s', server.address())
146-
resolve()
147-
})
148-
})
149-
},
150-
close: async () => {
151-
if (!server.listening) {
152-
return
153-
}
142+
async listen (ma: Multiaddr) {
143+
const peerId = ma.getPeerId()
144+
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma
154145

155-
await Promise.all(
156-
server.__connections.map(async maConn => await attemptClose(maConn))
157-
)
146+
this.status = { started: true, listeningAddr, peerId }
158147

159-
await new Promise<void>((resolve, reject) => {
160-
server.close(err => (err != null) ? reject(err) : resolve())
148+
return await new Promise<void>((resolve, reject) => {
149+
const options = multiaddrToNetConfig(listeningAddr)
150+
this.server.listen(options, (err?: any) => {
151+
if (err != null) {
152+
return reject(err)
153+
}
154+
log('Listening on %s', this.server.address())
155+
resolve()
161156
})
162-
}
163-
})
164-
165-
server
166-
.on('listening', () => listener.dispatchEvent(new CustomEvent('listening')))
167-
.on('error', err => listener.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
168-
.on('close', () => listener.dispatchEvent(new CustomEvent('close')))
157+
})
158+
}
169159

170-
return listener
171-
}
160+
async close () {
161+
if (!this.server.listening) {
162+
return
163+
}
172164

173-
function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) {
174-
server.__connections.push(maConn)
165+
await Promise.all(
166+
Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn))
167+
)
175168

176-
const untrackConn = () => {
177-
server.__connections = server.__connections.filter(c => c !== maConn)
169+
await new Promise<void>((resolve, reject) => {
170+
this.server.close(err => (err != null) ? reject(err) : resolve())
171+
})
178172
}
179-
180-
socket.once('close', untrackConn)
181173
}

0 commit comments

Comments
 (0)