Skip to content

Run daemon without registry index watcher in local docker #872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 10 additions & 133 deletions src/utils/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

use crate::{
db::Pool,
docbuilder::RustwideBuilder,
utils::{github_updater, pubsubhubbub, update_release_activity},
utils::{github_updater, queue_builder, update_release_activity},
BuildQueue, Config, DocBuilder, DocBuilderOptions,
};
use chrono::{Timelike, Utc};
use failure::Error;
use log::{debug, error, info, warn};
use std::panic::{catch_unwind, AssertUnwindSafe};
use log::{debug, error, info};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -69,137 +67,16 @@ pub fn start_daemon(
.unwrap();

// build new crates every minute
// REFACTOR: Break this into smaller functions
let cloned_db = db.clone();
let cloned_build_queue = build_queue.clone();
thread::Builder::new().name("build queue reader".to_string()).spawn(move || {
let opts = opts();
let mut doc_builder = DocBuilder::new(opts, cloned_db.clone(), cloned_build_queue.clone());

/// Represents the current state of the builder thread.
enum BuilderState {
/// The builder thread has just started, and hasn't built any crates yet.
Fresh,
/// The builder has just seen an empty build queue.
EmptyQueue,
/// The builder has just seen the lock file.
Locked,
/// The builder has just finished building a crate. The enclosed count is the number of
/// crates built since the caches have been refreshed.
QueueInProgress(usize),
}

let mut builder = RustwideBuilder::init(cloned_db).unwrap();

let mut status = BuilderState::Fresh;

loop {
if !status.is_in_progress() {
thread::sleep(Duration::from_secs(60));
}

// check lock file
if doc_builder.is_locked() {
warn!("Lock file exits, skipping building new crates");
status = BuilderState::Locked;
continue;
}

if status.count() >= 10 {
// periodically, we need to flush our caches and ping the hubs
debug!("10 builds in a row; flushing caches");
status = BuilderState::QueueInProgress(0);

match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n)
}

if let Err(e) = doc_builder.load_cache() {
error!("Failed to load cache: {}", e);
}

if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
}
}

// Only build crates if there are any to build
debug!("Checking build queue");
match cloned_build_queue.pending_count() {
Err(e) => {
error!("Failed to read the number of crates in the queue: {}", e);
continue;
}

Ok(0) => {
if status.count() > 0 {
// ping the hubs before continuing
match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n)
}

if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
}
}
debug!("Queue is empty, going back to sleep");
status = BuilderState::EmptyQueue;
continue;
}

Ok(queue_count) => {
info!("Starting build with {} crates in queue (currently on a {} crate streak)",
queue_count, status.count());
}
}

// if we're starting a new batch, reload our caches and sources
if !status.is_in_progress() {
if let Err(e) = doc_builder.load_cache() {
error!("Failed to load cache: {}", e);
continue;
}
}

// Run build_packages_queue under `catch_unwind` to catch panics
// This only panicked twice in the last 6 months but its just a better
// idea to do this.
let res = catch_unwind(AssertUnwindSafe(|| {
match doc_builder.build_next_queue_package(&mut builder) {
Err(e) => error!("Failed to build crate from queue: {}", e),
Ok(crate_built) => if crate_built {
status.increment();
}
}
}));

if let Err(e) = res {
error!("GRAVE ERROR Building new crates panicked: {:?}", e);
}
}

impl BuilderState {
fn count(&self) -> usize {
match *self {
BuilderState::QueueInProgress(n) => n,
_ => 0,
}
}

fn is_in_progress(&self) -> bool {
match *self {
BuilderState::QueueInProgress(_) => true,
_ => false,
}
}

fn increment(&mut self) {
*self = BuilderState::QueueInProgress(self.count() + 1);
}
}
}).unwrap();
thread::Builder::new()
.name("build queue reader".to_string())
.spawn(move || {
let doc_builder =
DocBuilder::new(opts(), cloned_db.clone(), cloned_build_queue.clone());
queue_builder(doc_builder, cloned_db, cloned_build_queue).unwrap();
})
.unwrap();

// update release activity everyday at 23:55
let cloned_db = db.clone();
Expand Down
2 changes: 2 additions & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use self::daemon::start_daemon;
pub use self::github_updater::github_updater;
pub use self::html::extract_head_and_body;
pub use self::queue::{get_crate_priority, remove_crate_priority, set_crate_priority};
pub use self::queue_builder::queue_builder;
pub use self::release_activity_updater::update_release_activity;
pub(crate) use self::rustc_version::parse_rustc_version;

Expand All @@ -19,6 +20,7 @@ mod github_updater;
mod html;
mod pubsubhubbub;
mod queue;
mod queue_builder;
mod release_activity_updater;
mod rustc_version;
pub(crate) mod sized_buffer;
144 changes: 144 additions & 0 deletions src/utils/queue_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use crate::{db::Pool, docbuilder::RustwideBuilder, utils::pubsubhubbub, BuildQueue, DocBuilder};
use failure::Error;
use log::{debug, error, info, warn};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// TODO: change to `fn() -> Result<!, Error>` when never _finally_ stabilizes
// REFACTOR: Break this into smaller functions
pub fn queue_builder(
mut doc_builder: DocBuilder,
db: Pool,
build_queue: Arc<BuildQueue>,
) -> Result<(), Error> {
/// Represents the current state of the builder thread.
enum BuilderState {
/// The builder thread has just started, and hasn't built any crates yet.
Fresh,
/// The builder has just seen an empty build queue.
EmptyQueue,
/// The builder has just seen the lock file.
Locked,
/// The builder has just finished building a crate. The enclosed count is the number of
/// crates built since the caches have been refreshed.
QueueInProgress(usize),
}

let mut builder = RustwideBuilder::init(db)?;

let mut status = BuilderState::Fresh;

loop {
if !status.is_in_progress() {
thread::sleep(Duration::from_secs(60));
}

// check lock file
if doc_builder.is_locked() {
warn!("Lock file exits, skipping building new crates");
status = BuilderState::Locked;
continue;
}

if status.count() >= 10 {
// periodically, we need to flush our caches and ping the hubs
debug!("10 builds in a row; flushing caches");
status = BuilderState::QueueInProgress(0);

match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n),
}

if let Err(e) = doc_builder.load_cache() {
error!("Failed to load cache: {}", e);
}

if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
}
}

// Only build crates if there are any to build
debug!("Checking build queue");
match build_queue.pending_count() {
Err(e) => {
error!("Failed to read the number of crates in the queue: {}", e);
continue;
}

Ok(0) => {
if status.count() > 0 {
// ping the hubs before continuing
match pubsubhubbub::ping_hubs() {
Err(e) => error!("Failed to ping hub: {}", e),
Ok(n) => debug!("Succesfully pinged {} hubs", n),
}

if let Err(e) = doc_builder.save_cache() {
error!("Failed to save cache: {}", e);
}
}
debug!("Queue is empty, going back to sleep");
status = BuilderState::EmptyQueue;
continue;
}

Ok(queue_count) => {
info!(
"Starting build with {} crates in queue (currently on a {} crate streak)",
queue_count,
status.count()
);
}
}

// if we're starting a new batch, reload our caches and sources
if !status.is_in_progress() {
if let Err(e) = doc_builder.load_cache() {
error!("Failed to load cache: {}", e);
continue;
}
}

// Run build_packages_queue under `catch_unwind` to catch panics
// This only panicked twice in the last 6 months but its just a better
// idea to do this.
let res = catch_unwind(AssertUnwindSafe(|| {
match doc_builder.build_next_queue_package(&mut builder) {
Err(e) => error!("Failed to build crate from queue: {}", e),
Ok(crate_built) => {
if crate_built {
status.increment();
}
}
}
}));

if let Err(e) = res {
error!("GRAVE ERROR Building new crates panicked: {:?}", e);
}
}

impl BuilderState {
fn count(&self) -> usize {
match *self {
BuilderState::QueueInProgress(n) => n,
_ => 0,
}
}

fn is_in_progress(&self) -> bool {
match *self {
BuilderState::QueueInProgress(_) => true,
_ => false,
}
}

fn increment(&mut self) {
*self = BuilderState::QueueInProgress(self.count() + 1);
}
}
}