From 6b9dbb5cf316885fcce72f0a784b0bb47153ebb1 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 7 Aug 2019 09:31:08 -0600 Subject: [PATCH] Move `update-downloads` to a background job This replaces the `update-downloads` binary with a background job, and a binary that is used to queue up a given job, which we will run from Heroku scheduler. This accomplishes 2 things: - It makes it easier to write tasks that need to run periodically (e.g. cleaning up stale rate limit buckets), since we don't need to create a new standalone binary. - `update_downloads` and any future recurring tasks will automatically get monitoring if they fail, since we are already monitoring for background jobs not being successfully run. Right now the intent is to have `enqueue-job update_downloads` get run periodically by Heroku scheudler (and a similar scheduled task for any future tasks that are added). Once swirl gains the ability to schedule jobs to be run at arbitrary points in the future, we could instead have these jobs re-queue themselves once they complete, and have the cron task just look to see if any job is queued for each given type, queuing it if not. That would have a bit less boilerplate, but a lot more complexity. Fixes #1797. --- src/bin/enqueue-job.rs | 17 ++++++++++ src/lib.rs | 1 + src/tasks.rs | 3 ++ .../update_downloads.rs} | 34 ++++++++----------- 4 files changed, 36 insertions(+), 19 deletions(-) create mode 100644 src/bin/enqueue-job.rs create mode 100644 src/tasks.rs rename src/{bin/update-downloads.rs => tasks/update_downloads.rs} (94%) diff --git a/src/bin/enqueue-job.rs b/src/bin/enqueue-job.rs new file mode 100644 index 00000000000..5a3494acdf0 --- /dev/null +++ b/src/bin/enqueue-job.rs @@ -0,0 +1,17 @@ +use cargo_registry::util::{CargoError, CargoResult}; +use cargo_registry::{db, tasks}; +use std::env::args; +use swirl::Job; + +fn main() -> CargoResult<()> { + let conn = db::connect_now()?; + + match &*args().nth(1).unwrap_or_default() { + "update_downloads" => tasks::update_downloads() + .enqueue(&conn) + .map_err(|e| CargoError::from_std_error(e))?, + other => panic!("Unrecognized job type `{}`", other), + }; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 5b5ca0a94a9..ec774e4dab2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,7 @@ pub mod middleware; mod publish_rate_limit; pub mod render; pub mod schema; +pub mod tasks; mod test_util; pub mod uploaders; pub mod util; diff --git a/src/tasks.rs b/src/tasks.rs new file mode 100644 index 00000000000..930f83bfc11 --- /dev/null +++ b/src/tasks.rs @@ -0,0 +1,3 @@ +mod update_downloads; + +pub use update_downloads::update_downloads; diff --git a/src/bin/update-downloads.rs b/src/tasks/update_downloads.rs similarity index 94% rename from src/bin/update-downloads.rs rename to src/tasks/update_downloads.rs index 3ada76a7106..2bc63a2bac2 100644 --- a/src/bin/update-downloads.rs +++ b/src/tasks/update_downloads.rs @@ -1,25 +1,21 @@ -#![deny(warnings, clippy::all, rust_2018_idioms)] - -#[macro_use] -extern crate diesel; - -use cargo_registry::{ - db, +use crate::{ + background_jobs::Environment, models::VersionDownload, schema::{crates, metadata, version_downloads, versions}, - util::CargoResult, }; use diesel::prelude::*; +use swirl::PerformError; -fn main() -> CargoResult<()> { - let conn = db::connect_now()?; +#[swirl::background_job] +pub fn update_downloads(env: &Environment) -> Result<(), PerformError> { + let conn = env.connection()?; update(&conn)?; Ok(()) } fn update(conn: &PgConnection) -> QueryResult<()> { - use crate::version_downloads::dsl::*; + use self::version_downloads::dsl::*; use diesel::dsl::now; use diesel::select; @@ -84,7 +80,7 @@ fn collect(conn: &PgConnection, rows: &[VersionDownload]) -> QueryResult<()> { #[cfg(test)] mod test { use super::*; - use cargo_registry::{ + use crate::{ env, models::{Crate, NewCrate, NewUser, NewVersion, User, Version}, }; @@ -143,7 +139,7 @@ mod test { .execute(&conn) .unwrap(); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let version_downloads = versions::table .find(version.id) .select(versions::downloads) @@ -154,7 +150,7 @@ mod test { .select(crates::downloads) .first(&conn); assert_eq!(Ok(1), crate_downloads); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let version_downloads = versions::table .find(version.id) .select(versions::downloads) @@ -179,7 +175,7 @@ mod test { )) .execute(&conn) .unwrap(); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let processed = version_downloads::table .filter(version_downloads::version_id.eq(version.id)) .select(version_downloads::processed) @@ -203,7 +199,7 @@ mod test { )) .execute(&conn) .unwrap(); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let processed = version_downloads::table .filter(version_downloads::version_id.eq(version.id)) .select(version_downloads::processed) @@ -253,7 +249,7 @@ mod test { .filter(crates::id.eq(krate.id)) .first::(&conn) .unwrap(); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let version2 = versions::table .find(version.id) .first::(&conn) @@ -266,7 +262,7 @@ mod test { .unwrap(); assert_eq!(krate2.downloads, 2); assert_eq!(krate2.updated_at, krate_before.updated_at); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let version3 = versions::table .find(version.id) .first::(&conn) @@ -301,7 +297,7 @@ mod test { .execute(&conn) .unwrap(); - crate::update(&conn).unwrap(); + super::update(&conn).unwrap(); let versions_changed = versions::table .select(versions::updated_at.ne(now - 2.days())) .get_result(&conn);