Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

[WIP] Move to pull-streams #18

Merged
merged 3 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,61 @@
js-libp2p-websockets
====================
# js-libp2p-websockets

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
![](https://img.shields.io/badge/coverage-%3F-yellow.svg?style=flat-square)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-websockets)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-websockets/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-websockets?branch=master)
[![Travis CI](https://travis-ci.org/libp2p/js-libp2p-websockets.svg?branch=master)](https://travis-ci.org/libp2p/js-libp2p-websockets)
[![Circle CI](https://circleci.com/gh/libp2p/js-libp2p-websockets.svg?style=svg)](https://circleci.com/gh/libp2p/js-libp2p-websockets)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-websockets) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)

![](https://github.com/raw/libp2p/interface-connection/master/img/badge.png)
![](https://github.com/raw/libp2p/interface-transport/master/img/badge.png)

> JavaScript implementation of the WebSockets module that libp2p uses and that implements the interface-transport interface

## Description

`libp2p-websockets` is the WebSockets implementation compatible with libp2p.

**Note:** This module uses [pull-streams](https://pull-stream.github.io) for all stream based interfaces.

## Example

```
TODO
```

## Installation

### npm

```sh
> npm i libp2p-websockets
```

## This module uses `pull-streams`

We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about what took us to make this migration, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).

You can learn more about pull-streams at:

- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
- [pull-streams documentation](https://pull-stream.github.io/)

### Converting `pull-streams` to Node.js Streams

If you are a Node.js streams user, you can convert a pull-stream to Node.js Stream using the module `pull-stream-to-stream`, giving you an instance of a Node.js stream that is linked to the pull-stream. Example:

```
const pullToStream = require('pull-stream-to-stream')

const nodeStreamInstance = pullToStream(pullStreamInstance)
// nodeStreamInstance is an instance of a Node.js Stream
```

To learn more about his utility, visit https://pull-stream.github.io/#pull-stream-to-stream

## API

[![](https://github.com/raw/diasdavid/interface-transport/master/img/badge.png)](https://github.com/diasdavid/interface-transport)
4 changes: 3 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const gulp = require('gulp')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const WS = require('./src')

let listener
Expand All @@ -10,7 +12,7 @@ gulp.task('test:browser:before', (done) => {
const ws = new WS()
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
listener = ws.createListener((conn) => {
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, done)
})
Expand Down
26 changes: 13 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,38 @@
],
"repository": {
"type": "git",
"url": "git+https://github.com/diasdavid/js-libp2p-websockets.git"
"url": "git+https://github.com/libp2p/js-libp2p-websockets.git"
},
"keywords": [
"IPFS"
],
"author": "David Dias <[email protected]>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/js-libp2p-websockets/issues"
"url": "https://github.com/libp2p/js-libp2p-websockets/issues"
},
"homepage": "https://github.com/diasdavid/js-libp2p-websockets#readme",
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"detect-node": "^2.0.3",
"interface-connection": "^0.1.8",
"interface-connection": "^0.2.1",
"lodash.contains": "^2.4.3",
"mafmt": "^2.1.0",
"run-parallel": "^1.1.6",
"simple-websocket": "^4.1.0",
"simple-websocket-server": "^0.1.4"
"mafmt": "^2.1.1",
"pull-ws": "^3.2.3"
},
"devDependencies": {
"aegir": "^6.0.0",
"multiaddr": "^2.0.2",
"aegir": "^6.0.1",
"chai": "^3.5.0",
"gulp": "^3.9.1",
"interface-transport": "^0.2.0",
"pre-commit": "^1.1.2"
"interface-transport": "^0.3.3",
"multiaddr": "^2.0.2",
"pre-commit": "^1.1.3",
"pull-goodbye": "0.0.1",
"pull-stream": "^3.4.3"
},
"contributors": [
"David Dias <[email protected]>",
"Francisco Baio Dias <[email protected]>",
"Friedel Ziegelmayer <[email protected]>",
"greenkeeperio-bot <[email protected]>"
]
}
}
130 changes: 19 additions & 111 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,147 +1,55 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:websockets')
const SW = require('simple-websocket')
const isNode = require('detect-node')
let SWS
if (isNode) {
SWS = require('simple-websocket-server')
} else {
SWS = {}
}
const connect = require('pull-ws/client')
const mafmt = require('mafmt')
const contains = require('lodash.contains')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')

const CLOSE_TIMEOUT = 2000
// const IPFS_CODE = 421

exports = module.exports = WebSockets

function WebSockets () {
if (!(this instanceof WebSockets)) {
return new WebSockets()
}
const createListener = require('./listener')

this.dial = function (ma, options, callback) {
module.exports = class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

if (!callback) {
callback = function noop () {}
callback = () => {}
}

const maOpts = ma.toOptions()

const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port)

const conn = new Connection(socket)

socket.on('timeout', () => {
conn.emit('timeout')
})

socket.on('error', (err) => {
callback(err)
conn.emit('error', err)
})

socket.on('connect', () => {
callback(null, conn)
conn.emit('connect')
const url = `ws://${maOpts.host}:${maOpts.port}`
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: callback
})

conn.getObservedAddrs = (cb) => {
return cb(null, [ma])
}
const conn = new Connection(socket)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)

return conn
}

this.createListener = (options, handler) => {
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}

const listener = SWS.createServer((socket) => {
const conn = new Connection(socket)

conn.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(conn)
})

let listeningMultiaddr

listener._listen = listener.listen
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}

listeningMultiaddr = ma

if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}

listener._listen(ma.toOptions(), callback)
}

listener._close = listener.close
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = { timeout: CLOSE_TIMEOUT }
}
if (!callback) { callback = function noop () {} }
if (!options) { options = { timeout: CLOSE_TIMEOUT } }

let closed = false
listener.once('close', () => {
closed = true
})
listener._close(callback)
setTimeout(() => {
if (closed) {
return
}
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}

// Keep track of open connections to destroy in case of timeout
listener.__connections = {}
listener.on('connection', (socket) => {
const key = (~~(Math.random() * 1e9)).toString(36) + Date.now()
listener.__connections[key] = socket

socket.on('close', () => {
delete listener.__connections[key]
})
})

listener.getAddrs = (callback) => {
callback(null, [listeningMultiaddr])
}

return listener
return createListener(options, handler)
}

this.filter = (multiaddrs) => {
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}

return multiaddrs.filter((ma) => {
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
Expand Down
46 changes: 46 additions & 0 deletions src/listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const isNode = require('detect-node')
const Connection = require('interface-connection').Connection
const contains = require('lodash.contains')

// const IPFS_CODE = 421

let createServer

if (isNode) {
createServer = require('pull-ws/server')
} else {
createServer = () => {}
}

module.exports = (options, handler) => {
const listener = createServer((socket) => {
socket.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}

handler(new Connection(socket))
})

let listeningMultiaddr

listener._listen = listener.listen
listener.listen = (ma, cb) => {
cb = cb || (() => {})
listeningMultiaddr = ma

if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}

listener._listen(ma.toOptions(), cb)
}

listener.getAddrs = (cb) => {
cb(null, [listeningMultiaddr])
}

return listener
Copy link
Member

@daviddias daviddias Sep 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is missing to bubble up the events from the listener:

  • event: 'listening'
  • event: 'close'
  • event: 'connection'
  • event: 'error'

See interface-transport.

  • Add tests for the above

}
Loading