diff --git a/Cargo.toml b/Cargo.toml index 84eed9c..8c696d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tokio = { features = ["fs", "io-util", "macros", "process", "rt", "sync"], versi tracing = "0.1" # We support versions 2, 3 and 4 cap-std-ext = ">= 2.0, <= 4.0" +itertools = "0.14.0" [dev-dependencies] anyhow = "1.0" diff --git a/examples/client.rs b/examples/client.rs index b18ad9b..a8ac375 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -28,13 +28,24 @@ struct GetBlobOpts { size: u64, } +#[derive(clap::Parser, Debug)] +struct FetchContainerToDevNullOpts { + #[clap(flatten)] + metaopts: GetMetadataOpts, + + /// Use the "raw" path for fetching blobs + #[clap(long)] + raw_blobs: bool, +} + /// Simple program to greet a person #[derive(clap::Parser, Debug)] #[command(version, about, long_about = None)] enum Opt { GetMetadata(GetMetadataOpts), GetBlob(GetBlobOpts), - FetchContainerToDevNull(GetMetadataOpts), + GetBlobRaw(GetBlobOpts), + FetchContainerToDevNull(FetchContainerToDevNullOpts), } #[derive(serde::Serialize, Debug)] @@ -86,18 +97,49 @@ async fn get_blob(o: GetBlobOpts) -> Result<()> { Ok(()) } -async fn fetch_container_to_devnull(o: GetMetadataOpts) -> Result<()> { - let config = o.proxy_opts(); +async fn get_blob_raw(o: GetBlobOpts) -> Result<()> { + let proxy = containers_image_proxy::ImageProxy::new().await?; + let img = proxy.open_image(&o.reference).await?; + let (_, mut datafd, err) = proxy.get_raw_blob(&img, &o.digest).await?; + + let mut stdout = std::io::stdout().lock(); + let reader = async move { + let mut buffer = [0u8; 8192]; + loop { + let n = datafd.read(&mut buffer).await?; + if n == 0 { + return anyhow::Ok(()); + } + stdout.write_all(&buffer[..n])?; + } + }; + + let (a, b) = tokio::join!(reader, err); + a?; + b?; + Ok(()) +} + +async fn fetch_container_to_devnull(o: FetchContainerToDevNullOpts) -> Result<()> { + let config = o.metaopts.proxy_opts(); let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?; - let img = &proxy.open_image(&o.reference).await?; + let img = &proxy.open_image(&o.metaopts.reference).await?; let manifest = proxy.fetch_manifest(img).await?.1; for layer in manifest.layers() { - let (mut blob, driver) = proxy.get_descriptor(img, layer).await?; let mut devnull = tokio::io::sink(); - let copier = tokio::io::copy(&mut blob, &mut devnull); - let (copier, driver) = tokio::join!(copier, driver); - copier?; - driver?; + if o.raw_blobs { + let (_, mut blob, err) = proxy.get_raw_blob(img, layer.digest()).await?; + let copier = tokio::io::copy(&mut blob, &mut devnull); + let (copier, err) = tokio::join!(copier, err); + copier?; + err?; + } else { + let (mut blob, driver) = proxy.get_descriptor(img, layer).await?; + let copier = tokio::io::copy(&mut blob, &mut devnull); + let (copier, driver) = tokio::join!(copier, driver); + copier?; + driver?; + } } Ok(()) } @@ -106,6 +148,7 @@ async fn run() -> Result<()> { match Opt::parse() { Opt::GetMetadata(o) => get_metadata(o).await, Opt::GetBlob(o) => get_blob(o).await, + Opt::GetBlobRaw(o) => get_blob_raw(o).await, Opt::FetchContainerToDevNull(o) => fetch_container_to_devnull(o).await, } } diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 53d3b72..d3fe99e 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -6,10 +6,13 @@ use cap_std_ext::prelude::CapStdExtCommandExt; use cap_std_ext::{cap_std, cap_tempfile}; -use futures_util::Future; +use futures_util::{Future, FutureExt}; +use itertools::Itertools; use oci_spec::image::{Descriptor, Digest}; use serde::{Deserialize, Serialize}; use std::fs::File; +use std::iter::FusedIterator; +use std::num::NonZeroU32; use std::ops::Range; use std::os::fd::OwnedFd; use std::os::unix::prelude::CommandExt; @@ -64,6 +67,18 @@ impl Error { } } +/// Errors returned by get_raw_blob +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum GetBlobError { + /// A client may reasonably retry on this type of error. + #[error("retryable error")] + Retryable(Box), + #[error("error")] + /// An unknown other error + Other(Box), +} + impl From for Error { fn from(value: rustix::io::Errno) -> Self { Self::Io(value.into()) @@ -160,6 +175,15 @@ impl std::fmt::Debug for ImageProxy { #[derive(Debug, PartialEq, Eq)] pub struct OpenedImage(u32); +#[derive(Debug, PartialEq, Eq)] +struct PipeId(NonZeroU32); + +impl PipeId { + fn try_new(pipeid: u32) -> Option { + Some(Self(NonZeroU32::new(pipeid)?)) + } +} + /// Configuration for the proxy. #[derive(Debug, Default)] pub struct ImageProxyConfig { @@ -318,6 +342,72 @@ pub struct ConvertedLayerInfo { pub media_type: oci_spec::image::MediaType, } +/// A single fd; requires invoking FinishPipe +#[derive(Debug)] +struct FinishPipe { + pipeid: PipeId, + datafd: OwnedFd, +} + +/// There is a data FD and an error FD. The error FD will be JSON. +#[derive(Debug)] +struct DualFds { + datafd: OwnedFd, + errfd: OwnedFd, +} + +/// Helper trait for parsing the pipeid and/or file descriptors of a reply +trait FromReplyFds: Send + 'static +where + Self: Sized, +{ + fn from_reply( + iterable: impl IntoIterator, + pipeid: u32, + ) -> Result; +} + +/// No file descriptors or pipeid expected +impl FromReplyFds for () { + fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { + if fds.into_iter().next().is_some() { + return Err(Error::Other("expected no fds".into())); + } + if pipeid != 0 { + return Err(Error::Other("unexpected pipeid".into())); + } + Ok(()) + } +} + +/// A FinishPipe instance +impl FromReplyFds for FinishPipe { + fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { + let Some(pipeid) = PipeId::try_new(pipeid) else { + return Err(Error::Other("Expected pipeid for FinishPipe".into())); + }; + let datafd = fds + .into_iter() + .exactly_one() + .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?; + Ok(Self { pipeid, datafd }) + } +} + +/// A DualFds instance +impl FromReplyFds for DualFds { + fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { + if pipeid != 0 { + return Err(Error::Other("Unexpected pipeid with DualFds".into())); + } + let [datafd, errfd] = fds + .into_iter() + .collect_array() + .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?; + Ok(Self { datafd, errfd }) + } +} + impl ImageProxy { /// Create an image proxy that fetches the target image, using default configuration. pub async fn new() -> Result { @@ -354,7 +444,7 @@ impl ImageProxy { }; // Verify semantic version - let protover = r.impl_request::("Initialize", []).await?.0; + let protover: String = r.impl_request("Initialize", [(); 0]).await?; tracing::debug!("Remote protocol version: {protover}"); let protover = semver::Version::parse(protover.as_str())?; // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`. @@ -370,10 +460,11 @@ impl ImageProxy { Ok(r) } - async fn impl_request_raw( + /// Create and send a request. Should only be used by impl_request. + async fn impl_request_raw( sockfd: Arc>, req: Request, - ) -> Result<(T, Option<(OwnedFd, u32)>)> { + ) -> Result<(T, F)> { tracing::trace!("sending request {}", req.method.as_str()); // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio let r = tokio::task::spawn_blocking(move || { @@ -401,8 +492,7 @@ impl ImageProxy { rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f), _ => None, }) - .flatten() - .next(); + .flatten(); let buf = &buf[..nread]; let reply: Reply = serde_json::from_slice(buf)?; if !reply.success { @@ -411,22 +501,8 @@ impl ImageProxy { error: reply.error.into(), }); } - let fdret = match (fdret, reply.pipeid) { - (Some(fd), n) => { - if n == 0 { - return Err(Error::Other("got fd but no pipeid".into())); - } - Some((fd, n)) - } - (None, n) => { - if n != 0 { - return Err(Error::Other(format!("got no fd with pipeid {}", n).into())); - } - None - } - }; - let reply = serde_json::from_value(reply.value)?; - Ok((reply, fdret)) + let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?; + Ok((serde_json::from_value(reply.value)?, fds)) }) .await .map_err(|e| Error::Other(e.to_string().into()))??; @@ -434,16 +510,17 @@ impl ImageProxy { Ok(r) } + /// Create a request that may return file descriptors, and also check for an unexpected + /// termination of the child process. #[instrument(skip(args))] - async fn impl_request( + async fn impl_request_with_fds< + T: serde::de::DeserializeOwned + Send + 'static, + F: FromReplyFds, + >( &self, method: &str, - args: T, - ) -> Result<(R, Option<(OwnedFd, u32)>)> - where - T: IntoIterator, - I: Into, - { + args: impl IntoIterator>, + ) -> Result<(T, F)> { let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args)); let mut childwait = self.childwait.lock().await; tokio::select! { @@ -456,31 +533,36 @@ impl ImageProxy { } } + /// A synchronous invocation which does not return any file descriptors. + async fn impl_request( + &self, + method: &str, + args: impl IntoIterator>, + ) -> Result { + let (r, ()) = self.impl_request_with_fds(method, args).await?; + Ok(r) + } + #[instrument] - async fn finish_pipe(&self, pipeid: u32) -> Result<()> { + async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> { tracing::debug!("closing pipe"); - let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?; - if fd.is_some() { - return Err(Error::Other("Unexpected fd in finish_pipe reply".into())); - } + let (r, ()) = self + .impl_request_with_fds("FinishPipe", [pipeid.0.get()]) + .await?; Ok(r) } #[instrument] pub async fn open_image(&self, imgref: &str) -> Result { tracing::debug!("opening image"); - let (imgid, _) = self - .impl_request::("OpenImage", [imgref]) - .await?; + let imgid = self.impl_request("OpenImage", [imgref]).await?; Ok(OpenedImage(imgid)) } #[instrument] pub async fn open_image_optional(&self, imgref: &str) -> Result> { tracing::debug!("opening image"); - let (imgid, _) = self - .impl_request::("OpenImageOptional", [imgref]) - .await?; + let imgid = self.impl_request("OpenImageOptional", [imgref]).await?; if imgid == 0 { Ok(None) } else { @@ -490,18 +572,15 @@ impl ImageProxy { #[instrument] pub async fn close_image(&self, img: &OpenedImage) -> Result<()> { - tracing::debug!("closing image"); - let (r, _) = self.impl_request("CloseImage", [img.0]).await?; - Ok(r) + self.impl_request("CloseImage", [img.0]).await } - async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result> { - let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; - let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result> { + let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd)); let mut fd = tokio::io::BufReader::new(fd); let mut r = Vec::new(); let reader = fd.read_to_end(&mut r); - let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid)); + let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid)); finish?; assert_eq!(nbytes?, r.len()); Ok(r) @@ -511,8 +590,8 @@ impl ImageProxy { /// The original digest of the unconverted manifest is also returned. /// For more information on OCI manifests, see pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec)> { - let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?; - Ok((digest, self.read_all_fd(fd).await?)) + let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?; + Ok((digest, self.read_finish_pipe(pipefd).await?)) } /// Fetch the manifest. @@ -529,10 +608,8 @@ impl ImageProxy { /// Fetch the config. /// For more information on OCI config, see pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result> { - let (_, fd) = self - .impl_request::<(), _, _>("GetFullConfig", [img.0]) - .await?; - self.read_all_fd(fd).await + let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?; + self.read_finish_pipe(pipe).await } /// Fetch the config. @@ -569,14 +646,62 @@ impl ImageProxy { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; - let (_bloblen, fd) = self.impl_request::("GetBlob", args).await?; - let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?; - let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + let (bloblen, pipe): (u64, FinishPipe) = + self.impl_request_with_fds("GetBlob", args).await?; + let _: u64 = bloblen; + let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd)); let fd = tokio::io::BufReader::new(fd); - let finish = Box::pin(self.finish_pipe(pipeid)); + let finish = Box::pin(self.finish_pipe(pipe.pipeid)); Ok((fd, finish)) } + async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> { + let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + let mut errfd = tokio::io::BufReader::new(fd); + let mut buf = Vec::new(); + errfd + .read_to_end(&mut buf) + .await + .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?; + if buf.is_empty() { + return Ok(()); + } + #[derive(Deserialize)] + struct RemoteError { + code: String, + message: String, + } + let e: RemoteError = serde_json::from_slice(&buf) + .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?; + match e.code.as_str() { + // Actually this is OK + "EPIPE" => Ok(()), + "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())), + _ => Err(GetBlobError::Other(e.message.into_boxed_str())), + } + } + + /// Fetch a blob identified by e.g. `sha256:`; does not perform + /// any verification that the blob matches the digest. The size of the + /// blob and a pipe file descriptor are returned. + #[instrument] + pub async fn get_raw_blob( + &self, + img: &OpenedImage, + digest: &Digest, + ) -> Result<( + u64, + tokio::fs::File, + impl Future> + Unpin + '_, + )> { + tracing::debug!("fetching blob"); + let args: Vec = vec![img.0.into(), digest.to_string().into()]; + let (bloblen, fds): (u64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?; + let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd)); + let err = Self::read_blob_error(fds.errfd).boxed(); + Ok((bloblen, fd, err)) + } + /// Fetch a descriptor. The requested size and digest are verified (by the proxy process). #[instrument] pub async fn get_descriptor( @@ -599,17 +724,16 @@ impl ImageProxy { ) -> Result>> { tracing::debug!("Getting layer info"); if layer_info_piped_proto_version().matches(&self.protover) { - let (_, fd) = self - .impl_request::<(), _, _>("GetLayerInfoPiped", [img.0]) + let ((), pipe) = self + .impl_request_with_fds("GetLayerInfoPiped", [img.0]) .await?; - let buf = self.read_all_fd(fd).await?; + let buf = self.read_finish_pipe(pipe).await?; return Ok(Some(serde_json::from_slice(&buf)?)); } if !layer_info_proto_version().matches(&self.protover) { return Ok(None); } - let reply = self.impl_request("GetLayerInfo", [img.0]).await?; - let layers: Vec = reply.0; + let layers = self.impl_request("GetLayerInfo", [img.0]).await?; Ok(Some(layers)) } @@ -642,9 +766,12 @@ impl ImageProxy { #[cfg(test)] mod tests { - use std::io::{Seek, Write}; + use std::io::{BufWriter, Seek, Write}; + use std::os::fd::{AsRawFd, OwnedFd}; use super::*; + use cap_std_ext::cap_std::fs::Dir; + use rustix::fs::{memfd_create, MemfdFlags}; fn validate(c: Command, contains: &[&str], not_contains: &[&str]) { // Format via debug, because @@ -745,4 +872,143 @@ mod tests { assert_send_sync(&opened); assert_send_sync(opened); } + + fn generate_err_fd(v: serde_json::Value) -> Result { + let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?; + let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?; + serde_json::to_writer(&mut tf, &v)?; + let mut tf = tf.into_inner().map_err(|e| e.into_error())?; + tf.seek(std::io::SeekFrom::Start(0))?; + let r = tf.into_std().into(); + Ok(r) + } + + #[tokio::test] + async fn test_read_blob_error_retryable() -> Result<()> { + let retryable = serde_json::json!({ + "code": "retryable", + "message": "foo", + }); + let retryable = generate_err_fd(retryable)?; + let err = ImageProxy::read_blob_error(retryable).boxed(); + let e = err.await.unwrap_err(); + match e { + GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"), + _ => panic!("Unexpected error {e:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_read_blob_error_none() -> Result<()> { + let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?; + let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std(); + let err = ImageProxy::read_blob_error(tf.into()).boxed(); + err.await.unwrap(); + Ok(()) + } + + #[tokio::test] + async fn test_read_blob_error_other() -> Result<()> { + let other = serde_json::json!({ + "code": "other", + "message": "bar", + }); + let other = generate_err_fd(other)?; + let err = ImageProxy::read_blob_error(other).boxed(); + let e = err.await.unwrap_err(); + match e { + GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"), + _ => panic!("Unexpected error {e:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_read_blob_error_epipe() -> Result<()> { + let epipe = serde_json::json!({ + "code": "EPIPE", + "message": "baz", + }); + let epipe = generate_err_fd(epipe)?; + let err = ImageProxy::read_blob_error(epipe).boxed(); + err.await.unwrap(); + Ok(()) + } + + // Helper to create a dummy OwnedFd using memfd_create for testing. + fn create_dummy_fd() -> OwnedFd { + memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap() + } + + #[test] + fn test_new_from_raw_values_finish_pipe() { + let datafd = create_dummy_fd(); + // Keep a raw fd to compare later, as fds_and_pipeid consumes datafd + let raw_datafd_val = datafd.as_raw_fd(); + let fds = vec![datafd]; + let v = FinishPipe::from_reply(fds, 1).unwrap(); + assert_eq!(v.pipeid.0.get(), 1); + assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val); + } + + #[test] + fn test_new_from_raw_values_dual_fds() { + let datafd = create_dummy_fd(); + let errfd = create_dummy_fd(); + let raw_datafd_val = datafd.as_raw_fd(); + let raw_errfd_val = errfd.as_raw_fd(); + let fds = vec![datafd, errfd]; + let v = DualFds::from_reply(fds, 0).unwrap(); + assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val); + assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val); + } + + #[test] + fn test_new_from_raw_values_error_too_many_fds() { + let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()]; + match DualFds::from_reply(fds, 0) { + Ok(v) => unreachable!("{v:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected two fds for DualFds") + } + Err(other) => unreachable!("{other}"), + } + } + + #[test] + fn test_new_from_raw_values_error_fd_with_zero_pipeid() { + let fds = vec![create_dummy_fd()]; + match FinishPipe::from_reply(fds, 0) { + Ok(v) => unreachable!("{v:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe") + } + Err(other) => unreachable!("{other}"), + } + } + + #[test] + fn test_new_from_raw_values_error_pipeid_with_both_fds() { + let fds = vec![create_dummy_fd(), create_dummy_fd()]; + match DualFds::from_reply(fds, 1) { + Ok(v) => unreachable!("{v:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds") + } + Err(other) => unreachable!("{other}"), + } + } + + #[test] + fn test_new_from_raw_values_error_no_fd_with_pipeid() { + let fds: Vec = vec![]; + match FinishPipe::from_reply(fds, 1) { + Ok(v) => unreachable!("{v:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe") + } + Err(other) => unreachable!("{other}"), + } + } }