Skip to content

Remove unnecessary lock in VoteCollector #1797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 21 additions & 28 deletions core/src/consensus/tendermint/vote_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter::Iterator;

use ckey::SchnorrSignature;
use parking_lot::RwLock;
use primitives::H256;
use rlp::{Encodable, RlpStream};

Expand All @@ -29,7 +28,7 @@ use crate::consensus::BitSet;
/// Storing all Proposals, Prevotes and Precommits.
#[derive(Debug)]
pub struct VoteCollector {
votes: RwLock<BTreeMap<VoteStep, StepCollector>>,
votes: BTreeMap<VoteStep, StepCollector>,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -113,29 +112,27 @@ impl Default for VoteCollector {
// Insert dummy entry to fulfill invariant: "only messages newer than the oldest are inserted".
collector.insert(Default::default(), Default::default());
VoteCollector {
votes: RwLock::new(collector),
votes: collector,
}
}
}

impl VoteCollector {
/// Insert vote if it is newer than the oldest one.
pub fn vote(&self, message: ConsensusMessage) -> Option<DoubleVote> {
self.votes.write().entry(*message.round()).or_insert_with(Default::default).insert(message)
pub fn vote(&mut self, message: ConsensusMessage) -> Option<DoubleVote> {
self.votes.entry(*message.round()).or_insert_with(Default::default).insert(message)
}

/// Checks if the message should be ignored.
pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool {
let read_guard = self.votes.read();

let is_known = read_guard.get(&message.round()).map_or(false, |c| c.messages.contains(message));
let is_known = self.votes.get(&message.round()).map_or(false, |c| c.messages.contains(message));
if is_known {
cdebug!(ENGINE, "Known message: {:?}.", message);
return true
}

// The reason not using `message.round() <= oldest` is to allow precommit messages on Commit step.
let is_old = read_guard.keys().next().map_or(true, |oldest| message.round() < oldest);
let is_old = self.votes.keys().next().map_or(true, |oldest| message.round() < oldest);
if is_old {
cdebug!(ENGINE, "Old message {:?}.", message);
return true
Expand All @@ -145,11 +142,10 @@ impl VoteCollector {
}

/// Throws out messages older than message, leaves message as marker for the oldest.
pub fn throw_out_old(&self, vote_round: &VoteStep) {
let mut guard = self.votes.write();
let new_collector = guard.split_off(vote_round);
pub fn throw_out_old(&mut self, vote_round: &VoteStep) {
let new_collector = self.votes.split_off(vote_round);
assert!(!new_collector.is_empty());
*guard = new_collector;
self.votes = new_collector;
}

/// Collects the signatures and the indices for the given round and hash.
Expand All @@ -159,8 +155,7 @@ impl VoteCollector {
round: &VoteStep,
block_hash: &H256,
) -> (Vec<SchnorrSignature>, Vec<usize>) {
let guard = self.votes.read();
guard
self.votes
.get(round)
.and_then(|c| c.block_votes.get(&Some(*block_hash)))
.map(|votes| {
Expand All @@ -173,24 +168,23 @@ impl VoteCollector {

/// Returns the first signature and the index of its signer for a given round and hash if exists.
pub fn round_signature(&self, round: &VoteStep, block_hash: &H256) -> Option<SchnorrSignature> {
let guard = self.votes.read();
guard
self.votes
.get(round)
.and_then(|c| c.block_votes.get(&Some(*block_hash)))
.and_then(|votes| votes.values().next().cloned())
}

/// Count votes which agree with the given message.
pub fn aligned_votes(&self, message: &ConsensusMessage) -> BitSet {
if let Some(votes) = self.votes.read().get(&message.round()) {
if let Some(votes) = self.votes.get(&message.round()) {
votes.count_block(&message.block_hash())
} else {
Default::default()
}
}

pub fn block_round_votes(&self, round: &VoteStep, block_hash: &Option<H256>) -> BitSet {
if let Some(votes) = self.votes.read().get(round) {
if let Some(votes) = self.votes.get(round) {
votes.count_block(block_hash)
} else {
Default::default()
Expand All @@ -199,30 +193,29 @@ impl VoteCollector {

/// Count all votes collected for a given round.
pub fn round_votes(&self, vote_round: &VoteStep) -> BitSet {
if let Some(votes) = self.votes.read().get(vote_round) {
if let Some(votes) = self.votes.get(vote_round) {
votes.count()
} else {
Default::default()
}
}

pub fn get_block_hashes(&self, round: &VoteStep) -> Vec<H256> {
let guard = self.votes.read();
guard.get(round).map(|c| c.block_votes.keys().cloned().filter_map(|x| x).collect()).unwrap_or_else(Vec::new)
self.votes
.get(round)
.map(|c| c.block_votes.keys().cloned().filter_map(|x| x).collect())
.unwrap_or_else(Vec::new)
}

pub fn get_all(&self) -> Vec<ConsensusMessage> {
self.votes.read().iter().flat_map(|(_round, collector)| collector.messages.iter()).cloned().collect()
self.votes.iter().flat_map(|(_round, collector)| collector.messages.iter()).cloned().collect()
}

pub fn get_all_votes_in_round(&self, round: &VoteStep) -> Vec<ConsensusMessage> {
let guard = self.votes.read();
let c = guard.get(round);
c.map(|c| c.messages.iter().cloned().collect()).unwrap_or_default()
self.votes.get(round).map(|c| c.messages.iter().cloned().collect()).unwrap_or_default()
}

pub fn get_all_votes_and_indices_in_round(&self, round: &VoteStep) -> Vec<(usize, ConsensusMessage)> {
let guard = self.votes.read();
guard.get(round).map(|c| c.voted.iter().map(|(k, v)| (*k, v.clone())).collect()).unwrap_or_default()
self.votes.get(round).map(|c| c.voted.iter().map(|(k, v)| (*k, v.clone())).collect()).unwrap_or_default()
}
}