diff --git a/Cargo.toml b/Cargo.toml index 521e7ae03c5..a91544e36da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ rustc-workspace-hack = "1.0.0" [dev-dependencies] difference = "2" tempfile = "3" -lsp-codec = "0.1.1" +lsp-codec = "0.1.2" tokio = "0.1" futures = "0.1" tokio-process = "0.2" diff --git a/tests/client.rs b/tests/client.rs index 8cff3231aa0..8497d651475 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -44,8 +44,6 @@ fn client_test_infer_bin() { assert!(diag.uri.as_str().ends_with("src/main.rs")); assert!(diag.diagnostics[0].message.contains("struct is never constructed: `UnusedBin`")); - - rls.shutdown(); } #[test] @@ -60,8 +58,6 @@ fn client_test_infer_lib() { assert!(diag.uri.as_str().ends_with("src/lib.rs")); assert!(diag.diagnostics[0].message.contains("struct is never constructed: `UnusedLib`")); - - rls.shutdown(); } #[test] @@ -77,8 +73,6 @@ fn client_test_infer_custom_bin() { assert!(diag.uri.as_str().ends_with("src/custom_bin.rs")); assert!(diag.diagnostics[0].message.contains("struct is never constructed: `UnusedCustomBin`")); - - rls.shutdown(); } /// Test includes window/progress regression testing @@ -178,8 +172,6 @@ fn client_test_simple_workspace() { }) .count(); assert_eq!(count, 4); - - rls.shutdown(); } #[test] @@ -304,8 +296,6 @@ fn client_changing_workspace_lib_retains_diagnostics() { assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `test_val`"))); assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`"))); assert!(bin.diagnostics[0].message.contains("unused variable: `val`")); - - rls.shutdown(); } #[test] @@ -397,8 +387,6 @@ fn client_implicit_workspace_pick_up_lib_changes() { let bin = rls.future_diagnostics("src/main.rs"); let bin = rls.block_on(bin).unwrap(); assert!(bin.diagnostics[0].message.contains("unused variable: `val`")); - - rls.shutdown(); } #[test] @@ -465,8 +453,6 @@ fn client_test_complete_self_crate_name() { let item = items.into_iter().nth(0).expect("Racer autocompletion failed"); assert_eq!(item.detail.unwrap(), "pub fn function() -> usize"); - - rls.shutdown(); } #[test] @@ -559,8 +545,6 @@ fn client_completion_suggests_arguments_in_statements() { let item = items.into_iter().nth(0).expect("Racer autocompletion failed"); assert_eq!(item.insert_text.unwrap(), "function()"); - - rls.shutdown(); } #[test] @@ -627,8 +611,6 @@ fn client_use_statement_completion_doesnt_suggest_arguments() { let item = items.into_iter().nth(0).expect("Racer autocompletion failed"); assert_eq!(item.insert_text.unwrap(), "function"); - - rls.shutdown(); } /// Test simulates typing in a dependency wrongly in a couple of ways before finally getting it @@ -713,8 +695,6 @@ fn client_dependency_typo_and_fix() { diag.diagnostics.iter().find(|d| d.severity == Some(DiagnosticSeverity::Error)), None ); - - rls.shutdown(); } /// Tests correct positioning of a toml parse error, use of `==` instead of `=`. @@ -757,8 +737,6 @@ fn client_invalid_toml_manifest() { end: Position { line: 2, character: 22 }, } ); - - rls.shutdown(); } /// Tests correct file highlighting of workspace member manifest with invalid path dependency. @@ -816,8 +794,6 @@ fn client_invalid_member_toml_manifest() { assert_eq!(diag.diagnostics.len(), 1); assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Error)); assert!(diag.diagnostics[0].message.contains("failed to read")); - - rls.shutdown(); } #[test] @@ -877,8 +853,6 @@ fn client_invalid_member_dependency_resolution() { assert_eq!(diag.diagnostics.len(), 1); assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Error)); assert!(diag.diagnostics[0].message.contains("no matching package named `nosuchdep123`")); - - rls.shutdown(); } #[test] @@ -917,8 +891,6 @@ fn client_handle_utf16_unit_text_edits() { text: "".to_string(), }], }); - - rls.shutdown(); } /// Ensures that wide characters do not prevent RLS from calculating correct @@ -962,8 +934,6 @@ fn client_format_utf16_range() { // Actual formatting isn't important - what is, is that the buffer isn't // malformed and code stays semantically equivalent. assert_eq!(new_text, vec!["/* 😢😢😢😢😢😢😢 */\nfn main() {}\n"]); - - rls.shutdown(); } #[test] @@ -1015,8 +985,6 @@ fn client_lens_run() { }; assert_eq!(lens, Some(vec![expected])); - - rls.shutdown(); } #[test] @@ -1077,7 +1045,6 @@ fn client_find_definitions() { } } } - rls.shutdown(); // Foo let foo_definition = Range { @@ -1288,8 +1255,6 @@ fn client_deglob() { }) .collect::>() ); - - rls.shutdown(); } fn is_notification_for_unknown_config(msg: &serde_json::Value) -> bool { @@ -1339,7 +1304,6 @@ fn client_init_duplicated_and_unknown_settings() { assert!(rls.messages().iter().any(is_notification_for_unknown_config)); assert!(rls.messages().iter().any(is_notification_for_duplicated_config)); - rls.shutdown(); } #[test] @@ -1384,8 +1348,6 @@ fn client_did_change_configuration_duplicated_and_unknown_settings() { if !rls.messages().iter().any(is_notification_for_duplicated_config) { rls.wait_for_message(is_notification_for_duplicated_config); } - - rls.shutdown(); } #[test] @@ -1395,8 +1357,6 @@ fn client_shutdown() { let mut rls = p.spawn_rls_async(); rls.request::(0, initialize_params(root_path)); - - rls.shutdown(); } #[test] @@ -1430,8 +1390,6 @@ fn client_goto_def() { .collect(); assert!(ranges.iter().any(|r| r.start == Position { line: 11, character: 8 })); - - rls.shutdown(); } #[test] @@ -1463,8 +1421,6 @@ fn client_hover() { let contents = contents.map(MarkedString::LanguageString).collect(); assert_eq!(result.contents, HoverContents::Array(contents)); - - rls.shutdown(); } /// Test hover continues to work after the source has moved line @@ -1534,8 +1490,6 @@ fn client_hover_after_src_line_change() { .unwrap(); assert_eq!(result.contents, HoverContents::Array(contents)); - - rls.shutdown(); } #[test] @@ -1576,8 +1530,6 @@ fn client_workspace_symbol() { dbg!(&sym); assert!(symbols.iter().any(|s| *s == sym)); } - - rls.shutdown(); } #[test] @@ -1612,8 +1564,6 @@ fn client_workspace_symbol_duplicates() { }; assert_eq!(symbols, vec![symbol]); - - rls.shutdown(); } #[ignore] // FIXME(#1265): This is spurious (we don't pick up reference under #[cfg(test)])-ed code - why? @@ -1651,8 +1601,6 @@ fn client_find_all_refs_test() { dbg!(range); assert!(result.iter().any(|x| x.range == range)); } - - rls.shutdown(); } #[test] @@ -1691,8 +1639,6 @@ fn client_find_all_refs_no_cfg_test() { dbg!(range); assert!(result.iter().any(|x| x.range == range)); } - - rls.shutdown(); } #[test] @@ -1707,8 +1653,6 @@ fn client_borrow_error() { let msg = "cannot borrow `x` as mutable more than once at a time"; assert!(diag.diagnostics.iter().any(|diag| diag.message.contains(msg))); - - rls.shutdown(); } #[test] @@ -1743,8 +1687,6 @@ fn client_highlight() { dbg!(range); assert!(result.iter().any(|x| x.range == range)); } - - rls.shutdown(); } #[test] @@ -1785,8 +1727,6 @@ fn client_rename() { let changes = std::iter::once((uri, ranges.collect())).collect(); assert_eq!(result.changes, Some(changes)); - - rls.shutdown(); } #[test] @@ -1820,8 +1760,6 @@ fn client_reformat() { }, new_text: "// Copyright 2017 The Rust Project Developers. See the COPYRIGHT\n// file at the top-level directory of this distribution and at\n// http://rust-lang.org/COPYRIGHT.\n//\n// Licensed under the Apache License, Version 2.0 or the MIT license\n// , at your\n// option. This file may not be copied, modified, or distributed\n// except according to those terms.\n\npub mod foo;\npub fn main() {\n let world = \"world\";\n println!(\"Hello, {}!\", world);\n}\n".to_string(), }); - - rls.shutdown(); } #[test] @@ -1859,8 +1797,6 @@ fn client_reformat_with_range() { .replace("\n", newline); assert_eq!(result.unwrap()[0].new_text, formatted); - - rls.shutdown(); } #[test] @@ -1888,8 +1824,6 @@ fn client_multiple_binaries() { assert!(diags.iter().any(|message| message.starts_with(msg))); } } - - rls.shutdown(); } #[ignore] // Requires `rust-src` component, which isn't available in Rust CI. @@ -1944,8 +1878,6 @@ fn client_completion() { ); let items = completions(result.unwrap()); assert!(items.iter().any(|item| item_eq!(item, expected[1]))); - - rls.shutdown(); } #[test] @@ -1963,8 +1895,6 @@ fn client_bin_lib_project() { assert_eq!(diag.diagnostics.len(), 1); assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Warning)); assert!(diag.diagnostics[0].message.contains("unused variable: `unused_var`")); - - rls.shutdown(); } #[test] @@ -1981,8 +1911,6 @@ fn client_infer_lib() { assert_eq!(diag.diagnostics.len(), 1); assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Warning)); assert!(diag.diagnostics[0].message.contains("struct is never constructed: `UnusedLib`")); - - rls.shutdown(); } #[test] @@ -2003,8 +1931,6 @@ fn client_omit_init_build() { rls.block_on(response).unwrap(); assert_eq!(rls.messages().iter().count(), 1); - - rls.shutdown(); } #[test] @@ -2066,8 +1992,6 @@ fn client_find_impls() { for exp in expected { assert!(locs.iter().any(|x| *x == exp)); } - - rls.shutdown(); } #[test] @@ -2085,8 +2009,6 @@ fn client_features() { assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Error)); let msg = "cannot find struct, variant or union type `Foo` in this scope"; assert!(diag.diagnostics[0].message.contains(msg)); - - rls.shutdown(); } #[test] @@ -2104,8 +2026,6 @@ fn client_all_features() { rls.messages().iter().filter(|x| x["method"] == PublishDiagnostics::METHOD).count(), 0 ); - - rls.shutdown(); } #[test] @@ -2124,8 +2044,6 @@ fn client_no_default_features() { assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Error)); let msg = "cannot find struct, variant or union type `Baz` in this scope"; assert!(diag.diagnostics[0].message.contains(msg)); - - rls.shutdown(); } #[test] @@ -2143,8 +2061,6 @@ fn client_all_targets() { assert_eq!(diag.diagnostics.len(), 1); assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Warning)); assert!(diag.diagnostics[0].message.contains("unused variable: `unused_var`")); - - rls.shutdown(); } /// Handle receiving a notification before the `initialize` request by ignoring and @@ -2159,8 +2075,6 @@ fn client_ignore_uninitialized_notification() { rls.request::(0, initialize_params(root_path)); rls.wait_for_indexing(); - - rls.shutdown(); } /// Handle receiving requests before the `initialize` request by returning an error response @@ -2189,8 +2103,6 @@ fn client_fail_uninitialized_request() { assert_eq!(err.id, jsonrpc_core::Id::Num(ID)); assert_eq!(err.error.code, jsonrpc_core::ErrorCode::ServerError(-32002)); assert_eq!(err.error.message, "not yet received `initialize` request"); - - rls.shutdown(); } // Test that RLS can accept configuration with config keys in 4 different cases: @@ -2223,8 +2135,6 @@ fn client_init_impl(convert_case: fn(&str) -> String) { assert_eq!(diag.diagnostics[0].severity, Some(DiagnosticSeverity::Error)); let msg = "cannot find type `PathBuf` in this scope"; assert!(diag.diagnostics[0].message.contains(msg)); - - rls.shutdown(); } #[test] diff --git a/tests/support/client/child_process.rs b/tests/support/client/child_process.rs new file mode 100644 index 00000000000..a8f2f35f079 --- /dev/null +++ b/tests/support/client/child_process.rs @@ -0,0 +1,55 @@ +use std::io::{Read, Write}; +use std::process::{Command, Stdio}; +use std::rc::Rc; + +use futures::Poll; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_process::{Child, CommandExt}; + +pub struct ChildProcess { + stdin: tokio_process::ChildStdin, + stdout: tokio_process::ChildStdout, + child: Rc, +} + +impl Read for ChildProcess { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + Read::read(&mut self.stdout, buf) + } +} + +impl Write for ChildProcess { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + Write::write(&mut self.stdin, buf) + } + fn flush(&mut self) -> std::io::Result<()> { + Write::flush(&mut self.stdin) + } +} + +impl AsyncRead for ChildProcess {} +impl AsyncWrite for ChildProcess { + fn shutdown(&mut self) -> Poll<(), std::io::Error> { + AsyncWrite::shutdown(&mut self.stdin) + } +} + +impl ChildProcess { + pub fn spawn_from_command(mut cmd: Command) -> Result { + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + let mut child = cmd.spawn_async()?; + + Ok(ChildProcess { + stdout: child.stdout().take().unwrap(), + stdin: child.stdin().take().unwrap(), + child: Rc::new(child), + }) + } + + /// Returns a handle to the underlying `Child` process. + /// Useful when waiting until child process exits. + pub fn child(&self) -> Rc { + Rc::clone(&self.child) + } +} diff --git a/tests/support/client.rs b/tests/support/client/mod.rs similarity index 72% rename from tests/support/client.rs rename to tests/support/client/mod.rs index 84dd4090b11..53c88cb5ea3 100644 --- a/tests/support/client.rs +++ b/tests/support/client/mod.rs @@ -15,22 +15,25 @@ use std::process::{Command, Stdio}; use std::rc::Rc; use futures::sink::Sink; -use futures::stream::Stream; +use futures::stream::{SplitSink, Stream}; use futures::unsync::oneshot; use futures::Future; -use lsp_codec::{LspDecoder, LspEncoder}; +use lsp_codec::LspCodec; use lsp_types::notification::{Notification, PublishDiagnostics}; use lsp_types::PublishDiagnosticsParams; use serde::Deserialize; use serde_json::{json, Value}; -use tokio::codec::{FramedRead, FramedWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::runtime::current_thread::Runtime; -use tokio::util::FutureExt; -use tokio_process::{Child, ChildStdin, CommandExt}; +use tokio::util::{FutureExt, StreamExt}; use super::project_builder::Project; use super::{rls_exe, rls_timeout}; +use child_process::ChildProcess; + +mod child_process; + // `Rc` because we share those in message reader stream and the RlsHandle. // `RefCell` because borrows don't overlap. This is safe, because `process_msg` // is only called (synchronously) when we execute some work on the runtime, @@ -39,40 +42,64 @@ use super::{rls_exe, rls_timeout}; type Messages = Rc>>; type Channels = Rc bool>, oneshot::Sender)>>>; +type LspFramed = tokio::codec::Framed; + +trait LspFramedExt { + fn from_transport(transport: T) -> Self; +} + +impl LspFramedExt for LspFramed { + fn from_transport(transport: T) -> Self { + tokio::codec::Framed::new(transport, LspCodec::default()) + } +} + impl Project { - pub fn spawn_rls_async(&self) -> RlsHandle { + pub fn rls_cmd(&self) -> Command { let mut cmd = Command::new(rls_exe()); - cmd.current_dir(self.root()) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()); + cmd.current_dir(self.root()); + cmd.stderr(Stdio::inherit()); + + cmd + } - let mut child = cmd.spawn_async().expect("Couldn't spawn RLS"); - let stdin = child.stdin().take().unwrap(); - let stdout = child.stdout().take().unwrap(); + pub fn spawn_rls_async(&self) -> RlsHandle { + let rt = Runtime::new().unwrap(); + let cmd = self.rls_cmd(); + let process = ChildProcess::spawn_from_command(cmd).unwrap(); + + self.spawn_rls_with_params(rt, process) + } + + fn spawn_rls_with_params(&self, mut rt: Runtime, transport: T) -> RlsHandle + where + T: AsyncRead + AsyncWrite + 'static, + { + let (finished_reading, reader_closed) = oneshot::channel(); let msgs = Messages::default(); let chans = Channels::default(); - let reader = FramedRead::new(std::io::BufReader::new(stdout), LspDecoder::default()) + let (sink, stream) = LspFramed::from_transport(transport).split(); + + let reader = stream + .timeout(rls_timeout()) .map_err(|_| ()) .for_each({ let msgs = Rc::clone(&msgs); let chans = Rc::clone(&chans); - move |msg| process_msg(msg, msgs.clone(), chans.clone()) + move |msg| Ok(process_msg(msg, msgs.clone(), chans.clone())) }) - .timeout(rls_timeout()); - - let writer = Some(FramedWrite::new(stdin, LspEncoder)); + .and_then(move |_| finished_reading.send(())); + rt.spawn(reader); - let mut rt = Runtime::new().unwrap(); - rt.spawn(reader.map_err(|_| ())); + let sink = Some(sink); - RlsHandle { writer, child, runtime: rt, messages: msgs, channels: chans } + RlsHandle { writer: sink, runtime: rt, reader_closed, messages: msgs, channels: chans } } } -fn process_msg(msg: Value, msgs: Messages, chans: Channels) -> Result<(), ()> { +fn process_msg(msg: Value, msgs: Messages, chans: Channels) { eprintln!("Processing message: {:?}", msg); let mut chans = chans.borrow_mut(); @@ -89,7 +116,9 @@ fn process_msg(msg: Value, msgs: Messages, chans: Channels) -> Result<(), ()> { while idx >= 0 { let (pred, tx) = chans.swap_remove(idx as usize); if pred(&msg) { - tx.send(msg.clone()).map_err(|_| ())?; + // This can error when the receiving end has been deallocated - + // in this case we just have noone to notify and that's okay. + let _ = tx.send(msg.clone()); } else { chans.push((pred, tx)); } @@ -101,18 +130,17 @@ fn process_msg(msg: Value, msgs: Messages, chans: Channels) -> Result<(), ()> { } msgs.borrow_mut().push(msg); - - Ok(()) } -/// Holds the handle to a spawned RLS child process and allows to send and -/// receive messages to and from the process. -pub struct RlsHandle { - /// Asynchronous LSP writer for the spawned process. - writer: Option>, - /// Handle to the spawned child. - child: Child, - /// Tokio single-thread runtime onto which LSP message reading stream has +/// Holds the handle to an RLS connection and allows to send and receive +/// messages to and from the process. +pub struct RlsHandle { + /// Notified when the reader connection is closed. Used when waiting as + /// sanity check, after sending Shutdown request. + reader_closed: oneshot::Receiver<()>, + /// Asynchronous LSP writer. + writer: Option>>, + /// Tokio single-thread runtime onto which LSP message reading task has /// been spawned. Allows to synchronously write messages via `writer` and /// block on received messages matching an enqueued predicate in `channels`. runtime: Runtime, @@ -123,7 +151,7 @@ pub struct RlsHandle { channels: Channels, } -impl RlsHandle { +impl RlsHandle { /// Returns messages received until the moment of the call. pub fn messages(&self) -> Ref> { self.messages.borrow() @@ -234,15 +262,18 @@ impl RlsHandle { lsp_types::PublishDiagnosticsParams::deserialize(&msg["params"]) .unwrap_or_else(|_| panic!("Can't deserialize params: {:?}", msg)) } +} - /// Requests the RLS to shut down and waits (with a timeout) until the child - /// process is terminated. - pub fn shutdown(mut self) { +impl Drop for RlsHandle { + fn drop(&mut self) { self.request::(99999, ()); self.notify::(()); - let fut = self.child.wait_with_output().timeout(rls_timeout()); + // Wait until the underlying connection is closed. + let (_, dummy) = oneshot::channel(); + let reader_closed = std::mem::replace(&mut self.reader_closed, dummy); + let reader_closed = reader_closed.timeout(rls_timeout()); - self.runtime.block_on(fut).unwrap(); + self.runtime.block_on(reader_closed).unwrap(); } }