Add subcrate that implements network socket handling with Tokio, bump to 0.0.9 #339

Merged
1 change: 1 addition & 0 deletions .travis.yml
@@ -14,3 +14,4 @@ script:
- cargo build --verbose
- cargo test --verbose
- if [ "$(rustup show | grep default | grep 1.29.2)" != "" ]; then cd fuzz && cargo test --verbose && ./travis-fuzz.sh; fi
- if [ "$(rustup show | grep default | grep stable)" != "" ]; then cd net-tokio && cargo build --verbose; fi
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "lightning"
version = "0.0.8"
version = "0.0.9"
authors = ["Matt Corallo"]
license = "Apache-2.0"
repository = "https://github.com/rust-bitcoin/rust-lightning/"
@@ -22,13 +22,13 @@ max_level_info = []
max_level_debug = []

[dependencies]
bitcoin = "0.17"
bitcoin = "0.18"
bitcoin_hashes = "0.3"
rand = "0.4"
secp256k1 = "0.12"

[dev-dependencies.bitcoin]
version = "0.17"
version = "0.18"
features = ["bitcoinconsensus"]

[dev-dependencies]
2 changes: 1 addition & 1 deletion fuzz/Cargo.toml
@@ -18,7 +18,7 @@ libfuzzer_fuzz = ["libfuzzer-sys"]
[dependencies]
afl = { version = "0.4", optional = true }
lightning = { path = "..", features = ["fuzztarget"] }
bitcoin = { version = "0.17", features = ["fuzztarget"] }
bitcoin = { version = "0.18", features = ["fuzztarget"] }
bitcoin_hashes = { version = "0.3", features=["fuzztarget"] }
hex = "0.3"
honggfuzz = { version = "0.5", optional = true }
19 changes: 19 additions & 0 deletions net-tokio/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "lightning-net-tokio"
version = "0.0.1"
authors = ["Matt Corallo"]
license = "Apache-2.0"
description = """
Implementation of the rust-lightning network stack using Tokio.
For Rust-Lightning clients that wish to make direct connections to Lightning P2P nodes, this is a simple alternative to implementing the required network stack, especially for those already using Tokio.
"""

[dependencies]
bitcoin = "0.18"
bitcoin_hashes = "0.3"
lightning = { version = "0.0.9", path = "../" }
secp256k1 = "0.12"
tokio-codec = "0.1"
futures = "0.1"
tokio = "0.1"
bytes = "0.4"
270 changes: 270 additions & 0 deletions net-tokio/src/lib.rs
@@ -0,0 +1,270 @@
extern crate bytes;
extern crate tokio;
extern crate tokio_codec;
extern crate futures;
extern crate lightning;
extern crate secp256k1;

use bytes::BufMut;

use futures::future;
use futures::future::Future;
use futures::{AsyncSink, Stream, Sink};
use futures::sync::mpsc;

use secp256k1::key::PublicKey;

use tokio::timer::Delay;
use tokio::net::TcpStream;

use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;

use std::mem;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use std::vec::Vec;
use std::hash::Hash;

static ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A connection to a remote peer. Can be constructed either as a remote connection using
/// Connection::setup_outbound or as an inbound connection using Connection::setup_inbound.
pub struct Connection {
writer: Option<mpsc::Sender<bytes::Bytes>>,
event_notify: mpsc::Sender<()>,
pending_read: Vec<u8>,
read_blocker: Option<futures::sync::oneshot::Sender<Result<(), ()>>>,
read_paused: bool,
need_disconnect: bool,
id: u64,
}
impl Connection {
fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
let us_ref = us.clone();
let us_close_ref = us.clone();
let peer_manager_ref = peer_manager.clone();
tokio::spawn(reader.for_each(move |b| {
let pending_read = b.to_vec();
{
let mut lock = us_ref.lock().unwrap();
assert!(lock.pending_read.is_empty());
if lock.read_paused {
lock.pending_read = pending_read;
let (sender, blocker) = futures::sync::oneshot::channel();
lock.read_blocker = Some(sender);
return future::Either::A(blocker.then(|_| { Ok(()) }));
}
}
//TODO: There's a race where we don't meet the requirements of disconnect_socket if it's
//called right here, after we release the us_ref lock in the scope above, but before we
//call read_event!
match peer_manager.read_event(&mut SocketDescriptor::new(us_ref.clone(), peer_manager.clone()), pending_read) {
Ok(pause_read) => {
if pause_read {
let mut lock = us_ref.lock().unwrap();
lock.read_paused = true;
}
},
Err(e) => {
us_ref.lock().unwrap().need_disconnect = false;
return future::Either::B(future::result(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))));
}
}

if let Err(e) = us_ref.lock().unwrap().event_notify.try_send(()) {
// Ignore full errors as we just need them to poll after this point, so if the user
// hasn't received the last send yet, it doesn't matter.
assert!(e.is_full());
}

future::Either::B(future::result(Ok(())))
}).then(move |_| {
if us_close_ref.lock().unwrap().need_disconnect {
peer_manager_ref.disconnect_event(&SocketDescriptor::new(us_close_ref, peer_manager_ref.clone()));
println!("Peer disconnected!");
} else {
println!("We disconnected peer!");
}
Ok(())
}));
}

fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>, Arc<Mutex<Self>>) {
let (writer, reader) = tokio_codec::Framed::new(stream, tokio_codec::BytesCodec::new()).split();
let (send_sink, send_stream) = mpsc::channel(3);
tokio::spawn(writer.send_all(send_stream.map_err(|_| -> std::io::Error {
unreachable!();
})).then(|_| {
future::result(Ok(()))
}));
let us = Arc::new(Mutex::new(Self { writer: Some(send_sink), event_notify, pending_read: Vec::new(), read_blocker: None, read_paused: false, need_disconnect: true, id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) }));

(reader, us)
}

/// Process incoming messages and feed outgoing messages on the provided socket generated by
/// accepting an incoming connection (by scheduling futures with tokio::spawn).
///
/// You should poll the receiving end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
let (reader, us) = Self::new(event_notify, stream);

if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
Self::schedule_read(peer_manager, us, reader);
}
}

/// Process incoming messages and feed outgoing messages on the provided socket generated by
/// making an outbound connection which is expected to be accepted by a peer with the given
/// public key (by scheduling futures with tokio::spawn).
///
/// You should poll the receiving end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
let (reader, us) = Self::new(event_notify, stream);

if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, 0, true) == initial_send.len() {
Self::schedule_read(peer_manager, us, reader);
} else {
println!("Failed to write first full message to socket!");
}
}
}

/// Process incoming messages and feed outgoing messages on a new connection made to the given
/// socket address which is expected to be accepted by a peer with the given public key (by
/// scheduling futures with tokio::spawn).
///
/// You should poll the receiving end of event_notify and call get_and_clear_pending_events() on
/// ChannelManager and ChannelMonitor objects.
pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
});
tokio::spawn(TcpStream::connect(&addr).select(connect_timeout)
.and_then(move |stream| {
Connection::setup_outbound(peer_manager, event_notify, their_node_id, stream.0);
future::ok(())
}).or_else(|_| {
//TODO: return errors somehow
future::ok(())
}));
}
}

#[derive(Clone)]
pub struct SocketDescriptor {
conn: Arc<Mutex<Connection>>,
id: u64,
peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
}
impl SocketDescriptor {
fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
let id = conn.lock().unwrap().id;
Self { conn, id, peer_manager }
}
}
impl peer_handler::SocketDescriptor for SocketDescriptor {
fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, resume_read: bool) -> usize {
macro_rules! schedule_read {
($us_ref: expr) => {
tokio::spawn(future::lazy(move || -> Result<(), ()> {
let mut read_data = Vec::new();
{
let mut us = $us_ref.conn.lock().unwrap();
mem::swap(&mut read_data, &mut us.pending_read);
}
if !read_data.is_empty() {
let mut us_clone = $us_ref.clone();
match $us_ref.peer_manager.read_event(&mut us_clone, read_data) {
Ok(pause_read) => {
if pause_read { return Ok(()); }
},
Err(_) => {
//TODO: Not actually sure how to do this
return Ok(());
}
}
}
let mut us = $us_ref.conn.lock().unwrap();
if let Some(sender) = us.read_blocker.take() {
sender.send(Ok(())).unwrap();
}
us.read_paused = false;
if let Err(e) = us.event_notify.try_send(()) {
// Ignore full errors as we just need them to poll after this point, so if the user
// hasn't received the last send yet, it doesn't matter.
assert!(e.is_full());
}
Ok(())
}));
}
}

let mut us = self.conn.lock().unwrap();
if resume_read {
let us_ref = self.clone();
schedule_read!(us_ref);
}
if data.len() == write_offset { return 0; }
if us.writer.is_none() {
us.read_paused = true;
return 0;
}

let mut bytes = bytes::BytesMut::with_capacity(data.len() - write_offset);
bytes.put(&data[write_offset..]);
let write_res = us.writer.as_mut().unwrap().start_send(bytes.freeze());
match write_res {
Ok(res) => {
match res {
AsyncSink::Ready => {
data.len() - write_offset
},
AsyncSink::NotReady(_) => {
us.read_paused = true;
let us_ref = self.clone();
tokio::spawn(us.writer.take().unwrap().flush().then(move |writer_res| -> Result<(), ()> {
if let Ok(writer) = writer_res {
{
let mut us = us_ref.conn.lock().unwrap();
us.writer = Some(writer);
}
schedule_read!(us_ref);
} // we'll fire the disconnect event on the socket reader end
Ok(())
}));
0
}
}
},
Err(_) => {
// We'll fire the disconnected event on the socket reader end
0
},
}
}

fn disconnect_socket(&mut self) {
let mut us = self.conn.lock().unwrap();
us.need_disconnect = true;
us.read_paused = true;
}
}
impl Eq for SocketDescriptor {}
impl PartialEq for SocketDescriptor {
fn eq(&self, o: &Self) -> bool {
self.id == o.id
}
}
impl Hash for SocketDescriptor {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
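
For the inbound path the doc comments above describe, a minimal sketch under the same assumptions (an existing PeerManager<SocketDescriptor> and an event_notify sender); the listen helper and the bind address are illustrative only, not part of the PR.

extern crate futures;
extern crate lightning;
extern crate lightning_net_tokio;
extern crate tokio;

use std::sync::Arc;

use futures::{Future, Stream};
use futures::sync::mpsc;

use lightning::ln::peer_handler::PeerManager;
use lightning_net_tokio::{Connection, SocketDescriptor};
use tokio::net::TcpListener;

// Accepts TCP connections and hands each one to the peer handler; must run
// inside a tokio 0.1 runtime. The receiving end of event_notify still needs
// to be polled elsewhere so pending events get drained.
fn listen(peer_manager: Arc<PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>) {
    let listener = TcpListener::bind(&"0.0.0.0:9735".parse().unwrap()).unwrap();
    tokio::spawn(listener.incoming().for_each(move |socket| {
        Connection::setup_inbound(peer_manager.clone(), event_notify.clone(), socket);
        Ok(())
    }).map_err(|_| ()));
}
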