diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 489d294cc..ac5482ccf 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -108,6 +108,8 @@ enum CommandLine { value_enum )] repository_stats_updater: Toggle, + #[arg(long = "cdn-invalidator", default_value = "enabled", value_enum)] + cdn_invalidator: Toggle, }, StartBuildServer, @@ -143,10 +145,14 @@ impl CommandLine { } => subcommand.handle_args(ctx, skip_if_exists)?, Self::StartRegistryWatcher { repository_stats_updater, + cdn_invalidator, } => { if repository_stats_updater == Toggle::Enabled { docs_rs::utils::daemon::start_background_repository_stats_updater(&ctx)?; } + if cdn_invalidator == Toggle::Enabled { + docs_rs::utils::daemon::start_background_cdn_invalidator(&ctx)?; + } docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?; } @@ -586,7 +592,6 @@ impl Context for BinContext { self.pool()?, self.metrics()?, self.config()?, - self.cdn()?, self.storage()?, ); fn storage(self) -> Storage = Storage::new( @@ -595,7 +600,10 @@ impl Context for BinContext { self.config()?, self.runtime()?, )?; - fn cdn(self) -> CdnBackend = CdnBackend::new(&self.config()?, &self.runtime()?); + fn cdn(self) -> CdnBackend = CdnBackend::new( + &self.config()?, + &self.runtime()?, + ); fn config(self) -> Config = Config::from_env()?; fn metrics(self) -> Metrics = Metrics::new()?; fn runtime(self) -> Runtime = { diff --git a/src/build_queue.rs b/src/build_queue.rs index dc14c8cd2..b39f559c2 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -1,4 +1,4 @@ -use crate::cdn::{self, CdnBackend}; +use crate::cdn; use crate::db::{delete_crate, Pool}; use crate::docbuilder::PackageKind; use crate::error::Result; @@ -27,7 +27,6 @@ pub(crate) struct QueuedCrate { #[derive(Debug)] pub struct BuildQueue { config: Arc, - cdn: Arc, storage: Arc, pub(crate) db: Pool, metrics: Arc, @@ -39,13 +38,11 @@ impl BuildQueue { db: Pool, metrics: Arc, config: Arc, - cdn: Arc, storage: Arc, ) -> Self { BuildQueue { max_attempts: config.build_attempts.into(), config, - cdn, db, metrics, storage, @@ -191,7 +188,9 @@ impl BuildQueue { ) }); self.metrics.total_builds.inc(); - if let Err(err) = cdn::invalidate_crate(&self.config, &self.cdn, &to_process.name) { + if let Err(err) = + cdn::queue_crate_invalidation(&mut transaction, &self.config, &to_process.name) + { report_error(&err); } @@ -321,7 +320,7 @@ impl BuildQueue { ), Err(err) => report_error(&err), } - if let Err(err) = cdn::invalidate_crate(&self.config, &self.cdn, krate) { + if let Err(err) = cdn::queue_crate_invalidation(&mut *conn, &self.config, krate) { report_error(&err); } continue; @@ -366,7 +365,9 @@ impl BuildQueue { yanked.is_some(), ); - if let Err(err) = cdn::invalidate_crate(&self.config, &self.cdn, &release.name) { + if let Err(err) = + cdn::queue_crate_invalidation(&mut *conn, &self.config, &release.name) + { report_error(&err); } } @@ -570,11 +571,7 @@ mod tests { assert_eq!(metrics.failed_builds.get(), 1); // no invalidations were run since we don't have a distribution id configured - assert!(matches!(*env.cdn(), CdnBackend::Dummy(_))); - if let CdnBackend::Dummy(ref invalidation_requests) = *env.cdn() { - let ir = invalidation_requests.lock().unwrap(); - assert!(ir.is_empty()); - } + assert!(cdn::queued_or_active_crate_invalidations(&mut *env.db().conn())?.is_empty()); Ok(()) }) @@ -593,55 +590,31 @@ mod tests { queue.add_crate("will_succeed", "1.0.0", -1, None)?; queue.add_crate("will_fail", "1.0.0", 0, None)?; - assert!(matches!(*env.cdn(), CdnBackend::Dummy(_))); - if let CdnBackend::Dummy(ref invalidation_requests) = *env.cdn() { - let ir = invalidation_requests.lock().unwrap(); - assert!(ir.is_empty()); - } + let mut conn = env.db().conn(); + cdn::queued_or_active_crate_invalidations(&mut *conn)?.is_empty(); queue.process_next_crate(|krate| { assert_eq!("will_succeed", krate.name); Ok(()) })?; - if let CdnBackend::Dummy(ref invalidation_requests) = *env.cdn() { - let ir = invalidation_requests.lock().unwrap(); - assert_eq!( - *ir, - [ - ("distribution_id_web".into(), "/will_succeed*".into()), - ("distribution_id_web".into(), "/crate/will_succeed*".into()), - ( - "distribution_id_static".into(), - "/rustdoc/will_succeed*".into() - ), - ] - ); - } + + let queued_invalidations = cdn::queued_or_active_crate_invalidations(&mut *conn)?; + assert_eq!(queued_invalidations.len(), 3); + assert!(queued_invalidations + .iter() + .all(|i| i.krate == "will_succeed")); queue.process_next_crate(|krate| { assert_eq!("will_fail", krate.name); anyhow::bail!("simulate a failure"); })?; - if let CdnBackend::Dummy(ref invalidation_requests) = *env.cdn() { - let ir = invalidation_requests.lock().unwrap(); - assert_eq!( - *ir, - [ - ("distribution_id_web".into(), "/will_succeed*".into()), - ("distribution_id_web".into(), "/crate/will_succeed*".into()), - ( - "distribution_id_static".into(), - "/rustdoc/will_succeed*".into() - ), - ("distribution_id_web".into(), "/will_fail*".into()), - ("distribution_id_web".into(), "/crate/will_fail*".into()), - ( - "distribution_id_static".into(), - "/rustdoc/will_fail*".into() - ), - ] - ); - } + + let queued_invalidations = cdn::queued_or_active_crate_invalidations(&mut *conn)?; + assert_eq!(queued_invalidations.len(), 6); + assert!(queued_invalidations + .iter() + .skip(3) + .all(|i| i.krate == "will_fail")); Ok(()) }) diff --git a/src/cdn.rs b/src/cdn.rs index ee3ac8e4d..de68dc032 100644 --- a/src/cdn.rs +++ b/src/cdn.rs @@ -1,17 +1,24 @@ -use crate::Config; -use anyhow::{Context, Error, Result}; +use crate::{utils::report_error, Config}; +use anyhow::{anyhow, Context, Error, Result}; use aws_sdk_cloudfront::{ config::retry::RetryConfig, model::{InvalidationBatch, Paths}, Client, Region, }; use chrono::{DateTime, Utc}; +use futures_util::StreamExt; use serde::Serialize; use std::sync::{Arc, Mutex}; use strum::EnumString; use tokio::runtime::Runtime; +use tracing::{debug, info, instrument, warn}; use uuid::Uuid; +/// maximum amout of parallel in-progress wildcard invalidations +/// The actual limit is 15, but we want to keep some room for manually +/// triggered invalidations +const MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS: i32 = 13; + #[derive(Debug, EnumString)] pub(crate) enum CdnKind { #[strum(ascii_case_insensitive)] @@ -21,9 +28,18 @@ pub(crate) enum CdnKind { CloudFront, } +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct CdnInvalidation { + pub(crate) distribution_id: String, + pub(crate) invalidation_id: String, + pub(crate) path_patterns: Vec, +} + #[derive(Debug)] pub enum CdnBackend { - Dummy(Arc>>), + Dummy { + invalidation_requests: Arc>>, + }, CloudFront { runtime: Arc, client: Client, @@ -46,7 +62,9 @@ impl CdnBackend { client: Client::from_conf(config_builder.build()), } } - CdnKind::Dummy => Self::Dummy(Arc::new(Mutex::new(Vec::new()))), + CdnKind::Dummy => Self::Dummy { + invalidation_requests: Arc::new(Mutex::new(Vec::new())), + }, } } /// create a Front invalidation request for a list of path patterns. @@ -59,50 +77,173 @@ impl CdnBackend { /// /// Returns the caller reference that can be used to query the status of this /// invalidation request. - pub(crate) fn create_invalidation( + #[instrument] + fn create_invalidation( &self, distribution_id: &str, path_patterns: &[&str], - ) -> Result { + ) -> Result { let caller_reference = Uuid::new_v4(); match *self { CdnBackend::CloudFront { ref runtime, ref client, + .. } => { - runtime.block_on(CdnBackend::cloudfront_invalidation( + let id = runtime.block_on(CdnBackend::create_cloudfront_invalidation( client, distribution_id, &caller_reference.to_string(), path_patterns, ))?; + Ok(CdnInvalidation { + distribution_id: distribution_id.to_owned(), + invalidation_id: id, + path_patterns: path_patterns.iter().cloned().map(str::to_owned).collect(), + }) } - CdnBackend::Dummy(ref invalidation_requests) => { + CdnBackend::Dummy { + ref invalidation_requests, + .. + } => { let mut invalidation_requests = invalidation_requests .lock() .expect("could not lock mutex on dummy CDN"); - invalidation_requests.extend( - path_patterns - .iter() - .map(|p| (distribution_id.to_owned(), (*p).to_owned())), + let invalidation = CdnInvalidation { + distribution_id: distribution_id.to_owned(), + invalidation_id: caller_reference.to_string(), + path_patterns: path_patterns.iter().cloned().map(str::to_owned).collect(), + }; + + invalidation_requests.push(invalidation.clone()); + Ok(invalidation) + } + } + } + + #[cfg(test)] + fn clear_active_invalidations(&self) { + match self { + CdnBackend::Dummy { + invalidation_requests, + .. + } => { + invalidation_requests + .lock() + .expect("could not lock mutex on dummy CDN") + .clear(); + } + CdnBackend::CloudFront { .. } => unreachable!(), + } + } + + fn active_invalidations(&self, distribution_id: &str) -> Result, Error> { + match self { + CdnBackend::Dummy { + invalidation_requests, + .. + } => { + let invalidation_requests = invalidation_requests + .lock() + .expect("could not lock mutex on dummy CDN"); + + Ok(invalidation_requests + .iter() + .filter(|i| i.distribution_id == distribution_id) + .cloned() + .collect()) + } + CdnBackend::CloudFront { + runtime, client, .. + } => Ok( + runtime.block_on(CdnBackend::list_active_cloudfront_invalidations( + client, + distribution_id, + ))?, + ), + } + } + + #[instrument] + async fn list_active_cloudfront_invalidations( + client: &Client, + distribution_id: &str, + ) -> Result, Error> { + let mut stream = client + .list_invalidations() + .distribution_id(distribution_id) + .into_paginator() + .items() + .send(); + + let mut ids: Vec = Vec::new(); + while let Some(invalidation) = stream.next().await { + let invalidation = invalidation?; + + if let (Some(id), Some(status)) = (invalidation.id(), invalidation.status()) { + // the `ListInvalidation` AWS API doesn't support filtering, so we + // have to query all invalidations and filter here. + if status == "InProgress" { + ids.push(id.to_owned()); + } else if status != "Completed" { + report_error(&anyhow!( + "got unknown cloudfront invalidation status: {} in {:?}", + status, + invalidation + )); + } + } + } + + let mut result = Vec::new(); + for id in ids { + let response = client + .get_invalidation() + .distribution_id(distribution_id) + .id(id.clone()) + .send() + .await?; + + let mut patterns = Vec::new(); + if let Some(invalidation) = response.invalidation() { + if let Some(batch) = invalidation.invalidation_batch() { + if let Some(paths) = batch.paths() { + if let Some(items) = paths.items() { + patterns.extend_from_slice(items); + } + } + } + } + if patterns.is_empty() { + warn!( + id, + ?response, + "got invalidation detail response without paths" ); } + + result.push(CdnInvalidation { + distribution_id: distribution_id.to_owned(), + invalidation_id: id, + path_patterns: patterns, + }); } - Ok(caller_reference) + Ok(result) } - async fn cloudfront_invalidation( + #[instrument] + async fn create_cloudfront_invalidation( client: &Client, distribution_id: &str, caller_reference: &str, path_patterns: &[&str], - ) -> Result<(), Error> { + ) -> Result { let path_patterns: Vec<_> = path_patterns.iter().cloned().map(String::from).collect(); - client + Ok(client .create_invalidation() .distribution_id(distribution_id) .invalidation_batch( @@ -117,60 +258,188 @@ impl CdnBackend { .build(), ) .send() - .await?; + .await? + .invalidation() + .ok_or_else(|| { + anyhow!("missing invalidation information in create-invalidation result") + })? + .id() + .ok_or_else(|| anyhow!("missing invalidation ID in create-invalidation result"))? + .to_owned()) + } +} - Ok(()) +#[instrument(skip(conn))] +pub(crate) fn handle_queued_invalidation_requests( + cdn: &CdnBackend, + conn: &mut impl postgres::GenericClient, + distribution_id: &str, +) -> Result<()> { + info!("handling queued CDN invalidations"); + + let active_invalidations = cdn + .active_invalidations(distribution_id) + .context("error querying active invalidations")?; + + // for now we assume all invalidation paths are wildcard invalidations, + // so we apply the wildcard limit. + let active_path_invalidations: usize = active_invalidations + .iter() + .map(|i| i.path_patterns.len()) + .sum(); + + debug!( + active_invalidations = active_invalidations.len(), + active_path_invalidations, "found active invalidations", + ); + + // remove the invalidation from the queue when they are completed. + // We're only looking at InProgress invalidations, + // we don't differentiate between `Completed` ones, and invalidations + // missing in the CloudFront `ListInvalidations` response. + conn.execute( + "DELETE FROM cdn_invalidation_queue + WHERE + cdn_distribution_id = $1 AND + created_in_cdn IS NOT NULL AND + NOT (cdn_reference = ANY($2))", + &[ + &distribution_id, + &active_invalidations + .iter() + .map(|i| i.invalidation_id.clone()) + .collect::>(), + ], + )?; + + let possible_path_invalidations: i32 = + MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS - active_path_invalidations as i32; + + if possible_path_invalidations <= 0 { + info!( + active_path_invalidations, + "too many active cloudfront wildcard invalidations \ + will not create a new one." + ); + return Ok(()); + } + + // create new an invalidation for the queued path patterns + let mut transaction = conn.transaction()?; + let mut path_patterns: Vec = Vec::new(); + let mut queued_entry_ids: Vec = Vec::new(); + + for row in transaction.query( + "SELECT id, path_pattern + FROM cdn_invalidation_queue + WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL + ORDER BY queued, id + LIMIT $2 + FOR UPDATE", + &[&distribution_id, &(possible_path_invalidations as i64)], + )? { + queued_entry_ids.push(row.get("id")); + path_patterns.push(row.get("path_pattern")); + } + + if path_patterns.is_empty() { + info!("no queued path patterns to invalidate, going back to sleep"); + return Ok(()); + } + + match cdn + .create_invalidation( + distribution_id, + &path_patterns.iter().map(String::as_str).collect::>(), + ) + .context("error creating new invalidation") + { + Ok(invalidation) => { + transaction.execute( + "UPDATE cdn_invalidation_queue + SET + created_in_cdn = CURRENT_TIMESTAMP, + cdn_reference = $1 + WHERE + id = ANY($2)", + &[&invalidation.invalidation_id, &queued_entry_ids], + )?; + transaction.commit()?; + } + Err(err) => return Err(err), } + + Ok(()) } -pub(crate) fn invalidate_crate(config: &Config, cdn: &CdnBackend, name: &str) -> Result<()> { +#[instrument(skip(conn, config))] +pub(crate) fn queue_crate_invalidation( + conn: &mut impl postgres::GenericClient, + config: &Config, + name: &str, +) -> Result<()> { + let mut add = |distribution_id: &str, path_patterns: &[&str]| -> Result<()> { + for pattern in path_patterns { + debug!(distribution_id, pattern, "enqueueing web CDN invalidation"); + conn.execute( + "INSERT INTO cdn_invalidation_queue (crate, cdn_distribution_id, path_pattern) + VALUES ($1, $2, $3)", + &[&name, &distribution_id, pattern], + )?; + } + Ok(()) + }; if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { - cdn.create_invalidation( + add( distribution_id, &[&format!("/{}*", name), &format!("/crate/{}*", name)], ) - .context("error creating web CDN invalidation")?; + .context("error enqueueing web CDN invalidation")?; } if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { - cdn.create_invalidation(distribution_id, &[&format!("/rustdoc/{}*", name)]) - .context("error creating static CDN invalidation")?; + add(distribution_id, &[&format!("/rustdoc/{}*", name)]) + .context("error enqueueing static CDN invalidation")?; } Ok(()) } -#[derive(Debug, Clone, Serialize, PartialEq, Eq)] -pub(crate) struct CrateInvalidation { - pub name: String, - pub created: DateTime, +#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)] +pub(crate) struct QueuedInvalidation { + pub krate: String, + pub cdn_distribution_id: String, + pub path_pattern: String, + pub queued: DateTime, + pub created_in_cdn: Option>, + pub cdn_reference: Option, } -/// Return fake active cloudfront invalidations. -/// CloudFront invalidations can take up to 15 minutes. Until we have -/// live queries of the invalidation status we just assume it's fine -/// 20 minutes after the build. -/// TODO: should be replaced be keeping track or querying the active invalidation from CloudFront -pub(crate) fn active_crate_invalidations( - conn: &mut postgres::Client, -) -> Result> { +/// Return which crates have queued or active cloudfront invalidations. +pub(crate) fn queued_or_active_crate_invalidations( + conn: &mut impl postgres::GenericClient, +) -> Result> { Ok(conn .query( r#" SELECT - crates.name, - MIN(builds.build_time) as build_time - FROM crates - INNER JOIN releases ON crates.id = releases.crate_id - INNER JOIN builds ON releases.id = builds.rid - WHERE builds.build_time >= CURRENT_TIMESTAMP - INTERVAL '20 minutes' - GROUP BY crates.name - ORDER BY MIN(builds.build_time)"#, + crate, + cdn_distribution_id, + path_pattern, + queued, + created_in_cdn, + cdn_reference + FROM cdn_invalidation_queue + ORDER BY queued, id"#, &[], )? .iter() - .map(|row| CrateInvalidation { - name: row.get(0), - created: row.get(1), + .map(|row| QueuedInvalidation { + krate: row.get("crate"), + cdn_distribution_id: row.get("cdn_distribution_id"), + path_pattern: row.get("path_pattern"), + queued: row.get("queued"), + created_in_cdn: row.get("created_in_cdn"), + cdn_reference: row.get("cdn_reference"), }) .collect()) } @@ -178,14 +447,13 @@ pub(crate) fn active_crate_invalidations( #[cfg(test)] mod tests { use super::*; - use crate::test::{wrapper, FakeBuild}; + use crate::test::wrapper; use aws_sdk_cloudfront::{Client, Config, Credentials, Region}; use aws_smithy_client::{ erase::DynConnector, http_connector::HttpConnector, test_connection::TestConnection, }; use aws_smithy_http::body::SdkBody; - use chrono::{Duration, Timelike}; #[test] fn create_cloudfront() { @@ -224,20 +492,201 @@ mod tests { config.cloudfront_distribution_id_web = Some("distribution_id_web".into()); config.cloudfront_distribution_id_static = Some("distribution_id_static".into()); }); - invalidate_crate(&env.config(), &env.cdn(), "krate")?; - - assert!(matches!(*env.cdn(), CdnBackend::Dummy(_))); - if let CdnBackend::Dummy(ref invalidation_requests) = *env.cdn() { - let ir = invalidation_requests.lock().unwrap(); - assert_eq!( - *ir, - [ - ("distribution_id_web".into(), "/krate*".into()), - ("distribution_id_web".into(), "/crate/krate*".into()), - ("distribution_id_static".into(), "/rustdoc/krate*".into()), - ] - ); + + let cdn = env.cdn(); + let mut conn = env.db().conn(); + assert!(queued_or_active_crate_invalidations(&mut *conn)?.is_empty()); + + queue_crate_invalidation(&mut *conn, &env.config(), "krate")?; + + // invalidation paths are queued. + assert_eq!( + queued_or_active_crate_invalidations(&mut *conn)? + .into_iter() + .map(|i| ( + i.cdn_distribution_id, + i.krate, + i.path_pattern, + i.cdn_reference + )) + .collect::>(), + vec![ + ( + "distribution_id_web".into(), + "krate".into(), + "/krate*".into(), + None + ), + ( + "distribution_id_web".into(), + "krate".into(), + "/crate/krate*".into(), + None + ), + ( + "distribution_id_static".into(), + "krate".into(), + "/rustdoc/krate*".into(), + None + ), + ] + ); + + // queueing the invalidation doesn't create it in the CDN + assert!(cdn.active_invalidations("distribution_id_web")?.is_empty()); + assert!(cdn + .active_invalidations("distribution_id_static")? + .is_empty()); + + // now handle the queued invalidations + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_web")?; + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_static")?; + + // which creates them in the CDN + { + let ir_web = cdn.active_invalidations("distribution_id_web")?; + assert_eq!(ir_web.len(), 1); + assert_eq!(ir_web[0].path_patterns, vec!["/krate*", "/crate/krate*"]); + + let ir_static = cdn.active_invalidations("distribution_id_static")?; + assert_eq!(ir_web.len(), 1); + assert_eq!(ir_static[0].path_patterns, vec!["/rustdoc/krate*"]); } + + // the queued entries got a CDN reference attached + assert!(queued_or_active_crate_invalidations(&mut *conn)? + .iter() + .all(|i| i.cdn_reference.is_some() && i.created_in_cdn.is_some())); + + // clear the active invalidations in the CDN to _fake_ them + // being completed on the CDN side. + cdn.clear_active_invalidations(); + + // now handle again + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_web")?; + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_static")?; + + // which removes them from the queue table + assert!(queued_or_active_crate_invalidations(&mut *conn)?.is_empty()); + + Ok(()) + }); + } + + #[test] + fn only_add_some_invalidations_when_too_many_are_active() { + crate::test::wrapper(|env| { + env.override_config(|config| { + config.cloudfront_distribution_id_web = Some("distribution_id_web".into()); + }); + + let cdn = env.cdn(); + + // create an invalidation with 15 paths, so we're over the limit + cdn.create_invalidation( + "distribution_id_web", + &(0..(MAX_CLOUDFRONT_WILDCARD_INVALIDATIONS - 1)) + .map(|_| "/something*") + .collect::>(), + )?; + + let mut conn = env.db().conn(); + assert!(queued_or_active_crate_invalidations(&mut *conn)?.is_empty()); + + // queue an invalidation + queue_crate_invalidation(&mut *conn, &env.config(), "krate")?; + + // handle the queued invalidations + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_web")?; + + // only one path was added to the CDN + let q = queued_or_active_crate_invalidations(&mut *conn)?; + assert_eq!(q.iter().filter(|i| i.cdn_reference.is_some()).count(), 1); + + // old invalidation is still active, new one is added + let ir_web = cdn.active_invalidations("distribution_id_web")?; + assert_eq!(ir_web.len(), 2); + assert_eq!(ir_web[0].path_patterns.len(), 12); + assert_eq!(ir_web[1].path_patterns.len(), 1); + + Ok(()) + }); + } + + #[test] + fn dont_create_invalidations_when_too_many_are_active() { + crate::test::wrapper(|env| { + env.override_config(|config| { + config.cloudfront_distribution_id_web = Some("distribution_id_web".into()); + }); + + let cdn = env.cdn(); + + // create an invalidation with 15 paths, so we're over the limit + cdn.create_invalidation( + "distribution_id_web", + &(0..15).map(|_| "/something*").collect::>(), + )?; + + let mut conn = env.db().conn(); + assert!(queued_or_active_crate_invalidations(&mut *conn)?.is_empty()); + + // queue an invalidation + queue_crate_invalidation(&mut *conn, &env.config(), "krate")?; + + // handle the queued invalidations + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_web")?; + + // nothing was added to the CDN + assert!(queued_or_active_crate_invalidations(&mut *conn)? + .iter() + .all(|i| i.cdn_reference.is_none())); + + // old invalidations are still active + let ir_web = cdn.active_invalidations("distribution_id_web")?; + assert_eq!(ir_web.len(), 1); + assert_eq!(ir_web[0].path_patterns.len(), 15); + + // clear the active invalidations in the CDN to _fake_ them + // being completed on the CDN side. + cdn.clear_active_invalidations(); + + // now handle again + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_web")?; + + // which adds the CDN reference + assert!(queued_or_active_crate_invalidations(&mut *conn)? + .iter() + .all(|i| i.cdn_reference.is_some())); + + // and creates them in the CDN too + let ir_web = cdn.active_invalidations("distribution_id_web")?; + assert_eq!(ir_web.len(), 1); + assert_eq!(ir_web[0].path_patterns, vec!["/krate*", "/crate/krate*"]); + + Ok(()) + }); + } + + #[test] + fn dont_create_invalidations_without_paths() { + crate::test::wrapper(|env| { + env.override_config(|config| { + config.cloudfront_distribution_id_web = Some("distribution_id_web".into()); + }); + + let cdn = env.cdn(); + + let mut conn = env.db().conn(); + // no invalidation is queued + assert!(queued_or_active_crate_invalidations(&mut *conn)?.is_empty()); + + // run the handler + handle_queued_invalidation_requests(&env.cdn(), &mut *conn, "distribution_id_web")?; + + // no invalidation was created + assert!(cdn.active_invalidations("distribution_id_web")?.is_empty()); + Ok(()) }); } @@ -261,56 +710,6 @@ mod tests { Config::new(&cfg) } - #[test] - fn get_active_invalidations() { - wrapper(|env| { - let now = Utc::now().with_nanosecond(0).unwrap(); - let past_deploy = now - Duration::minutes(21); - let first_running_deploy = now - Duration::minutes(10); - let second_running_deploy = now; - - env.fake_release() - .name("krate_2") - .version("0.0.1") - .builds(vec![FakeBuild::default().build_time(first_running_deploy)]) - .create()?; - - env.fake_release() - .name("krate_2") - .version("0.0.2") - .builds(vec![FakeBuild::default().build_time(second_running_deploy)]) - .create()?; - - env.fake_release() - .name("krate_1") - .version("0.0.2") - .builds(vec![FakeBuild::default().build_time(second_running_deploy)]) - .create()?; - - env.fake_release() - .name("krate_1") - .version("0.0.3") - .builds(vec![FakeBuild::default().build_time(past_deploy)]) - .create()?; - - assert_eq!( - active_crate_invalidations(&mut env.db().conn())?, - vec![ - CrateInvalidation { - name: "krate_2".into(), - created: first_running_deploy, - }, - CrateInvalidation { - name: "krate_1".into(), - created: second_running_deploy, - } - ] - ); - - Ok(()) - }) - } - #[tokio::test] async fn invalidate_path() { let conn = TestConnection::new(vec![( @@ -348,7 +747,7 @@ mod tests { )]); let client = Client::from_conf(get_mock_config(DynConnector::new(conn.clone())).await); - CdnBackend::cloudfront_invalidation( + CdnBackend::create_cloudfront_invalidation( &client, "some_distribution", "some_reference", diff --git a/src/db/migrate.rs b/src/db/migrate.rs index 2fb9ba824..47ef6a034 100644 --- a/src/db/migrate.rs +++ b/src/db/migrate.rs @@ -859,7 +859,29 @@ pub fn migrate(version: Option, conn: &mut Client) -> crate::error::Res ALTER TABLE owners ADD COLUMN name VARCHAR(255); ", ), - + sql_migration!( + context, 37, "add cdn-invalidation-queue table", + " + CREATE TABLE cdn_invalidation_queue ( + id BIGSERIAL, + crate VARCHAR(255) NOT NULL, + cdn_distribution_id VARCHAR(255) NOT NULL, + path_pattern text NOT NULL, + queued TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_in_cdn TIMESTAMP WITH TIME ZONE, + cdn_reference VARCHAR(255) + ); + CREATE INDEX cdn_invalidation_queue_crate_idx ON cdn_invalidation_queue (crate); + CREATE INDEX cdn_invalidation_queue_cdn_reference_idx ON cdn_invalidation_queue (cdn_reference); + CREATE INDEX cdn_invalidation_queue_created_in_cdn_idx ON cdn_invalidation_queue (created_in_cdn); + ", + " + DROP INDEX cdn_invalidation_queue_crate_idx; + DROP INDEX cdn_invalidation_queue_cdn_reference_idx; + DROP INDEX cdn_invalidation_queue_created_in_cdn_idx; + DROP TABLE cdn_invalidation_queue; + " + ), ]; for migration in migrations { diff --git a/src/lib.rs b/src/lib.rs index 7ee70ab65..6ff3d7e46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub mod db; mod docbuilder; mod error; pub mod index; -mod metrics; +pub mod metrics; pub mod repositories; pub mod storage; #[cfg(test)] diff --git a/src/test/fakes.rs b/src/test/fakes.rs index fc562f616..d7ae25192 100644 --- a/src/test/fakes.rs +++ b/src/test/fakes.rs @@ -38,7 +38,6 @@ pub(crate) struct FakeRelease<'a> { pub(crate) struct FakeBuild { s3_build_log: Option, db_build_log: Option, - build_time: Option>, result: BuildResult, } @@ -459,12 +458,6 @@ impl FakeGithubStats { } impl FakeBuild { - pub(crate) fn build_time(self, build_time: impl Into>) -> Self { - Self { - build_time: Some(build_time.into()), - ..self - } - } pub(crate) fn rustc_version(self, rustc_version: impl Into) -> Self { Self { result: BuildResult { @@ -532,13 +525,6 @@ impl FakeBuild { )?; } - if let Some(build_time) = self.build_time.as_ref() { - conn.query( - "UPDATE builds SET build_time = $2 WHERE id = $1", - &[&build_id, &build_time], - )?; - } - if let Some(s3_build_log) = self.s3_build_log.as_deref() { let path = format!("build-logs/{}/{}.txt", build_id, default_target); storage.store_one(path, s3_build_log)?; @@ -553,7 +539,6 @@ impl Default for FakeBuild { Self { s3_build_log: Some("It works!".into()), db_build_log: None, - build_time: None, result: BuildResult { rustc_version: "rustc 2.0.0-nightly (000000000 1970-01-01)".into(), docsrs_version: "docs.rs 1.0.0 (000000000 1970-01-01)".into(), diff --git a/src/test/mod.rs b/src/test/mod.rs index 1bf90f52c..f60e2a238 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -321,7 +321,6 @@ impl TestEnvironment { self.db().pool(), self.metrics(), self.config(), - self.cdn(), self.storage(), )) }) diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 93d616664..ae6309260 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -3,6 +3,7 @@ //! This daemon will start web server, track new packages and build them use crate::{ + cdn, utils::{queue_builder, report_error}, web::start_web_server, BuildQueue, Config, Context, Index, RustwideBuilder, @@ -95,6 +96,33 @@ pub fn start_background_repository_stats_updater(context: &dyn Context) -> Resul Ok(()) } +pub fn start_background_cdn_invalidator(context: &dyn Context) -> Result<(), Error> { + let cdn = context.cdn()?; + let config = context.config()?; + let pool = context.pool()?; + + if config.cloudfront_distribution_id_web.is_none() + && config.cloudfront_distribution_id_static.is_none() + { + info!("no cloudfront distribution IDs found, skipping background cdn invalidation"); + return Ok(()); + } + + cron("cdn invalidator", Duration::from_secs(60), move || { + let mut conn = pool.get()?; + if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { + cdn::handle_queued_invalidation_requests(&cdn, &mut *conn, distribution_id) + .context("error handling queued invalidations for web CDN invalidation")?; + } + if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { + cdn::handle_queued_invalidation_requests(&cdn, &mut *conn, distribution_id) + .context("error handling queued invalidations for static CDN invalidation")?; + } + Ok(()) + })?; + Ok(()) +} + pub fn start_daemon( context: C, enable_registry_watcher: bool, @@ -125,6 +153,7 @@ pub fn start_daemon( .unwrap(); start_background_repository_stats_updater(&*context)?; + start_background_cdn_invalidator(&*context)?; // NOTE: if a error occurred earlier in `start_daemon`, the server will _not_ be joined - // instead it will get killed when the process exits. diff --git a/src/web/releases.rs b/src/web/releases.rs index 9f4dbe43d..555187055 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -2,7 +2,7 @@ use crate::{ build_queue::QueuedCrate, - cdn::{self, CrateInvalidation}, + cdn, db::Pool, impl_axum_webpage, utils::{report_error, spawn_blocking}, @@ -21,7 +21,7 @@ use axum::{ use chrono::{DateTime, NaiveDate, Utc}; use postgres::Client; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::str; use std::sync::Arc; use tracing::{debug, warn}; @@ -694,7 +694,7 @@ pub(crate) async fn activity_handler( struct BuildQueuePage { description: &'static str, queue: Vec, - active_deployments: Vec, + active_deployments: Vec, } impl_axum_webpage! { @@ -715,7 +715,19 @@ pub(crate) async fn build_queue_handler( } let mut conn = pool.get()?; - Ok((queue, cdn::active_crate_invalidations(&mut conn)?)) + let mut active_deployments: Vec<_> = cdn::queued_or_active_crate_invalidations(&mut *conn)? + .into_iter() + .map(|i| i.krate) + .collect(); + + // deduplicate the list of crates while keeping their order + let mut set = HashSet::new(); + active_deployments.retain(|k| set.insert(k.clone())); + + // reverse the list, so the oldest comes first + active_deployments.reverse(); + + Ok((queue, active_deployments)) }) .await?; @@ -731,8 +743,7 @@ mod tests { use super::*; use crate::index::api::CrateOwner; use crate::test::{ - assert_redirect, assert_redirect_unchecked, assert_success, wrapper, FakeBuild, - TestFrontend, + assert_redirect, assert_redirect_unchecked, assert_success, wrapper, TestFrontend, }; use anyhow::Error; use chrono::{Duration, TimeZone}; @@ -1366,15 +1377,13 @@ mod tests { #[test] fn test_deployment_queue() { wrapper(|env| { + env.override_config(|config| { + config.cloudfront_distribution_id_web = Some("distribution_id_web".into()); + }); + let web = env.frontend(); - env.fake_release() - .name("krate_2") - .version("0.0.1") - .builds(vec![ - FakeBuild::default().build_time(Utc::now() - Duration::minutes(10)) - ]) - .create()?; + cdn::queue_crate_invalidation(&mut *env.db().conn(), &env.config(), "krate_2")?; let empty = kuchiki::parse_html().one(web.get("/releases/queue").send()?.text()?); assert!(empty diff --git a/templates/releases/build_queue.html b/templates/releases/build_queue.html index 1f0b8accb..73834dce3 100644 --- a/templates/releases/build_queue.html +++ b/templates/releases/build_queue.html @@ -18,10 +18,10 @@
    - {% for invalidation in active_deployments -%} + {% for krate in active_deployments -%}
  1. - - {{ invalidation.name }} + + {{ krate }}
  2. {%- endfor %}