From 20145341b2e0b9f2ca368574239531b72aad2b0d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 7 May 2019 19:25:14 +0200 Subject: [PATCH 1/4] use try-for-each-concurrent Signed-off-by: Yoshua Wuyts --- examples/tcp-echo.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index f1ae8d93..750cbf22 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -14,17 +14,14 @@ async fn main() -> std::io::Result<()> { println!("Listening on {}", listener.local_addr()?); // accept connections and process them in parallel - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { + listener.incoming().try_for_each_concurrent(!0, async move |stream| { runtime::spawn(async move { - let stream = stream?; println!("Accepting from: {}", stream.peer_addr()?); let (reader, writer) = &mut stream.split(); reader.copy_into(writer).await?; Ok::<(), std::io::Error>(()) - }) - .await?; - } + }).await + }).await?; Ok(()) } From e96129175bfb75e79e95df4eee5650a5d6213a77 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 7 May 2019 19:27:41 +0200 Subject: [PATCH 2/4] move proxy example over Signed-off-by: Yoshua Wuyts --- examples/tcp-proxy.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/examples/tcp-proxy.rs b/examples/tcp-proxy.rs index 2f711536..9b9ff1c1 100644 --- a/examples/tcp-proxy.rs +++ b/examples/tcp-proxy.rs @@ -11,11 +11,9 @@ async fn main() -> std::io::Result<()> { let mut listener = TcpListener::bind("127.0.0.1:8081")?; println!("Listening on {}", listener.local_addr()?); - // accept connections and process them serially - let mut incoming = listener.incoming(); - while let Some(client) = incoming.next().await { - let handle = runtime::spawn(async move { - let client = client?; + // accept connections and process them in parallel + listener.incoming().try_for_each_concurrent(!0, async move |client| { + runtime::spawn(async move { let server = TcpStream::connect("127.0.0.1:8080").await?; println!( "Proxying {} to {}", @@ -30,9 +28,7 @@ async fn main() -> std::io::Result<()> { try_join!(a, b)?; Ok::<(), std::io::Error>(()) - }); - - handle.await?; - } + }).await + }).await?; Ok(()) } From b29fc8f508ea752fa57882712de9b47f74e2319e Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 7 May 2019 19:39:41 +0200 Subject: [PATCH 3/4] fmt and guessing Signed-off-by: Yoshua Wuyts --- examples/guessing.rs | 11 +++++++---- examples/tcp-echo.rs | 20 ++++++++++++-------- examples/tcp-proxy.rs | 36 ++++++++++++++++++++---------------- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/examples/guessing.rs b/examples/guessing.rs index b8b6eff6..4697456c 100644 --- a/examples/guessing.rs +++ b/examples/guessing.rs @@ -61,9 +61,12 @@ async fn main() -> Result<(), failure::Error> { let mut listener = TcpListener::bind("127.0.0.1:8080")?; println!("Listening on {}", &listener.local_addr()?); - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { - runtime::spawn(play(stream?)).await?; - } + let incoming = listener.incoming().map_err(|e| e.into()); + incoming + .try_for_each_concurrent(!0, async move |stream| { + runtime::spawn(play(stream)).await?; + Ok::<(), failure::Error>(()) + }) + .await?; Ok(()) } diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index 750cbf22..2a90aea2 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -14,14 +14,18 @@ async fn main() -> std::io::Result<()> { println!("Listening on {}", listener.local_addr()?); // accept connections and process them in parallel - listener.incoming().try_for_each_concurrent(!0, async move |stream| { - runtime::spawn(async move { - println!("Accepting from: {}", stream.peer_addr()?); + listener + .incoming() + .try_for_each_concurrent(!0, async move |stream| { + runtime::spawn(async move { + println!("Accepting from: {}", stream.peer_addr()?); - let (reader, writer) = &mut stream.split(); - reader.copy_into(writer).await?; - Ok::<(), std::io::Error>(()) - }).await - }).await?; + let (reader, writer) = &mut stream.split(); + reader.copy_into(writer).await?; + Ok::<(), std::io::Error>(()) + }) + .await + }) + .await?; Ok(()) } diff --git a/examples/tcp-proxy.rs b/examples/tcp-proxy.rs index 9b9ff1c1..10461006 100644 --- a/examples/tcp-proxy.rs +++ b/examples/tcp-proxy.rs @@ -12,23 +12,27 @@ async fn main() -> std::io::Result<()> { println!("Listening on {}", listener.local_addr()?); // accept connections and process them in parallel - listener.incoming().try_for_each_concurrent(!0, async move |client| { - runtime::spawn(async move { - let server = TcpStream::connect("127.0.0.1:8080").await?; - println!( - "Proxying {} to {}", - client.peer_addr()?, - server.peer_addr()? - ); + listener + .incoming() + .try_for_each_concurrent(!0, async move |client| { + runtime::spawn(async move { + let server = TcpStream::connect("127.0.0.1:8080").await?; + println!( + "Proxying {} to {}", + client.peer_addr()?, + server.peer_addr()? + ); - let (cr, cw) = &mut client.split(); - let (sr, sw) = &mut server.split(); - let a = cr.copy_into(sw); - let b = sr.copy_into(cw); - try_join!(a, b)?; + let (cr, cw) = &mut client.split(); + let (sr, sw) = &mut server.split(); + let a = cr.copy_into(sw); + let b = sr.copy_into(cw); + try_join!(a, b)?; - Ok::<(), std::io::Error>(()) - }).await - }).await?; + Ok::<(), std::io::Error>(()) + }) + .await + }) + .await?; Ok(()) } From 33ae5a813f0418b2d35264db8387a6af3f573c26 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 31 May 2019 14:55:10 +0200 Subject: [PATCH 4/4] improve upper bound Signed-off-by: Yoshua Wuyts --- examples/guessing.rs | 2 +- examples/tcp-echo.rs | 2 +- examples/tcp-proxy.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/guessing.rs b/examples/guessing.rs index 4697456c..b22b9121 100644 --- a/examples/guessing.rs +++ b/examples/guessing.rs @@ -63,7 +63,7 @@ async fn main() -> Result<(), failure::Error> { let incoming = listener.incoming().map_err(|e| e.into()); incoming - .try_for_each_concurrent(!0, async move |stream| { + .try_for_each_concurrent(None, async move |stream| { runtime::spawn(play(stream)).await?; Ok::<(), failure::Error>(()) }) diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index 2a90aea2..6a369a1d 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -16,7 +16,7 @@ async fn main() -> std::io::Result<()> { // accept connections and process them in parallel listener .incoming() - .try_for_each_concurrent(!0, async move |stream| { + .try_for_each_concurrent(None, async move |stream| { runtime::spawn(async move { println!("Accepting from: {}", stream.peer_addr()?); diff --git a/examples/tcp-proxy.rs b/examples/tcp-proxy.rs index 10461006..fdaed8c8 100644 --- a/examples/tcp-proxy.rs +++ b/examples/tcp-proxy.rs @@ -14,7 +14,7 @@ async fn main() -> std::io::Result<()> { // accept connections and process them in parallel listener .incoming() - .try_for_each_concurrent(!0, async move |client| { + .try_for_each_concurrent(None, async move |client| { runtime::spawn(async move { let server = TcpStream::connect("127.0.0.1:8080").await?; println!(