Skip to content

Fix slow sync #1567

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ hyper = { git = "https://github.com/paritytech/hyper", default-features = false
journaldb = { path = "../util/journaldb" }
linked-hash-map = "0.5"
log = "0.4.6"
num_cpus = "1.8"
kvdb = { path = "../util/kvdb" }
kvdb-rocksdb = { path = "../util/kvdb-rocksdb" }
kvdb-memorydb = { path = "../util/kvdb-memorydb" }
Expand Down
2 changes: 1 addition & 1 deletion core/src/client/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl Importer {

/// This is triggered by a message coming from a header queue when the header is ready for insertion
pub fn import_verified_headers(&self, client: &Client) -> usize {
const MAX_HEADERS_TO_IMPORT: usize = 10_000;
const MAX_HEADERS_TO_IMPORT: usize = 1_000;
let lock = self.import_lock.lock();
let headers = self.header_queue.drain(MAX_HEADERS_TO_IMPORT);
self.import_headers(&headers, client, &lock)
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ extern crate kvdb_memorydb;
extern crate kvdb_rocksdb;
extern crate linked_hash_map;
extern crate memorydb;
extern crate num_cpus;
extern crate num_rational;
extern crate primitives;
extern crate rand;
Expand Down
10 changes: 4 additions & 6 deletions core/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::{Arc, Condvar as SCondvar, Mutex as SMutex};
use std::thread::{self, JoinHandle};

use cio::IoChannel;
use num_cpus;
use parking_lot::{Mutex, RwLock};
use primitives::{H256, U256};

Expand All @@ -36,8 +35,8 @@ use crate::types::{BlockStatus as Status, VerificationQueueInfo as QueueInfo};
const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512;

// maximum possible number of verification threads.
const MAX_VERIFIERS: usize = 8;
// number of verification threads.
const NUM_VERIFIERS: usize = 2;

/// Type alias for block queue convenience.
pub type BlockQueue = VerificationQueue<kind::Blocks>;
Expand Down Expand Up @@ -150,10 +149,9 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new());
let more_to_verify = Arc::new(SCondvar::new());

let num_verifiers = cmp::min(num_cpus::get(), MAX_VERIFIERS);
let mut verifier_handles = Vec::with_capacity(num_verifiers);
let mut verifier_handles = Vec::with_capacity(NUM_VERIFIERS);

for i in 0..num_verifiers {
for i in 0..NUM_VERIFIERS {
let engine = engine.clone();
let verification = verification.clone();
let more_to_verify = more_to_verify.clone();
Expand Down
21 changes: 17 additions & 4 deletions sync/src/block/downloader/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct HeaderDownloader {
pivot: Pivot,
request_time: Option<Instant>,
downloaded: HashMap<H256, Header>,
queued: HashMap<H256, Header>,
trial: usize,
}

Expand All @@ -69,6 +70,7 @@ impl HeaderDownloader {
},
request_time: None,
downloaded: HashMap::new(),
queued: HashMap::new(),
trial: 0,
}
}
Expand Down Expand Up @@ -100,12 +102,15 @@ impl HeaderDownloader {
self.request_time.map_or(false, |time| (Instant::now() - time).as_secs() > MAX_WAIT)
}

/// Find header from download cache, and then from blockchain
/// Find the header among queued headers, then the downloaded cache, and finally the blockchain
/// Panics if the header doesn't exist
fn pivot_header(&self) -> Header {
match self.downloaded.get(&self.pivot.hash) {
match self.queued.get(&self.pivot.hash) {
Some(header) => header.clone(),
None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(),
None => match self.downloaded.get(&self.pivot.hash) {
Some(header) => header.clone(),
None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(),
},
}
}

Expand Down Expand Up @@ -173,7 +178,7 @@ impl HeaderDownloader {

pub fn mark_as_imported(&mut self, hashes: Vec<H256>) {
for hash in hashes {
self.downloaded.remove(&hash);
self.queued.remove(&hash);

if self.best_hash == hash {
self.pivot = Pivot {
Expand All @@ -183,4 +188,12 @@ impl HeaderDownloader {
}
}
}

pub fn mark_as_queued(&mut self, hashes: Vec<H256>) {
for hash in hashes {
if let Some(header) = self.downloaded.remove(&hash) {
self.queued.insert(hash, header);
}
}
}
}
7 changes: 6 additions & 1 deletion sync/src/block/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,9 +680,13 @@ impl Extension {
completed.sort_unstable_by_key(EncodedHeader::number);

let mut exists = Vec::new();
let mut queued = Vec::new();

for header in completed {
let hash = header.hash();
match self.client.import_header(header.clone().into_inner()) {
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(header.hash()),
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash),
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash),
// FIXME: handle import errors
Err(err) => {
cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err);
Expand All @@ -693,6 +697,7 @@ impl Extension {
}

let request = self.header_downloaders.get_mut(from).and_then(|peer| {
peer.mark_as_queued(queued);
peer.mark_as_imported(exists);
peer.create_request()
});
Expand Down