Skip to content

Commit 124af15

Browse files
committed
Added CommandStatus and CommandResult structs to make it easier to handle command results
1 parent 65e4de3 commit 124af15

File tree

5 files changed

+211
-131
lines changed

5 files changed

+211
-131
lines changed

core/examples/basic.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,45 +13,50 @@ use intecture_api::prelude::*;
1313
use tokio_core::reactor::Core;
1414

1515
fn main() {
16-
// These two lines are part of `tokio-core` and can be safely
17-
// ignored. So long as they appear at the top of your code,
18-
// all is fine with the world.
16+
// These two lines are part of `tokio-core` and can be safely ignored. So
17+
// long as they appear at the top of your code, all is fine with the world.
1918
let mut core = Core::new().unwrap();
2019
let handle = core.handle();
2120

22-
// Here's the meat of your project. In this example we're talking
23-
// to our local machine, so we use the `Local` host type.
21+
// Here's the meat of your project. In this example we're talking to our
22+
// local machine, so we use the `Local` host type.
2423
let host = Local::new(&handle).and_then(|host| {
25-
// Ok, we're in! Now we can pass our `host` handle to other
26-
// endpoints, which informs them of the server we mean to
27-
// talk to.
24+
// Ok, we're in! Now we can pass our `host` handle to other endpoints,
25+
// which informs them of the server we mean to talk to.
2826

2927
// Let's start with something basic - a shell command.
3028
let cmd = Command::new(&host, "whoami", None);
31-
cmd.exec().and_then(|(stream, status)| {
32-
// At this point, our command is running. As the API
33-
// is asynchronous, we don't have to wait for it to
34-
// finish before inspecting its output. This is called
35-
// "streaming".
36-
37-
// Our first argument, `stream`, is a stream of strings,
38-
// each of which represents a line of output. We can use
39-
// the `for_each` combinator to print these lines to
40-
// stdout.
29+
cmd.exec().and_then(|mut status| {
30+
// At this point, our command is running. As the API is
31+
// asynchronous, we don't have to wait for it to finish before
32+
// inspecting its output. This is called "streaming".
33+
34+
// First let's grab the stream from `CommandStatus`. This stream is
35+
// a stream of strings, each of which represents a line of command
36+
// output. We can use the `for_each` combinator to print these
37+
// lines to stdout.
4138
//
42-
// If printing isn't your thing, you are also
43-
// free to lick them or whatever you're into. I'm not
44-
// here to judge.
45-
stream.for_each(|line| { println!("{}", line); Ok(()) })
39+
// If printing isn't your thing, you are also free to lick them or
40+
// whatever you're into. I'm not here to judge.
41+
let stream = status.take_stream()
42+
.unwrap() // Unwrap is fine here as we haven't called it before
43+
.for_each(|line| { println!("{}", line); Ok(()) });
4644

47-
// The second argument is a `Future` that represents the
48-
// command's exit status. Let's print that too*.
45+
// Next, let's check on the result of our command.
46+
// `CommandStatus` is a `Future` that represents the command's
47+
// exit status. We can use the `map` combinator to print it out.*
4948
//
5049
// * Same caveat as above RE: printing. This is a safe
5150
// place.
52-
.join(status.map(|s| println!("This command {} {}",
53-
if s.success { "succeeded" } else { "failed" },
54-
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
51+
let status = status.map(|s| println!("This command {} {}",
52+
if s.success { "succeeded" } else { "failed" },
53+
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() }));
54+
55+
// Finally, we need to return these two `Future`s (stream and
56+
// status) so that they will be executed by the event loop. Sadly
57+
// we can't return them both as a tuple, so we use the join
58+
// combinator instead to turn them into a single `Future`. Easy!
59+
stream.join(status)
5560
})
5661
});
5762

core/src/command/mod.rs

Lines changed: 125 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
pub mod providers;
1313

1414
use errors::*;
15-
use futures::Future;
15+
use futures::{future, Future, Poll};
1616
use futures::stream::Stream;
1717
use futures::sync::oneshot;
1818
use host::Host;
@@ -27,11 +27,6 @@ const DEFAULT_SHELL: [&'static str; 2] = ["/bin/sh", "-c"];
2727
#[cfg(windows)]
2828
const DEFAULT_SHELL: [&'static str; 1] = ["yeah...we don't currently support windows :("];
2929

30-
pub type ExecResult = Box<Future<Item = (
31-
Box<Stream<Item = String, Error = Error>>,
32-
Box<Future<Item = ExitStatus, Error = Error>>
33-
), Error = Error>>;
34-
3530
/// Represents a shell command to be executed on a host.
3631
///
3732
///## Examples
@@ -54,13 +49,20 @@ pub type ExecResult = Box<Future<Item = (
5449
///let host = Local::new(&handle).wait().unwrap();
5550
///
5651
///let cmd = Command::new(&host, "ls /path/to/foo", None);
57-
///let result = cmd.exec().and_then(|(stream, status)| {
52+
///let result = cmd.exec().and_then(|mut status| {
5853
/// // Print the command's stdout/stderr to stdout
59-
/// stream.for_each(|line| { println!("{}", line); Ok(()) })
60-
/// // When it's ready, also print the exit status
54+
/// status.take_stream().unwrap()
55+
/// .for_each(|line| { println!("{}", line); Ok(()) })
56+
/// // On its own, the stream will not do anything, so we need to make
57+
/// // sure it gets returned along with the status future. `join()` will
58+
/// // mash the two together so we can return them as one.
6159
/// .join(status.map(|s| println!("This command {} {}",
6260
/// if s.success { "succeeded" } else { "failed" },
63-
/// if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
61+
/// if let Some(e) = s.code {
62+
/// format!("with code {}", e)
63+
/// } else {
64+
/// String::new()
65+
/// })))
6466
///});
6567
///
6668
///core.run(result).unwrap();
@@ -71,13 +73,13 @@ pub type ExecResult = Box<Future<Item = (
7173
/// this as you could run out of memory on your heap if the output buffer is
7274
/// too big.
7375
///
74-
///```
76+
///```no_run
7577
///extern crate futures;
7678
///extern crate intecture_api;
7779
///extern crate tokio_core;
7880
///
79-
///use futures::{future, Future, Stream};
80-
///use intecture_api::errors::Error;
81+
///use futures::Future;
82+
///use intecture_api::errors::*;
8183
///use intecture_api::prelude::*;
8284
///use tokio_core::reactor::Core;
8385
///
@@ -88,14 +90,20 @@ pub type ExecResult = Box<Future<Item = (
8890
///let host = Local::new(&handle).wait().unwrap();
8991
///
9092
///let cmd = Command::new(&host, "ls /path/to/foo", None);
91-
///let result = cmd.exec().and_then(|(stream, _)| {
92-
/// // Concatenate the buffer into a `String`
93-
/// stream.fold(String::new(), |mut acc, line| {
94-
/// acc.push_str(&line);
95-
/// future::ok::<_, Error>(acc)
96-
/// })
93+
///let result = cmd.exec().and_then(|status| {
94+
/// status.result().unwrap()
9795
/// .map(|_output| {
98-
/// // The binding `output` is our accumulated buffer
96+
/// // Our command finished successfully. Now we can do something
97+
/// // with our output here.
98+
/// })
99+
/// .map_err(|e| {
100+
/// // Our command errored out. Let's grab the output and see what
101+
/// // went wrong.
102+
/// match *e.kind() {
103+
/// ErrorKind::Command(ref output) => println!("Oh noes! {}", output),
104+
/// _ => unreachable!(),
105+
/// }
106+
/// e
99107
/// })
100108
///});
101109
///
@@ -122,12 +130,10 @@ pub type ExecResult = Box<Future<Item = (
122130
///let host = Local::new(&handle).wait().unwrap();
123131
///
124132
///let cmd = Command::new(&host, "ls /path/to/foo", None);
125-
///let result = cmd.exec().and_then(|(stream, status)| {
126-
/// // Discard the buffer
127-
/// stream.for_each(|_| Ok(()))
128-
/// .join(status.map(|_status| {
129-
/// // Enjoy the status, baby...
130-
/// }))
133+
///let result = cmd.exec().and_then(|mut status| {
134+
/// status.map(|_exit_status| {
135+
/// // Enjoy the status, baby!
136+
/// })
131137
///});
132138
///
133139
///core.run(result).unwrap();
@@ -139,6 +145,20 @@ pub struct Command<H: Host> {
139145
cmd: Vec<String>,
140146
}
141147

148+
/// Represents the status of a running `Command`, including the output stream
149+
/// and exit status.
150+
pub struct CommandStatus {
151+
stream: Option<Box<Stream<Item = String, Error = Error>>>,
152+
exit_status: Option<Box<Future<Item = ExitStatus, Error = Error>>>,
153+
}
154+
155+
/// Represents the exit status of a `Command` as a `Result`-like `Future`. If
156+
/// the command succeeded, the command output is returned. If it failed, an
157+
/// error containing the command's output is returned.
158+
pub struct CommandResult {
159+
inner: Box<Future<Item = String, Error = Error>>,
160+
}
161+
142162
/// The status of a finished command.
143163
///
144164
/// This is a serializable replica of
@@ -231,42 +251,33 @@ impl<H: Host + 'static> Command<H> {
231251
///
232252
/// This is the error you'll see if you prematurely drop the output `Stream`
233253
/// while trying to resolve the `Future<Item = ExitStatus, ...>`.
234-
pub fn exec(&self) -> ExecResult {
254+
pub fn exec(&self) -> Box<Future<Item = CommandStatus, Error = Error>> {
235255
let request = Request::CommandExec(self.provider.as_ref().map(|p| p.name()), self.cmd.clone());
236256
Box::new(self.host.request(request)
237257
.chain_err(|| ErrorKind::Request { endpoint: "Command", func: "exec" })
238258
.map(|msg| {
239-
parse_body_stream(msg)
259+
CommandStatus::new(msg)
240260
}))
241261
}
242262
}
243263

244-
// Abstract this logic so other endpoints can parse CommandProvider::exec()
245-
// streams too.
246-
#[doc(hidden)]
247-
pub fn parse_body_stream(mut msg: Message<Response, Body<Vec<u8>, io::Error>>) ->
248-
(
249-
Box<Stream<Item = String, Error = Error>>,
250-
Box<Future<Item = ExitStatus, Error = Error>>
251-
)
252-
{
253-
let (tx, rx) = oneshot::channel::<ExitStatus>();
254-
let mut tx_share = Some(tx);
255-
let mut found = false;
256-
(
257-
Box::new(msg.take_body()
264+
impl CommandStatus {
265+
#[doc(hidden)]
266+
pub fn new(mut msg: Message<Response, Body<Vec<u8>, io::Error>>) -> CommandStatus {
267+
let (tx, rx) = oneshot::channel::<ExitStatus>();
268+
let mut tx = Some(tx);
269+
let stream = msg.take_body()
258270
.expect("Command::exec reply missing body stream")
259271
.filter_map(move |v| {
260272
let s = String::from_utf8_lossy(&v).to_string();
261273

262274
// @todo This is a heuristical approach which is fallible
263-
if !found && s.starts_with("ExitStatus:") {
275+
if s.starts_with("ExitStatus:") {
264276
let (_, json) = s.split_at(11);
265277
match serde_json::from_str(json) {
266278
Ok(status) => {
267279
// @todo What should happen if this fails?
268-
let _ = tx_share.take().unwrap().send(status);
269-
found = true;
280+
let _ = tx.take().unwrap().send(status);
270281
return None;
271282
},
272283
_ => (),
@@ -275,9 +286,73 @@ pub fn parse_body_stream(mut msg: Message<Response, Body<Vec<u8>, io::Error>>) -
275286

276287
Some(s)
277288
})
278-
.then(|r| r.chain_err(|| "Command execution failed"))
279-
) as Box<Stream<Item = String, Error = Error>>,
280-
Box::new(rx.chain_err(|| "Buffer dropped before ExitStatus was sent"))
281-
as Box<Future<Item = ExitStatus, Error = Error>>
282-
)
289+
.then(|r| r.chain_err(|| "Command execution failed"));
290+
291+
let exit_status = rx.chain_err(|| "Buffer dropped before ExitStatus was sent");
292+
293+
CommandStatus {
294+
stream: Some(Box::new(stream)),
295+
exit_status: Some(Box::new(exit_status)),
296+
}
297+
}
298+
299+
/// Take ownership of the output stream.
300+
///
301+
/// The stream is guaranteed to be present only if this is the first call
302+
/// to `take_stream()` and the future has not yet been polled.
303+
pub fn take_stream(&mut self) -> Option<Box<Stream<Item = String, Error = Error>>> {
304+
self.stream.take()
305+
}
306+
307+
/// Convert this to a `CommandResult`, which returns the output string on
308+
/// success and an error containing the command's output on failure. If the
309+
/// stream has already been taken by `take_stream()` then this function
310+
/// will return `None`.
311+
///
312+
/// Note that "success" is determined by examining the `ExitStatus::success`
313+
/// bool. See `ExitStatus` docs for details.
314+
pub fn result(self) -> Option<CommandResult> {
315+
if let Some(stream) = self.stream {
316+
let inner = stream.fold(String::new(), |mut acc, line| {
317+
acc.push_str(&line);
318+
future::ok::<_, Error>(acc)
319+
})
320+
.join(self.exit_status.unwrap())
321+
.and_then(|(output, status)| if status.success {
322+
future::ok(output)
323+
} else {
324+
future::err(ErrorKind::Command(output).into())
325+
});
326+
327+
Some(CommandResult {
328+
inner: Box::new(inner) as Box<Future<Item = String, Error = Error>>
329+
})
330+
} else {
331+
None
332+
}
333+
}
334+
}
335+
336+
impl Future for CommandStatus {
337+
type Item = ExitStatus;
338+
type Error = Error;
339+
340+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
341+
if let Some(stream) = self.stream.take() {
342+
self.exit_status = Some(Box::new(stream.for_each(|_| Ok(()))
343+
.join(self.exit_status.take().unwrap())
344+
.map(|(_, status)| status)));
345+
}
346+
347+
self.exit_status.as_mut().unwrap().poll()
348+
}
349+
}
350+
351+
impl Future for CommandResult {
352+
type Item = String;
353+
type Error = Error;
354+
355+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
356+
self.inner.poll()
357+
}
283358
}

core/src/errors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ error_chain! {
1717
}
1818

1919
errors {
20+
Command(out: String) {
21+
description("Command returned non-zero exit code"),
22+
display("Command returned non-zero exit code with output: {}", out),
23+
}
24+
2025
InvalidTelemetryKey {
2126
cmd: &'static str,
2227
key: String,

0 commit comments

Comments
 (0)