diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index feb518b0f..d1c1f65d1 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -12,9 +12,9 @@ use std::path::PathBuf; use clap::{Arg, App, SubCommand}; use cratesfyi::{DocBuilder, DocBuilderOptions, db}; -use cratesfyi::utils::build_doc; +use cratesfyi::utils::{build_doc, add_crate_to_queue}; use cratesfyi::start_web_server; -use cratesfyi::db::add_path_into_database; +use cratesfyi::db::{add_path_into_database, connect_db}; pub fn main() { @@ -130,6 +130,23 @@ pub fn main() { chart") .subcommand(SubCommand::with_name("update-search-index")) .about("Updates search index")) + .subcommand(SubCommand::with_name("queue") + .about("Interactions with the build queue") + .subcommand(SubCommand::with_name("add") + .about("Add a crate to the build queue") + .arg(Arg::with_name("CRATE_NAME") + .index(1) + .required(true) + .help("Name of crate to build")) + .arg(Arg::with_name("CRATE_VERSION") + .index(2) + .required(true) + .help("Version of crate to build")) + .arg(Arg::with_name("BUILD_PRIORITY") + .short("p") + .long("priority") + .help("Priority of build (default: 5) (new crate builds get priority 0)") + .takes_value(true)))) .get_matches(); @@ -227,6 +244,17 @@ pub fn main() { start_web_server(Some(matches.value_of("SOCKET_ADDR").unwrap_or("0.0.0.0:3000"))); } else if let Some(_) = matches.subcommand_matches("daemon") { cratesfyi::utils::start_daemon(); + } else if let Some(matches) = matches.subcommand_matches("queue") { + if let Some(matches) = matches.subcommand_matches("add") { + let priority = matches.value_of("BUILD_PRIORITY").unwrap_or("5"); + let priority: i32 = priority.parse().expect("--priority was not a number"); + let conn = connect_db().expect("Could not connect to database"); + + add_crate_to_queue(&conn, + matches.value_of("CRATE_NAME").unwrap(), + matches.value_of("CRATE_VERSION").unwrap(), + priority).expect("Could not add crate to queue"); + } } else { println!("{}", matches.usage()); } diff --git a/src/db/migrate.rs b/src/db/migrate.rs index 073d4d004..80fff1b07 100644 --- a/src/db/migrate.rs +++ b/src/db/migrate.rs @@ -169,6 +169,16 @@ pub fn migrate(version: Option) -> CratesfyiResult<()> { "DROP TABLE authors, author_rels, keyword_rels, keywords, owner_rels, owners, releases, crates, builds, queue, files, config;" ), + migration!( + // version + 2, + // description + "Added priority column to build queue", + // upgrade query + "ALTER TABLE queue ADD COLUMN priority INT DEFAULT 0;", + // downgrade query + "ALTER TABLE queue DROP COLUMN priority;" + ), ]; for migration in migrations { diff --git a/src/docbuilder/mod.rs b/src/docbuilder/mod.rs index 3fe748abd..635fe2d55 100644 --- a/src/docbuilder/mod.rs +++ b/src/docbuilder/mod.rs @@ -110,6 +110,11 @@ impl DocBuilder { Ok(()) } + /// Checks for the lock file and returns whether it currently exists. + pub fn is_locked(&self) -> bool { + self.lock_path().exists() + } + /// Returns a reference of options pub fn options(&self) -> &DocBuilderOptions { &self.options diff --git a/src/docbuilder/queue.rs b/src/docbuilder/queue.rs index 23fdc7757..ae07e0a5e 100644 --- a/src/docbuilder/queue.rs +++ b/src/docbuilder/queue.rs @@ -5,67 +5,74 @@ use super::DocBuilder; use db::connect_db; use error::Result; use crates_index_diff::{ChangeKind, Index}; +use utils::add_crate_to_queue; impl DocBuilder { /// Updates crates.io-index repository and adds new crates into build queue. /// Returns size of queue - pub fn get_new_crates(&mut self) -> Result { + pub fn get_new_crates(&mut self) -> Result { let conn = try!(connect_db()); let index = try!(Index::from_path_or_cloned(&self.options.crates_io_index_path)); let mut changes = try!(index.fetch_changes()); + let mut add_count: usize = 0; // I belive this will fix ordering of queue if we get more than one crate from changes changes.reverse(); for krate in changes.iter().filter(|k| k.kind != ChangeKind::Yanked) { - conn.execute("INSERT INTO queue (name, version) VALUES ($1, $2)", - &[&krate.name, &krate.version]) - .ok(); + add_crate_to_queue(&conn, &krate.name, &krate.version, 0).ok(); debug!("{}-{} added into build queue", krate.name, krate.version); + add_count += 1; } - let queue_count = conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[]) + Ok(add_count) + } + + pub fn get_queue_count(&self) -> Result { + let conn = try!(connect_db()); + Ok(conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[]) .unwrap() .get(0) - .get(0); - - Ok(queue_count) + .get(0)) } - - /// Builds packages from queue - pub fn build_packages_queue(&mut self) -> Result { + /// Builds the top package from the queue. Returns whether the queue was empty. + pub fn build_next_queue_package(&mut self) -> Result { let conn = try!(connect_db()); - let mut build_count = 0; - for row in &try!(conn.query("SELECT id, name, version + let query = try!(conn.query("SELECT id, name, version FROM queue WHERE attempt < 5 - ORDER BY id ASC", - &[])) { - let id: i32 = row.get(0); - let name: String = row.get(1); - let version: String = row.get(2); - - match self.build_package(&name[..], &version[..]) { - Ok(_) => { - build_count += 1; - let _ = conn.execute("DELETE FROM queue WHERE id = $1", &[&id]); - } - Err(e) => { - // Increase attempt count - let _ = conn.execute("UPDATE queue SET attempt = attempt + 1 WHERE id = $1", - &[&id]); - error!("Failed to build package {}-{} from queue: {}", - name, - version, - e) - } + ORDER BY priority ASC, attempt ASC, id ASC + LIMIT 1", + &[])); + + if query.is_empty() { + // nothing in the queue; bail + return Ok(false); + } + + let id: i32 = query.get(0).get(0); + let name: String = query.get(0).get(1); + let version: String = query.get(0).get(2); + + match self.build_package(&name[..], &version[..]) { + Ok(_) => { + let _ = conn.execute("DELETE FROM queue WHERE id = $1", &[&id]); + } + Err(e) => { + // Increase attempt count + let _ = conn.execute("UPDATE queue SET attempt = attempt + 1 WHERE id = $1", + &[&id]); + error!("Failed to build package {}-{} from queue: {}", + name, + version, + e) } } - Ok(build_count) + Ok(true) } } @@ -87,14 +94,4 @@ mod test { } assert!(res.is_ok()); } - - - #[test] - #[ignore] - fn test_build_packages_queue() { - let _ = env_logger::try_init(); - let options = DocBuilderOptions::from_prefix(PathBuf::from("../cratesfyi-prefix")); - let mut docbuilder = DocBuilder::new(options); - assert!(docbuilder.build_packages_queue().is_ok()); - } } diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 0b1e25c45..aa9c7147a 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -4,6 +4,7 @@ use std::{env, thread}; +use std::panic::{catch_unwind, AssertUnwindSafe}; use std::process::exit; use std::fs::File; use std::io::Write; @@ -44,80 +45,159 @@ pub fn start_daemon() { exit(0); } - // check new crates every minute thread::spawn(move || { + // space this out to prevent it from clashing against the queue-builder thread on launch + thread::sleep(Duration::from_secs(30)); loop { + let opts = opts(); + let mut doc_builder = DocBuilder::new(opts); + + debug!("Checking new crates"); + match doc_builder.get_new_crates() { + Ok(n) => debug!("{} crates added to queue", n), + Err(e) => error!("Failed to get new crates: {}", e), + } + thread::sleep(Duration::from_secs(60)); + } + }); + + // build new crates every minute + thread::spawn(move || { + let opts = opts(); + let mut doc_builder = DocBuilder::new(opts); + + /// Represents the current state of the builder thread. + enum BuilderState { + /// The builder thread has just started, and hasn't built any crates yet. + Fresh, + /// The builder has just seen an empty build queue. + EmptyQueue, + /// The builder has just seen the lock file. + Locked, + /// The builder has just finished building a crate. The enclosed count is the number of + /// crates built since the caches have been refreshed. + QueueInProgress(usize), + } - let mut opts = opts(); - opts.skip_if_exists = true; + let mut status = BuilderState::Fresh; + + loop { + if !status.is_in_progress() { + thread::sleep(Duration::from_secs(60)); + } // check lock file - if opts.prefix.join("cratesfyi.lock").exists() { + if doc_builder.is_locked() { warn!("Lock file exits, skipping building new crates"); + status = BuilderState::Locked; continue; } - let mut doc_builder = DocBuilder::new(opts); + if status.count() >= 10 { + // periodically, we need to flush our caches and ping the hubs + debug!("10 builds in a row; flushing caches"); + status = BuilderState::QueueInProgress(0); - debug!("Checking new crates"); - let queue_count = match doc_builder.get_new_crates() { - Ok(size) => size, - Err(e) => { - error!("Failed to get new crates: {}", e); - continue; + match pubsubhubbub::ping_hubs() { + Err(e) => error!("Failed to ping hub: {}", e), + Ok(n) => debug!("Succesfully pinged {} hubs", n) } - }; - // Only build crates if there is any - if queue_count == 0 { - debug!("Queue is empty, going back to sleep"); - continue; - } + if let Err(e) = doc_builder.load_cache() { + error!("Failed to load cache: {}", e); + } - info!("Building {} crates from queue", queue_count); + if let Err(e) = doc_builder.save_cache() { + error!("Failed to save cache: {}", e); + } - // update index - if let Err(e) = update_sources() { - error!("Failed to update sources: {}", e); - continue; + if let Err(e) = update_sources() { + error!("Failed to update sources: {}", e); + continue; + } } - if let Err(e) = doc_builder.load_cache() { - error!("Failed to load cache: {}", e); - continue; + // Only build crates if there are any to build + debug!("Checking build queue"); + match doc_builder.get_queue_count() { + Err(e) => { + error!("Failed to read the number of crates in the queue: {}", e); + continue; + } + Ok(0) => { + if status.count() > 0 { + // ping the hubs before continuing + match pubsubhubbub::ping_hubs() { + Err(e) => error!("Failed to ping hub: {}", e), + Ok(n) => debug!("Succesfully pinged {} hubs", n) + } + + if let Err(e) = doc_builder.save_cache() { + error!("Failed to save cache: {}", e); + } + } + debug!("Queue is empty, going back to sleep"); + status = BuilderState::EmptyQueue; + continue; + } + Ok(queue_count) => { + info!("Starting build with {} crates in queue (currently on a {} crate streak)", + queue_count, status.count()); + } } + // if we're starting a new batch, reload our caches and sources + if !status.is_in_progress() { + if let Err(e) = doc_builder.load_cache() { + error!("Failed to load cache: {}", e); + continue; + } + + if let Err(e) = update_sources() { + error!("Failed to update sources: {}", e); + continue; + } + } - // Run build_packages_queue in it's own thread to catch panics + // Run build_packages_queue under `catch_unwind` to catch panics // This only panicked twice in the last 6 months but its just a better // idea to do this. - let res = thread::spawn(move || { - match doc_builder.build_packages_queue() { - Err(e) => error!("Failed build new crates: {}", e), - Ok(n) => { - if n > 0 { - match pubsubhubbub::ping_hubs() { - Err(e) => error!("Failed to ping hub: {}", e), - Ok(n) => debug!("Succesfully pinged {} hubs", n) - } - } - } - } - - if let Err(e) = doc_builder.save_cache() { - error!("Failed to save cache: {}", e); + let res = catch_unwind(AssertUnwindSafe(|| { + match doc_builder.build_next_queue_package() { + Err(e) => error!("Failed to build crate from queue: {}", e), + Ok(crate_built) => if crate_built { + status.increment(); } - debug!("Finished building new crates, going back to sleep"); - }) - .join(); + } + })); if let Err(e) = res { error!("GRAVE ERROR Building new crates panicked: {:?}", e); } } + + impl BuilderState { + fn count(&self) -> usize { + match *self { + BuilderState::QueueInProgress(n) => n, + _ => 0, + } + } + + fn is_in_progress(&self) -> bool { + match *self { + BuilderState::QueueInProgress(_) => true, + _ => false, + } + } + + fn increment(&mut self) { + *self = BuilderState::QueueInProgress(self.count() + 1); + } + } }); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c0db8296c..6f03bc9ac 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,6 +8,7 @@ pub use self::release_activity_updater::update_release_activity; pub use self::daemon::start_daemon; pub use self::rustc_version::{parse_rustc_version, get_current_versions, command_result}; pub use self::html::extract_head_and_body; +pub use self::queue::add_crate_to_queue; mod github_updater; mod build_doc; @@ -17,3 +18,4 @@ mod daemon; mod pubsubhubbub; mod rustc_version; mod html; +mod queue; diff --git a/src/utils/queue.rs b/src/utils/queue.rs new file mode 100644 index 000000000..8cd691fbe --- /dev/null +++ b/src/utils/queue.rs @@ -0,0 +1,10 @@ +//! Utilities for interacting with the build queue + +use postgres::Connection; +use error::Result; + +pub fn add_crate_to_queue(conn: &Connection, name: &str, version: &str, priority: i32) -> Result<()> { + try!(conn.execute("INSERT INTO queue (name, version, priority) VALUES ($1, $2, $3)", + &[&name, &version, &priority])); + Ok(()) +} diff --git a/src/web/releases.rs b/src/web/releases.rs index 1605e19c5..aa46bd8ad 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -565,7 +565,7 @@ pub fn build_queue_handler(req: &mut Request) -> IronResult { for krate in &conn.query("SELECT name, version FROM queue WHERE attempt < 5 - ORDER BY id ASC", + ORDER BY priority ASC, attempt ASC, id ASC", &[]) .unwrap() { crates.push((krate.get(0), krate.get(1)));