From c5eab6e11f050c7f049b3bfaac35d35bcf5b751a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 19 Jul 2022 22:37:16 +0000 Subject: [PATCH 1/4] Track history of where channel liquidities have been in the past This introduces two new fields to the `ChannelLiquidity` struct - `min_liquidity_offset_history` and `max_liquidity_offset_history`, both an array of 8 `u16`s. Each entry represents the proportion of time that we spent with the min or max liquidity offset in the given 1/8th of the channel's liquidity range. For example, the first bucket in `min_liquidity_offset_history` represents the proportion of time we've thought the channel's minimum liquidity to be lower than 1/8th of the channel's capacity. Each bucket is stored, effectively, as a fixed-point number with 5 bits for the fractional part, which is incremented by one (i.e., by 32) each time we update our liquidity estimates and decide our estimates are in that bucket. We then decay each bucket by a factor of 2047/2048. Thus, memory of a payment sticks around for more than 8,000 data points, though the majority of that memory decays after 1,387 data points. --- lightning/src/routing/scoring.rs | 137 ++++++++++++++++++++++++++----- lightning/src/util/ser.rs | 23 ++++++ 2 files changed, 139 insertions(+), 21 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 6ae339eae2b..2bae959f4fb 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -64,6 +64,7 @@ use util::time::Time; use prelude::*; use core::fmt; use core::cell::{RefCell, RefMut}; +use core::convert::TryInto; use core::ops::{Deref, DerefMut}; use core::time::Duration; use io::{self, Read}; @@ -432,6 +433,48 @@ pub struct ProbabilisticScoringParameters { pub considered_impossible_penalty_msat: u64, } +/// Tracks the historical state of a distribution as a weighted average of how much time was spent +/// in each of 8 buckets. +#[derive(Clone, Copy)] +struct HistoricalBucketRangeTracker { + buckets: [u16; 8], } + +impl HistoricalBucketRangeTracker { + fn new() -> Self { Self { buckets: [0; 8] } } + fn track_datapoint(&mut self, bucket_idx: u8) { + // We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time + // we spend in each bucket as a 16-bit fixed-point number with a 5-bit fractional part. + // + // Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to + // the buckets for the current min and max liquidity offset positions. + // + // We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a + // non-power-of-two). This ensures we can't actually overflow the u16 - when we get to + // 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457. + // + // In total, this allows us to track data for the last 8,000 or so payments across a given + // channel. + // + // These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead, + // and need to balance having more bits in the fractional part (to ensure decay isn't too + // non-linear) with having too few bits in the integer part, causing us to not store very many + // datapoints. + // + // The constants were picked experimentally, selecting a decay amount that restricts us + // from overflowing buckets without having to cap them manually.
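The arithmetic here is worth pinning down with a runnable sketch (standalone, illustrative names; not part of the patch). One detail: with the decay-then-increment order the code uses, repeatedly hitting one bucket converges to 63,489, slightly different from the comment's 63,457, which is the fixed point if the increment happens before the decay; either way the counter stabilizes below `u16::MAX`.

    // Sketch only: mirrors `track_datapoint` above with illustrative names.
    fn track_datapoint(buckets: &mut [u16; 8], bucket_idx: usize) {
        assert!(bucket_idx < 8);
        // Decay every bucket by 2047/2048, widening to u32 so the multiply
        // cannot overflow...
        for e in buckets.iter_mut() {
            *e = ((*e as u32) * 2047 / 2048) as u16;
        }
        // ...then credit 1.0 (i.e. 32 in 5-bit fixed point) to the bucket the
        // current liquidity estimate falls in.
        buckets[bucket_idx] = buckets[bucket_idx].saturating_add(32);
    }

    fn main() {
        let mut buckets = [0u16; 8];
        for _ in 0..100_000 {
            track_datapoint(&mut buckets, 3);
        }
        // x = floor(2047 * x / 2048) + 32 reaches its first integer fixed
        // point at 63,489 when approached from below, so the u16 never wraps.
        assert_eq!(buckets[3], 63_489);
    }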
+ debug_assert!(bucket_idx < 8); + if bucket_idx < 8 { + for e in self.buckets.iter_mut() { + *e = ((*e as u32) * 2047 / 2048) as u16; + } + self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32); + } + } +} + +impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) }); + /// Accounting for channel liquidity balance uncertainty. /// /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the @@ -446,13 +489,18 @@ struct ChannelLiquidity<T: Time> { /// Time when the liquidity bounds were last modified. last_updated: T, + + min_liquidity_offset_history: HistoricalBucketRangeTracker, + max_liquidity_offset_history: HistoricalBucketRangeTracker, } /// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and /// decayed with a given half life. -struct DirectedChannelLiquidity<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> { +struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> { min_liquidity_offset_msat: L, max_liquidity_offset_msat: L, + min_liquidity_offset_history: BRT, + max_liquidity_offset_history: BRT, capacity_msat: u64, last_updated: U, now: T, @@ -593,6 +641,8 @@ impl<T: Time> ChannelLiquidity<T> { Self { min_liquidity_offset_msat: 0, max_liquidity_offset_msat: 0, + min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), + max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), last_updated: T::now(), } } @@ -601,16 +651,21 @@ impl<T: Time> ChannelLiquidity<T> { /// `capacity_msat`. fn as_directed( &self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration - ) -> DirectedChannelLiquidity<&u64, T, &T> { - let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target { - (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat) - } else { - (&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat) - }; + ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> { + let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = + if source < target { + (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat, + &self.min_liquidity_offset_history, &self.max_liquidity_offset_history) + } else { + (&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat, + &self.max_liquidity_offset_history, &self.min_liquidity_offset_history) + }; DirectedChannelLiquidity { min_liquidity_offset_msat, max_liquidity_offset_msat, + min_liquidity_offset_history, + max_liquidity_offset_history, capacity_msat, last_updated: &self.last_updated, now: T::now(), @@ -622,16 +677,21 @@ impl<T: Time> ChannelLiquidity<T> { /// `capacity_msat`.
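The directional swap in `as_directed` is the subtlest part of this hunk, so here is a toy model of it (simplified standalone types; the real code returns a `DirectedChannelLiquidity` holding references, including the two new history trackers):

    struct Liquidity { min_offset: u64, max_offset: u64 }

    fn as_directed(liq: &Liquidity, source: u8, target: u8) -> (u64, u64) {
        // Offsets (and, in the patch, their history trackers) swap roles
        // depending on which NodeId sorts first.
        if source < target {
            (liq.min_offset, liq.max_offset)
        } else {
            (liq.max_offset, liq.min_offset)
        }
    }

    fn main() {
        let liq = Liquidity { min_offset: 700, max_offset: 100 };
        assert_eq!(as_directed(&liq, 1, 2), (700, 100));
        // Viewed from the other direction the same data trades places, since
        // one side's lower bound is the other side's upper bound.
        assert_eq!(as_directed(&liq, 2, 1), (100, 700));
    }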
fn as_directed_mut( &mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration - ) -> DirectedChannelLiquidity<&mut u64, T, &mut T> { - let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target { - (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat) - } else { - (&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat) - }; + ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> { + let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = + if source < target { + (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat, + &mut self.min_liquidity_offset_history, &mut self.max_liquidity_offset_history) + } else { + (&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat, + &mut self.max_liquidity_offset_history, &mut self.min_liquidity_offset_history) + }; DirectedChannelLiquidity { min_liquidity_offset_msat, max_liquidity_offset_msat, + min_liquidity_offset_history, + max_liquidity_offset_history, capacity_msat, last_updated: &mut self.last_updated, now: T::now(), @@ -652,7 +712,7 @@ const PRECISION_LOWER_BOUND_DENOMINATOR: u64 = approx::LOWER_BITS_BOUND; const AMOUNT_PENALTY_DIVISOR: u64 = 1 << 20; const BASE_AMOUNT_PENALTY_DIVISOR: u64 = 1 << 30; -impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, T, U> { +impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> { /// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in /// this direction. fn penalty_msat(&self, amount_msat: u64, params: &ProbabilisticScoringParameters) -> u64 { @@ -722,7 +782,7 @@ impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiqui } } -impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, T, U> { +impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> { /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`. fn failed_at_channel<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger { if amount_msat < self.max_liquidity_msat() { @@ -750,6 +810,21 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne self.set_max_liquidity_msat(max_liquidity_msat); } + fn update_history_buckets(&mut self) { + debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat); + self.min_liquidity_offset_history.track_datapoint( + // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset + // is zero or the channel's capacity, though the second should generally never happen. + (self.min_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat) + .try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored + debug_assert!(*self.max_liquidity_offset_msat <= self.capacity_msat); + self.max_liquidity_offset_history.track_datapoint( + // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset + // is zero or the channel's capacity, though the second should generally never happen. + (self.max_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat) + .try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored + } + /// Adjusts the lower bound of the channel liquidity balance in this direction.
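A worked example of the bucket-index arithmetic in `update_history_buckets` may help (the helper name is illustrative). The `saturating_sub(1)` keeps an offset of zero in bucket 0 and keeps an offset equal to the channel capacity in bucket 7; an offset above capacity would yield an index of 8 or more, which `track_datapoint`'s range check then ignores, in the spirit of the bogus-value comment above:

    fn bucket_idx(offset_msat: u64, capacity_msat: u64) -> u64 {
        offset_msat.saturating_sub(1) * 8 / capacity_msat
    }

    fn main() {
        let cap = 1_000;
        assert_eq!(bucket_idx(0, cap), 0);     // zero offset -> lowest bucket
        assert_eq!(bucket_idx(125, cap), 0);   // exactly 1/8th stays in bucket 0
        assert_eq!(bucket_idx(126, cap), 1);
        assert_eq!(bucket_idx(1_000, cap), 7); // offset == capacity -> top bucket
    }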
fn set_min_liquidity_msat(&mut self, amount_msat: u64) { *self.min_liquidity_offset_msat = amount_msat; @@ -759,6 +834,7 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne self.decayed_offset_msat(*self.max_liquidity_offset_msat) }; *self.last_updated = self.now; + self.update_history_buckets(); } /// Adjusts the upper bound of the channel liquidity balance in this direction. @@ -770,6 +846,7 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne self.decayed_offset_msat(*self.min_liquidity_offset_msat) }; *self.last_updated = self.now; + self.update_history_buckets(); } } @@ -1236,7 +1313,9 @@ impl<T: Time> Writeable for ChannelLiquidity<T> { let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed(); write_tlv_fields!(w, { (0, self.min_liquidity_offset_msat, required), + (1, Some(self.min_liquidity_offset_history), option), (2, self.max_liquidity_offset_msat, required), + (3, Some(self.max_liquidity_offset_history), option), (4, duration_since_epoch, required), }); Ok(()) @@ -1248,10 +1327,14 @@ impl<T: Time> Readable for ChannelLiquidity<T> { fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> { let mut min_liquidity_offset_msat = 0; let mut max_liquidity_offset_msat = 0; + let mut min_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new()); + let mut max_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new()); let mut duration_since_epoch = Duration::from_secs(0); read_tlv_fields!(r, { (0, min_liquidity_offset_msat, required), + (1, min_liquidity_offset_history, option), (2, max_liquidity_offset_msat, required), + (3, max_liquidity_offset_history, option), (4, duration_since_epoch, required), }); // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards. @@ -1269,6 +1352,8 @@ impl<T: Time> Readable for ChannelLiquidity<T> { Ok(Self { min_liquidity_offset_msat, max_liquidity_offset_msat, + min_liquidity_offset_history: min_liquidity_offset_history.unwrap(), + max_liquidity_offset_history: max_liquidity_offset_history.unwrap(), last_updated, }) } @@ -1276,7 +1361,7 @@ #[cfg(test)] mod tests { - use super::{ChannelLiquidity, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime}; + use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime}; use util::time::Time; use util::time::tests::SinceEpoch; @@ -1459,11 +1544,15 @@ mod tests { let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated + min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated, + min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), + max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }) .with_channel(43, ChannelLiquidity { - min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated + min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated, + min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), + max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }); let source = source_node_id(); let target = target_node_id(); @@ -1534,7 +1623,9 @@ mod tests { let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated + min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated, + min_liquidity_offset_history:
HistoricalBucketRangeTracker::new(), + max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }); let source = source_node_id(); let target = target_node_id(); @@ -1592,7 +1683,9 @@ mod tests { let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated + min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated, + min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), + max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }); let source = source_node_id(); let target = target_node_id(); @@ -1699,7 +1792,9 @@ mod tests { let scorer = ProbabilisticScorer::new(params, &network_graph, &logger) .with_channel(42, ChannelLiquidity { - min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated + min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated, + min_liquidity_offset_history: HistoricalBucketRangeTracker::new(), + max_liquidity_offset_history: HistoricalBucketRangeTracker::new(), }); let source = source_node_id(); let target = target_node_id(); diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 5b1a86a6a95..845d13a5d23 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -515,6 +515,29 @@ impl_array!(PUBLIC_KEY_SIZE); // for PublicKey impl_array!(COMPACT_SIGNATURE_SIZE); // for Signature impl_array!(1300); // for OnionPacket.hop_data +impl Writeable for [u16; 8] { + #[inline] + fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> { + for v in self.iter() { + w.write_all(&v.to_be_bytes())? + } + Ok(()) + } +} + +impl Readable for [u16; 8] { + #[inline] + fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> { + let mut buf = [0u8; 16]; + r.read_exact(&mut buf)?; + let mut res = [0u16; 8]; + for (idx, v) in res.iter_mut().enumerate() { + *v = (buf[idx * 2] as u16) << 8 | (buf[idx * 2 + 1] as u16) + } + Ok(res) + } +} + // HashMap impl<K, V> Writeable for HashMap<K, V> where K: Writeable + Eq + Hash, From 6852ea974178c60211a37d4558c14678889a2932 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 21 Jul 2022 02:08:28 +0000 Subject: [PATCH 2/4] Calculate a new penalty based on historical channel liquidity range Our current `ProbabilisticScorer` attempts to build a model of the current liquidity across the payment channel network. This works fine to ignore channels we *just* tried to pay through, but it fails to remember patterns over longer time horizons. Specifically, there are *many* channels within the network that are very often either fully saturated in one direction, or are regularly rebalanced and rarely saturated. Our model may discover that, but as soon as it decays its offsets, or when there is a temporary change in liquidity, it effectively forgets the "normal" state of the channel. This causes substantially suboptimal routing in practice; retaining older knowledge when new datapoints come in is a potential solution. Here, we implement one such design, using the decaying buckets added in the previous commit to calculate a probability of payment success based on a weighted average of recent liquidity estimates for a channel. For each min/max liquidity bucket pair (where the min liquidity is less than the max liquidity), we can calculate the probability that a payment succeeds using our traditional `amount / capacity` formula.
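Because the indexing in the `Readable for [u16; 8]` impl in the `ser.rs` hunk above is easy to get wrong (each `u16` occupies two bytes, so the byte offset must be `2 * idx`), a quick std-only round-trip check of the big-endian layout, written against hypothetical free functions rather than the crate's `Writer`/`Readable` traits:

    fn write_buckets(buckets: &[u16; 8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(16);
        for v in buckets.iter() {
            out.extend_from_slice(&v.to_be_bytes());
        }
        out
    }

    fn read_buckets(buf: &[u8]) -> [u16; 8] {
        let mut res = [0u16; 8];
        for (idx, v) in res.iter_mut().enumerate() {
            // Each u16 spans two bytes, hence the idx * 2 byte offset.
            *v = (buf[idx * 2] as u16) << 8 | (buf[idx * 2 + 1] as u16);
        }
        res
    }

    fn main() {
        let buckets = [0u16, 32, 64, 1_024, 63_457, 1, 2, 3];
        assert_eq!(read_buckets(&write_buckets(&buckets)), buckets);
    }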
From there, we weigh the probability by the number of points in each bucket pair, calculating a total probability for the payment, and assigning a penalty using the same log-probability calculation used for the non-historical penalties. --- lightning/src/routing/scoring.rs | 194 +++++++++++++++++++++++++++---- 1 file changed, 170 insertions(+), 24 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 2bae959f4fb..8b475ecc0d3 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -350,7 +350,8 @@ pub struct ProbabilisticScoringParameters { pub base_penalty_amount_multiplier_msat: u64, /// A multiplier used in conjunction with the negative `log10` of the channel's success - /// probability for a payment to determine the liquidity penalty. + /// probability for a payment, as determined by our latest estimates of the channel's + /// liquidity, to determine the liquidity penalty. /// /// The penalty is based in part on the knowledge learned from prior successful and unsuccessful /// payments. This knowledge is decayed over time based on [`liquidity_offset_half_life`]. The @@ -359,7 +360,7 @@ pub struct ProbabilisticScoringParameters { /// uncertainty bounds of the channel liquidity balance. Amounts above the upper bound will /// result in a `u64::max_value` penalty, however. /// - /// Default value: 40,000 msat + /// Default value: 30,000 msat /// /// [`liquidity_offset_half_life`]: Self::liquidity_offset_half_life pub liquidity_penalty_multiplier_msat: u64, @@ -380,7 +381,8 @@ pub struct ProbabilisticScoringParameters { pub liquidity_offset_half_life: Duration, /// A multiplier used in conjunction with a payment amount and the negative `log10` of the - /// channel's success probability for the payment to determine the amount penalty. + /// channel's success probability for the payment, as determined by our latest estimates of the + /// channel's liquidity, to determine the amount penalty. /// /// The purpose of the amount penalty is to avoid having fees dominate the channel cost (i.e., /// fees plus penalty) for large payments. The penalty is computed as the product of this @@ -395,9 +397,45 @@ pub struct ProbabilisticScoringParameters { /// probabilities, the multiplier will have a decreasing effect as the negative `log10` will /// fall below `1`. /// - /// Default value: 256 msat + /// Default value: 192 msat pub liquidity_penalty_amount_multiplier_msat: u64, + /// A multiplier used in conjunction with the negative `log10` of the channel's success + /// probability for the payment, as determined based on the history of our estimates of the + /// channel's available liquidity, to determine a penalty. + /// + /// This penalty is similar to [`liquidity_penalty_multiplier_msat`], however, instead of using + /// only our latest estimate for the current liquidity available in the channel, it estimates + /// success probability based on the estimated liquidity available in the channel through + /// history. Specifically, every time we update our liquidity bounds on a given channel, we + /// track which of several buckets those bounds fall into, exponentially decaying the + /// probability of each bucket as new samples are added. 
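The parameters being documented here work like this in practice; a sketch with a stand-in struct rather than the real `ProbabilisticScoringParameters` (the tests later in this patch construct parameters the same way via `zero_penalty()`):

    #[derive(Debug)]
    struct Params {
        liquidity_penalty_multiplier_msat: u64,
        historical_liquidity_penalty_multiplier_msat: u64,
        historical_liquidity_penalty_amount_multiplier_msat: u64,
    }

    impl Params {
        // Analogous to the scorer's test-only `zero_penalty()` constructor.
        fn zero_penalty() -> Self {
            Self {
                liquidity_penalty_multiplier_msat: 0,
                historical_liquidity_penalty_multiplier_msat: 0,
                historical_liquidity_penalty_amount_multiplier_msat: 0,
            }
        }
    }

    fn main() {
        // Opt into only the historical penalties via struct-update syntax.
        let params = Params {
            historical_liquidity_penalty_multiplier_msat: 10_000,
            historical_liquidity_penalty_amount_multiplier_msat: 64,
            ..Params::zero_penalty()
        };
        println!("{:?}", params);
    }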
+ /// + /// Default value: 10,000 msat + /// + /// [`liquidity_penalty_multiplier_msat`]: Self::liquidity_penalty_multiplier_msat + pub historical_liquidity_penalty_multiplier_msat: u64, + + /// A multiplier used in conjunction with the payment amount and the negative `log10` of the + /// channel's success probability for the payment, as determined based on the history of our + /// estimates of the channel's available liquidity, to determine a penalty. + /// + /// The purpose of the amount penalty is to avoid having fees dominate the channel cost for + /// large payments. The penalty is computed as the product of this multiplier and the `2^20`ths + /// of the payment amount, weighted by the negative `log10` of the success probability. + /// + /// This penalty is similar to [`liquidity_penalty_amount_multiplier_msat`], however, instead + /// of using only our latest estimate for the current liquidity available in the channel, it + /// estimates success probability based on the estimated liquidity available in the channel + /// through history. Specifically, every time we update our liquidity bounds on a given + /// channel, we track which of several buckets those bounds fall into, exponentially decaying + /// the probability of each bucket as new samples are added. + /// + /// Default value: 64 msat + /// + /// [`liquidity_penalty_amount_multiplier_msat`]: Self::liquidity_penalty_amount_multiplier_msat + pub historical_liquidity_penalty_amount_multiplier_msat: u64, + /// Manual penalties used for the given nodes. Allows to set a particular penalty for a given /// node. Note that a manual penalty of `u64::max_value()` means the node would not ever be /// considered during path finding. @@ -605,6 +643,8 @@ impl ProbabilisticScoringParameters { liquidity_penalty_multiplier_msat: 0, liquidity_offset_half_life: Duration::from_secs(3600), liquidity_penalty_amount_multiplier_msat: 0, + historical_liquidity_penalty_multiplier_msat: 0, + historical_liquidity_penalty_amount_multiplier_msat: 0, manual_node_penalties: HashMap::new(), anti_probing_penalty_msat: 0, considered_impossible_penalty_msat: 0, @@ -625,9 +665,11 @@ impl Default for ProbabilisticScoringParameters { Self { base_penalty_msat: 500, base_penalty_amount_multiplier_msat: 8192, - liquidity_penalty_multiplier_msat: 40_000, + liquidity_penalty_multiplier_msat: 30_000, liquidity_offset_half_life: Duration::from_secs(3600), - liquidity_penalty_amount_multiplier_msat: 256, + liquidity_penalty_amount_multiplier_msat: 192, + historical_liquidity_penalty_multiplier_msat: 10_000, + historical_liquidity_penalty_amount_multiplier_msat: 64, manual_node_penalties: HashMap::new(), anti_probing_penalty_msat: 250, considered_impossible_penalty_msat: 1_0000_0000_000, @@ -718,14 +760,17 @@ impl, BRT: Deref, fn penalty_msat(&self, amount_msat: u64, params: &ProbabilisticScoringParameters) -> u64 { let max_liquidity_msat = self.max_liquidity_msat(); let min_liquidity_msat = core::cmp::min(self.min_liquidity_msat(), max_liquidity_msat); - if amount_msat <= min_liquidity_msat { + + let mut res = if amount_msat <= min_liquidity_msat { 0 } else if amount_msat >= max_liquidity_msat { // Equivalent to hitting the else clause below with the amount equal to the effective // capacity and without any certainty on the liquidity upper bound, plus the // impossibility penalty. 
let negative_log10_times_2048 = NEGATIVE_LOG10_UPPER_BOUND * 2048; - self.combined_penalty_msat(amount_msat, negative_log10_times_2048, params) + Self::combined_penalty_msat(amount_msat, negative_log10_times_2048, + params.liquidity_penalty_multiplier_msat, + params.liquidity_penalty_amount_multiplier_msat) .saturating_add(params.considered_impossible_penalty_msat) } else { let numerator = (max_liquidity_msat - amount_msat).saturating_add(1); @@ -738,25 +783,96 @@ impl, BRT: Deref, } else { let negative_log10_times_2048 = approx::negative_log10_times_2048(numerator, denominator); - self.combined_penalty_msat(amount_msat, negative_log10_times_2048, params) + Self::combined_penalty_msat(amount_msat, negative_log10_times_2048, + params.liquidity_penalty_multiplier_msat, + params.liquidity_penalty_amount_multiplier_msat) + } + }; + + if params.historical_liquidity_penalty_multiplier_msat != 0 || + params.historical_liquidity_penalty_amount_multiplier_msat != 0 { + // If historical penalties are enabled, calculate the penalty by walking the set of + // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) + // and, for each, calculate the probability of success given our payment amount, then + // total the weighted average probability of success. + // + // We use a sliding scale to decide which point within a given bucket will be compared + // to the amount being sent - for lower-bounds, the amount being sent is compared to + // the lower edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of + // the last bucket (i.e. 9 times the index, or 63), with each bucket in between + // increasing the comparison point by 1/64th. For upper-bounds, the same applies, + // however with an offset of 1/64th (i.e. starting at one and ending at 64). This + // avoids failing to assign penalties to channels at the edges. + // + // If we used the bottom edge of buckets, we'd end up never assigning any penalty at + // all to such a channel when sending less than ~0.19% of the channel's capacity (e.g. + // ~200k sats for a 1 BTC channel!). + // + // If we used the middle of each bucket we'd never assign any penalty at all when + // sending less than 1/16th of a channel's capacity, or 1/8th if we used the top of the + // bucket. + let mut total_valid_points_tracked = 0; + for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { + for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) { + total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64); + } + } + if total_valid_points_tracked == 0 { + // If we don't have any valid points, redo the non-historical calculation with no + // liquidity bounds tracked and the historical penalty multipliers. 
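The edge values described in the comment above can be tabulated directly; this standalone sketch (illustrative names) prints each bucket's comparison points on the 0..=64 scale and works one partial-success case through the same integer math the loop below uses:

    fn main() {
        for idx in 0u64..8 {
            let min_64th_bucket = idx * 9;           // 0, 9, 18, ... 63
            let max_64th_bucket = (7 - idx) * 9 + 1; // 64, 55, ... 1
            println!("bucket {}: min edge {}/64, max edge {}/64",
                idx, min_64th_bucket, max_64th_bucket);
        }
        // A payment of 3/64ths of capacity sits above the first min edge (0)
        // but below the first max edge (64), so a (min 0, max 0) bucket pair
        // contributes a partial success probability:
        let (amt, min_e, max_e) = (3u64, 0u64, 64u64);
        let prob_times_1024 = (max_e - amt) * 1024 / (max_e - min_e);
        assert_eq!(prob_times_1024, 976); // roughly 95% success probability
    }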
+ let max_capacity = self.capacity_msat.saturating_sub(amount_msat).saturating_add(1); + let negative_log10_times_2048 = + approx::negative_log10_times_2048(max_capacity, self.capacity_msat.saturating_add(1)); + res = res.saturating_add(Self::combined_penalty_msat(amount_msat, negative_log10_times_2048, + params.historical_liquidity_penalty_multiplier_msat, + params.historical_liquidity_penalty_amount_multiplier_msat)); + return res; } + + let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat; + debug_assert!(payment_amt_64th_bucket <= 64); + if payment_amt_64th_bucket > 64 { return res; } + + let mut cumulative_success_prob_times_billion = 0; + for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { + for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) { + let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64) + * 1024 * 1024 / total_valid_points_tracked; + let min_64th_bucket = min_idx as u64 * 9; + let max_64th_bucket = (7 - max_idx as u64) * 9 + 1; + if payment_amt_64th_bucket > max_64th_bucket { + // Success probability 0, the payment amount is above the max liquidity + } else if payment_amt_64th_bucket <= min_64th_bucket { + cumulative_success_prob_times_billion += bucket_prob_times_million * 1024; + } else { + cumulative_success_prob_times_billion += bucket_prob_times_million * + (max_64th_bucket - payment_amt_64th_bucket) * 1024 / + (max_64th_bucket - min_64th_bucket); + } + } + } + let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024); + res = res.saturating_add(Self::combined_penalty_msat(amount_msat, + historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat, + params.historical_liquidity_penalty_amount_multiplier_msat)); } + + res } /// Computes the liquidity penalty from the penalty multipliers. #[inline(always)] - fn combined_penalty_msat( - &self, amount_msat: u64, negative_log10_times_2048: u64, - params: &ProbabilisticScoringParameters + fn combined_penalty_msat(amount_msat: u64, negative_log10_times_2048: u64, + liquidity_penalty_multiplier_msat: u64, liquidity_penalty_amount_multiplier_msat: u64, ) -> u64 { let liquidity_penalty_msat = { // Upper bound the liquidity penalty to ensure some channel is selected. 
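For intuition on the penalty formula, a self-contained re-statement of `combined_penalty_msat` with worked numbers. The `1 << 20` is `AMOUNT_PENALTY_DIVISOR` from the constants above; the `4` stands in for the crate's `NEGATIVE_LOG10_UPPER_BOUND` and is an assumption here:

    fn combined_penalty_msat(amount_msat: u64, negative_log10_times_2048: u64,
        liquidity_penalty_multiplier_msat: u64, liquidity_penalty_amount_multiplier_msat: u64,
    ) -> u64 {
        // Cap the log term at NEGATIVE_LOG10_UPPER_BOUND (assumed to be 4,
        // i.e. success probabilities below 10^-4 saturate the penalty).
        let max_penalty_msat = liquidity_penalty_multiplier_msat.saturating_mul(4);
        let liquidity_penalty_msat = (negative_log10_times_2048
            .saturating_mul(liquidity_penalty_multiplier_msat) / 2048)
            .min(max_penalty_msat);
        let amount_penalty_msat = negative_log10_times_2048
            .saturating_mul(liquidity_penalty_amount_multiplier_msat)
            .saturating_mul(amount_msat) / 2048 / (1 << 20);
        liquidity_penalty_msat.saturating_add(amount_penalty_msat)
    }

    fn main() {
        // -log10(0.5) is ~0.301, i.e. ~616 in 1/2048ths: a coin-flip channel
        // with the 30_000 msat default multiplier costs 9_023 msat, plus a
        // 5 msat amount term for a 100_000 msat payment at the 192 default.
        assert_eq!(combined_penalty_msat(100_000, 616, 30_000, 192), 9_028);
    }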
- let multiplier_msat = params.liquidity_penalty_multiplier_msat; + let multiplier_msat = liquidity_penalty_multiplier_msat; let max_penalty_msat = multiplier_msat.saturating_mul(NEGATIVE_LOG10_UPPER_BOUND); (negative_log10_times_2048.saturating_mul(multiplier_msat) / 2048).min(max_penalty_msat) }; let amount_penalty_msat = negative_log10_times_2048 - .saturating_mul(params.liquidity_penalty_amount_multiplier_msat) + .saturating_mul(liquidity_penalty_amount_multiplier_msat) .saturating_mul(amount_msat) / 2048 / AMOUNT_PENALTY_DIVISOR; liquidity_penalty_msat.saturating_add(amount_penalty_msat) @@ -2199,35 +2315,35 @@ mod tests { let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 3_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1985); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1983); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 4_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1639); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1637); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 5_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1607); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1606); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 6_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1262); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1331); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 7_450_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1262); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1387); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 7_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1262); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1379); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 8_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1262); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1363); let usage = ChannelUsage { effective_capacity: EffectiveCapacity::Total { capacity_msat: 9_950_000_000, htlc_maximum_msat: Some(1_000) }, ..usage }; - assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1262); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 1355); } #[test] @@ -2251,7 +2367,7 @@ mod tests { let params = ProbabilisticScoringParameters { base_penalty_msat: 500, liquidity_penalty_multiplier_msat: 1_000, - anti_probing_penalty_msat: 0, ..Default::default() + anti_probing_penalty_msat: 0, ..ProbabilisticScoringParameters::zero_penalty() }; let scorer = ProbabilisticScorer::new(params, &network_graph, &logger); assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 558); @@ -2259,7 +2375,7 @@ mod tests { let 
params = ProbabilisticScoringParameters { base_penalty_msat: 500, liquidity_penalty_multiplier_msat: 1_000, base_penalty_amount_multiplier_msat: (1 << 30), - anti_probing_penalty_msat: 0, ..Default::default() + anti_probing_penalty_msat: 0, ..ProbabilisticScoringParameters::zero_penalty() }; let scorer = ProbabilisticScorer::new(params, &network_graph, &logger); @@ -2362,6 +2478,36 @@ mod tests { assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), u64::max_value()); } + #[test] + fn remembers_historical_failures() { + let logger = TestLogger::new(); + let network_graph = network_graph(&logger); + let params = ProbabilisticScoringParameters { + historical_liquidity_penalty_multiplier_msat: 1024, + historical_liquidity_penalty_amount_multiplier_msat: 1024, + ..ProbabilisticScoringParameters::zero_penalty() + }; + let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger); + let source = source_node_id(); + let target = target_node_id(); + + let usage = ChannelUsage { + amount_msat: 100, + inflight_htlc_msat: 0, + effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: Some(1_024) }, + }; + // With no historical data the normal liquidity penalty calculation is used. + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 47); + + scorer.payment_path_failed(&payment_path_for_amount(1).iter().collect::<Vec<_>>(), 42); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 2048); + + // Even after we tell the scorer we definitely have enough available liquidity, it will + // still remember that there was some failure in the past, and assign a non-0 penalty. + scorer.payment_path_failed(&payment_path_for_amount(1000).iter().collect::<Vec<_>>(), 43); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 198); + } + #[test] fn adds_anti_probing_penalty() { let logger = TestLogger::new(); From ec68f1326d9ad5a4f05ed21a986bc7410f5a06b6 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 22 Aug 2022 22:40:02 +0000 Subject: [PATCH 3/4] Track a reference to scoring parameters in DirectedChannelLiquidity This makes it simpler to add additional half-lives to DirectedChannelLiquidity: we store a reference to the full parameter set rather than copying out only the single current half-life. --- lightning/src/routing/scoring.rs | 96 +++++++++++++++----------------- 1 file changed, 45 insertions(+), 51 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 8b475ecc0d3..c8cf3583cd4 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -534,7 +534,7 @@ struct ChannelLiquidity<T: Time> { /// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and /// decayed with a given half life.
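The shape of this refactor is easier to see in miniature; a hypothetical reduction (stand-in types, not the real scorer) of a directed view borrowing the whole parameter set for its lifetime instead of copying out individual fields, which is what lets later patches add more half-lives without touching every call site:

    use std::time::Duration;

    struct Params { liquidity_offset_half_life: Duration }

    struct DirectedView<'a> { params: &'a Params }

    impl<'a> DirectedView<'a> {
        // Any field of Params is now reachable; no per-field plumbing needed.
        fn half_life(&self) -> Duration { self.params.liquidity_offset_half_life }
    }

    fn main() {
        let params = Params { liquidity_offset_half_life: Duration::from_secs(3600) };
        let view = DirectedView { params: &params };
        assert_eq!(view.half_life(), Duration::from_secs(3600));
    }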
-struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> { +struct DirectedChannelLiquidity<'a, L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> { min_liquidity_offset_msat: L, max_liquidity_offset_msat: L, min_liquidity_offset_history: BRT, @@ -542,7 +542,7 @@ struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = His capacity_msat: u64, last_updated: U, now: T, - half_life: Duration, + params: &'a ProbabilisticScoringParameters, } impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger { @@ -574,7 +574,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU let log_direction = |source, target| { if let Some((directed_info, _)) = chan_debug.as_directed_to(target) { let amt = directed_info.effective_capacity().as_msat(); - let dir_liq = liq.as_directed(source, target, amt, self.params.liquidity_offset_half_life); + let dir_liq = liq.as_directed(source, target, amt, &self.params); log_debug!(self.logger, "Liquidity from {:?} to {:?} via {} is in the range ({}, {})", source, target, scid, dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat()); } else { @@ -599,7 +599,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU if let Some(liq) = self.channel_liquidities.get(&scid) { if let Some((directed_info, source)) = chan.as_directed_to(target) { let amt = directed_info.effective_capacity().as_msat(); - let dir_liq = liq.as_directed(source, target, amt, self.params.liquidity_offset_half_life); + let dir_liq = liq.as_directed(source, target, amt, &self.params); return Some((dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat())); } } @@ -691,9 +691,9 @@ impl<T: Time> ChannelLiquidity<T> { /// Returns a view of the channel liquidity directed from `source` to `target` assuming /// `capacity_msat`. - fn as_directed( - &self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration - ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> { + fn as_directed<'a>( + &self, source: &NodeId, target: &NodeId, capacity_msat: u64, params: &'a ProbabilisticScoringParameters + ) -> DirectedChannelLiquidity<'a, &u64, &HistoricalBucketRangeTracker, T, &T> { let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = if source < target { (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat, @@ -711,15 +711,15 @@ impl<T: Time> ChannelLiquidity<T> { capacity_msat, last_updated: &self.last_updated, now: T::now(), - half_life, + params, } } /// Returns a mutable view of the channel liquidity directed from `source` to `target` assuming /// `capacity_msat`.
- fn as_directed_mut( - &mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration - ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> { + fn as_directed_mut<'a>( + &mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, params: &'a ProbabilisticScoringParameters + ) -> DirectedChannelLiquidity<'a, &mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> { let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) = if source < target { (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat, @@ -737,7 +737,7 @@ impl<T: Time> ChannelLiquidity<T> { capacity_msat, last_updated: &mut self.last_updated, now: T::now(), - half_life, + params, } } } @@ -754,7 +754,7 @@ const PRECISION_LOWER_BOUND_DENOMINATOR: u64 = approx::LOWER_BITS_BOUND; const AMOUNT_PENALTY_DIVISOR: u64 = 1 << 20; const BASE_AMOUNT_PENALTY_DIVISOR: u64 = 1 << 30; -impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> { +impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<'_, L, BRT, T, U> { /// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in /// this direction. fn penalty_msat(&self, amount_msat: u64, params: &ProbabilisticScoringParameters) -> u64 { @@ -892,13 +892,13 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, fn decayed_offset_msat(&self, offset_msat: u64) -> u64 { self.now.duration_since(*self.last_updated).as_secs() - .checked_div(self.half_life.as_secs()) + .checked_div(self.params.liquidity_offset_half_life.as_secs()) .and_then(|decays| offset_msat.checked_shr(decays as u32)) .unwrap_or(0) } } -impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> { +impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<'_, L, BRT, T, U> { /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
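For reference, the half-life decay that `decayed_offset_msat` applies (unchanged above apart from where the half-life now comes from), re-stated as a standalone function with worked values:

    fn decayed_offset_msat(offset_msat: u64, elapsed_secs: u64, half_life_secs: u64) -> u64 {
        // Whole elapsed half-lives become a right shift, halving the offset
        // per half-life and saturating to zero.
        elapsed_secs.checked_div(half_life_secs)
            .and_then(|decays| offset_msat.checked_shr(decays as u32))
            .unwrap_or(0)
    }

    fn main() {
        assert_eq!(decayed_offset_msat(1_000, 0, 3_600), 1_000);
        assert_eq!(decayed_offset_msat(1_000, 3_600, 3_600), 500);
        assert_eq!(decayed_offset_msat(1_000, 7_200, 3_600), 250);
        // A zero half-life (checked_div -> None) or an enormous shift count
        // (checked_shr -> None for shifts >= 64) both collapse to zero.
        assert_eq!(decayed_offset_msat(1_000, u64::MAX, 1), 0);
    }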
fn failed_at_channel(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger { if amount_msat < self.max_liquidity_msat() { @@ -995,14 +995,13 @@ impl>, L: Deref, T: Time> Score for Probabilis _ => {}, } - let liquidity_offset_half_life = self.params.liquidity_offset_half_life; let amount_msat = usage.amount_msat; let capacity_msat = usage.effective_capacity.as_msat() .saturating_sub(usage.inflight_htlc_msat); self.channel_liquidities .get(&short_channel_id) .unwrap_or(&ChannelLiquidity::new()) - .as_directed(source, target, capacity_msat, liquidity_offset_half_life) + .as_directed(source, target, capacity_msat, &self.params) .penalty_msat(amount_msat, &self.params) .saturating_add(anti_probing_penalty_msat) .saturating_add(base_penalty_msat) @@ -1010,7 +1009,6 @@ impl>, L: Deref, T: Time> Score for Probabilis fn payment_path_failed(&mut self, path: &[&RouteHop], short_channel_id: u64) { let amount_msat = path.split_last().map(|(hop, _)| hop.fee_msat).unwrap_or(0); - let liquidity_offset_half_life = self.params.liquidity_offset_half_life; log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat); let network_graph = self.network_graph.read_only(); for (hop_idx, hop) in path.iter().enumerate() { @@ -1030,7 +1028,7 @@ impl>, L: Deref, T: Time> Score for Probabilis self.channel_liquidities .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) - .as_directed_mut(source, &target, capacity_msat, liquidity_offset_half_life) + .as_directed_mut(source, &target, capacity_msat, &self.params) .failed_at_channel(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); break; } @@ -1038,7 +1036,7 @@ impl>, L: Deref, T: Time> Score for Probabilis self.channel_liquidities .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) - .as_directed_mut(source, &target, capacity_msat, liquidity_offset_half_life) + .as_directed_mut(source, &target, capacity_msat, &self.params) .failed_downstream(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { log_debug!(self.logger, "Not able to penalize channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).", @@ -1049,7 +1047,6 @@ impl>, L: Deref, T: Time> Score for Probabilis fn payment_path_successful(&mut self, path: &[&RouteHop]) { let amount_msat = path.split_last().map(|(hop, _)| hop.fee_msat).unwrap_or(0); - let liquidity_offset_half_life = self.params.liquidity_offset_half_life; log_trace!(self.logger, "Scoring path through SCID {} as having succeeded at {} msat.", path.split_last().map(|(hop, _)| hop.short_channel_id).unwrap_or(0), amount_msat); let network_graph = self.network_graph.read_only(); @@ -1065,7 +1062,7 @@ impl>, L: Deref, T: Time> Score for Probabilis self.channel_liquidities .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) - .as_directed_mut(source, &target, capacity_msat, liquidity_offset_half_life) + .as_directed_mut(source, &target, capacity_msat, &self.params) .successful(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { log_debug!(self.logger, "Not able to learn for channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).", @@ -1678,54 +1675,53 @@ mod tests { // Update minimum liquidity. 
- let liquidity_offset_half_life = scorer.params.liquidity_offset_half_life; let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 100); assert_eq!(liquidity.max_liquidity_msat(), 300); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 700); assert_eq!(liquidity.max_liquidity_msat(), 900); scorer.channel_liquidities.get_mut(&42).unwrap() - .as_directed_mut(&source, &target, 1_000, liquidity_offset_half_life) + .as_directed_mut(&source, &target, 1_000, &scorer.params) .set_min_liquidity_msat(200); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 200); assert_eq!(liquidity.max_liquidity_msat(), 300); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 700); assert_eq!(liquidity.max_liquidity_msat(), 800); // Update maximum liquidity. let liquidity = scorer.channel_liquidities.get(&43).unwrap() - .as_directed(&target, &recipient, 1_000, liquidity_offset_half_life); + .as_directed(&target, &recipient, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 700); assert_eq!(liquidity.max_liquidity_msat(), 900); let liquidity = scorer.channel_liquidities.get(&43).unwrap() - .as_directed(&recipient, &target, 1_000, liquidity_offset_half_life); + .as_directed(&recipient, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 100); assert_eq!(liquidity.max_liquidity_msat(), 300); scorer.channel_liquidities.get_mut(&43).unwrap() - .as_directed_mut(&target, &recipient, 1_000, liquidity_offset_half_life) + .as_directed_mut(&target, &recipient, 1_000, &scorer.params) .set_max_liquidity_msat(200); let liquidity = scorer.channel_liquidities.get(&43).unwrap() - .as_directed(&target, &recipient, 1_000, liquidity_offset_half_life); + .as_directed(&target, &recipient, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 0); assert_eq!(liquidity.max_liquidity_msat(), 200); let liquidity = scorer.channel_liquidities.get(&43).unwrap() - .as_directed(&recipient, &target, 1_000, liquidity_offset_half_life); + .as_directed(&recipient, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 800); assert_eq!(liquidity.max_liquidity_msat(), 1000); } @@ -1748,44 +1744,43 @@ mod tests { assert!(source > target); // Check initial bounds. 
- let liquidity_offset_half_life = scorer.params.liquidity_offset_half_life; let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 400); assert_eq!(liquidity.max_liquidity_msat(), 800); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 200); assert_eq!(liquidity.max_liquidity_msat(), 600); // Reset from source to target. scorer.channel_liquidities.get_mut(&42).unwrap() - .as_directed_mut(&source, &target, 1_000, liquidity_offset_half_life) + .as_directed_mut(&source, &target, 1_000, &scorer.params) .set_min_liquidity_msat(900); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 900); assert_eq!(liquidity.max_liquidity_msat(), 1_000); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 0); assert_eq!(liquidity.max_liquidity_msat(), 100); // Reset from target to source. scorer.channel_liquidities.get_mut(&42).unwrap() - .as_directed_mut(&target, &source, 1_000, liquidity_offset_half_life) + .as_directed_mut(&target, &source, 1_000, &scorer.params) .set_min_liquidity_msat(400); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 0); assert_eq!(liquidity.max_liquidity_msat(), 600); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 400); assert_eq!(liquidity.max_liquidity_msat(), 1_000); } @@ -1808,44 +1803,43 @@ mod tests { assert!(source > target); // Check initial bounds. - let liquidity_offset_half_life = scorer.params.liquidity_offset_half_life; let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 400); assert_eq!(liquidity.max_liquidity_msat(), 800); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 200); assert_eq!(liquidity.max_liquidity_msat(), 600); // Reset from source to target. 
scorer.channel_liquidities.get_mut(&42).unwrap() - .as_directed_mut(&source, &target, 1_000, liquidity_offset_half_life) + .as_directed_mut(&source, &target, 1_000, &scorer.params) .set_max_liquidity_msat(300); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 0); assert_eq!(liquidity.max_liquidity_msat(), 300); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 700); assert_eq!(liquidity.max_liquidity_msat(), 1_000); // Reset from target to source. scorer.channel_liquidities.get_mut(&42).unwrap() - .as_directed_mut(&target, &source, 1_000, liquidity_offset_half_life) + .as_directed_mut(&target, &source, 1_000, &scorer.params) .set_max_liquidity_msat(600); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&source, &target, 1_000, liquidity_offset_half_life); + .as_directed(&source, &target, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 400); assert_eq!(liquidity.max_liquidity_msat(), 1_000); let liquidity = scorer.channel_liquidities.get(&42).unwrap() - .as_directed(&target, &source, 1_000, liquidity_offset_half_life); + .as_directed(&target, &source, 1_000, &scorer.params); assert_eq!(liquidity.min_liquidity_msat(), 0); assert_eq!(liquidity.max_liquidity_msat(), 600); } From c8fb859ad6889c67f675052140cf817052542872 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 22 Aug 2022 22:41:31 +0000 Subject: [PATCH 4/4] Decay historical liquidity tracking when no new data is added To avoid scoring based on incredibly old historical liquidity data, we add a new half-life here which is used to (very slowly) decay historical liquidity tracking buckets. --- lightning/src/routing/scoring.rs | 186 +++++++++++++++++++++---------- 1 file changed, 127 insertions(+), 59 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index c8cf3583cd4..73bbbe460e1 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -62,7 +62,7 @@ use util::logger::Logger; use util::time::Time; use prelude::*; -use core::fmt; +use core::{cmp, fmt}; use core::cell::{RefCell, RefMut}; use core::convert::TryInto; use core::ops::{Deref, DerefMut}; @@ -436,6 +436,16 @@ pub struct ProbabilisticScoringParameters { /// [`liquidity_penalty_amount_multiplier_msat`]: Self::liquidity_penalty_amount_multiplier_msat pub historical_liquidity_penalty_amount_multiplier_msat: u64, + /// If we aren't learning any new datapoints for a channel, the historical liquidity bounds + /// tracking can simply live on with increasingly stale data. Instead, when a channel has not + /// seen a liquidity estimate update for this amount of time, the historical datapoints are + /// decayed by half. + /// + /// Note that after 16 or more half lives all historical data will be completely gone. + /// + /// Default value: 14 days + pub historical_no_updates_half_life: Duration, + /// Manual penalties used for the given nodes. Allows to set a particular penalty for a given /// node. Note that a manual penalty of `u64::max_value()` means the node would not ever be /// considered during path finding. 
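A sketch of the decay routine this patch introduces (`time_decay_data`, in the next hunk): each elapsed half-life becomes one right shift, and because the buckets are `u16`s, 16 or more shifts clear them entirely, which is where the "16 or more half lives" note above comes from.

    fn time_decay_data(buckets: &mut [u16; 8], half_lives: u32) {
        for e in buckets.iter_mut() {
            // checked_shr returns None for shifts >= 16 on a u16, so very
            // stale data decays all the way to zero.
            *e = e.checked_shr(half_lives).unwrap_or(0);
        }
    }

    fn main() {
        let mut buckets = [u16::MAX; 8];
        time_decay_data(&mut buckets, 1);
        assert_eq!(buckets[0], u16::MAX >> 1);
        time_decay_data(&mut buckets, 15);
        assert_eq!(buckets, [0u16; 8]); // 16 cumulative half-lives: all gone
    }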
@@ -509,10 +519,89 @@ impl HistoricalBucketRangeTracker { self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32); } } + /// Decay all buckets by the given number of half-lives. Used to more aggressively remove old + /// datapoints as we receive newer information. + fn time_decay_data(&mut self, half_lives: u32) { + for e in self.buckets.iter_mut() { + *e = e.checked_shr(half_lives).unwrap_or(0); + } + } } impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) }); +struct HistoricalMinMaxBuckets<'a> { + min_liquidity_offset_history: &'a HistoricalBucketRangeTracker, + max_liquidity_offset_history: &'a HistoricalBucketRangeTracker, +} + +impl HistoricalMinMaxBuckets<'_> { + #[inline] + fn calculate_success_probability_times_billion(&self, required_decays: u32, payment_amt_64th_bucket: u8) -> Option<u64> { + // If historical penalties are enabled, calculate the penalty by walking the set of + // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for + // each, calculate the probability of success given our payment amount, then total the + // weighted average probability of success. + // + // We use a sliding scale to decide which point within a given bucket will be compared to + // the amount being sent - for lower-bounds, the amount being sent is compared to the lower + // edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last + // bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the + // comparison point by 1/64th. For upper-bounds, the same applies, however with an offset + // of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign + // penalties to channels at the edges. + // + // If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to + // such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats + // for a 1 BTC channel!). + // + // If we used the middle of each bucket we'd never assign any penalty at all when sending + // less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket. + let mut total_valid_points_tracked = 0; + + // Rather than actually decaying the individual buckets, which would lose precision, we + // simply track whether all buckets would be decayed to zero, in which case we treat it as + // if we had no data. + let mut is_fully_decayed = true; + let mut check_track_bucket_contains_undecayed_points = + |bucket_val: u16| if bucket_val.checked_shr(required_decays).unwrap_or(0) > 0 { is_fully_decayed = false; }; + + for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { + check_track_bucket_contains_undecayed_points(*min_bucket); + for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) { + total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64); + check_track_bucket_contains_undecayed_points(*max_bucket); + } + } + // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat + // it as if we were fully decayed.
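To see why the threshold right below is `32*32`, and how the weighted average comes together, a self-contained walk-through with all history mass in the (min 0, max 0) bucket pair (values chosen for illustration; the "billion" scale is actually 2^30 = 1024^3):

    fn main() {
        let min_buckets = [32u16, 0, 0, 0, 0, 0, 0, 0];
        let max_buckets = [32u16, 0, 0, 0, 0, 0, 0, 0];
        let payment_amt_64th_bucket = 3u64;

        let mut total_valid_points_tracked = 0u64;
        for (min_idx, min_bucket) in min_buckets.iter().enumerate() {
            for max_bucket in max_buckets.iter().take(8 - min_idx) {
                total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
            }
        }
        // One full datapoint pair: 1.0 * 1.0 in fixed point is 32 * 32, the
        // cutoff below which the data is treated as fully decayed.
        assert_eq!(total_valid_points_tracked, 32 * 32);

        let mut success_prob_times_billion = 0u64;
        for (min_idx, min_bucket) in min_buckets.iter().enumerate() {
            for (max_idx, max_bucket) in max_buckets.iter().enumerate().take(8 - min_idx) {
                let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
                    * 1024 * 1024 / total_valid_points_tracked;
                let min_edge = min_idx as u64 * 9;
                let max_edge = (7 - max_idx as u64) * 9 + 1;
                if payment_amt_64th_bucket > max_edge {
                    // Payment exceeds this pair's upper bound: contributes zero.
                } else if payment_amt_64th_bucket <= min_edge {
                    success_prob_times_billion += bucket_prob_times_million * 1024;
                } else {
                    success_prob_times_billion += bucket_prob_times_million
                        * (max_edge - payment_amt_64th_bucket) * 1024 / (max_edge - min_edge);
                }
            }
        }
        // (64 - 3) / 64 of certainty: 1_048_576 * 61 * 1024 / 64.
        assert_eq!(success_prob_times_billion, 1_023_410_176);
    }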
+ if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 || is_fully_decayed { + return None; + } + + let mut cumulative_success_prob_times_billion = 0; + for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { + for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) { + let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64) + * 1024 * 1024 / total_valid_points_tracked; + let min_64th_bucket = min_idx as u8 * 9; + let max_64th_bucket = (7 - max_idx as u8) * 9 + 1; + if payment_amt_64th_bucket > max_64th_bucket { + // Success probability 0, the payment amount is above the max liquidity + } else if payment_amt_64th_bucket <= min_64th_bucket { + cumulative_success_prob_times_billion += bucket_prob_times_million * 1024; + } else { + cumulative_success_prob_times_billion += bucket_prob_times_million * + ((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 / + ((max_64th_bucket - min_64th_bucket) as u64); + } + } + } + + Some(cumulative_success_prob_times_billion) + } +} + /// Accounting for channel liquidity balance uncertainty. /// /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the @@ -645,6 +734,7 @@ impl ProbabilisticScoringParameters { liquidity_penalty_amount_multiplier_msat: 0, historical_liquidity_penalty_multiplier_msat: 0, historical_liquidity_penalty_amount_multiplier_msat: 0, + historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14), manual_node_penalties: HashMap::new(), anti_probing_penalty_msat: 0, considered_impossible_penalty_msat: 0, @@ -670,6 +760,7 @@ impl Default for ProbabilisticScoringParameters { liquidity_penalty_amount_multiplier_msat: 192, historical_liquidity_penalty_multiplier_msat: 10_000, historical_liquidity_penalty_amount_multiplier_msat: 64, + historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14), manual_node_penalties: HashMap::new(), anti_probing_penalty_msat: 250, considered_impossible_penalty_msat: 1_0000_0000_000, @@ -791,35 +882,27 @@ impl, BRT: Deref, if params.historical_liquidity_penalty_multiplier_msat != 0 || params.historical_liquidity_penalty_amount_multiplier_msat != 0 { - // If historical penalties are enabled, calculate the penalty by walking the set of - // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) - // and, for each, calculate the probability of success given our payment amount, then - // total the weighted average probability of success. - // - // We use a sliding scale to decide which point within a given bucket will be compared - // to the amount being sent - for lower-bounds, the amount being sent is compared to - // the lower edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of - // the last bucket (i.e. 9 times the index, or 63), with each bucket in between - // increasing the comparison point by 1/64th. For upper-bounds, the same applies, - // however with an offset of 1/64th (i.e. starting at one and ending at 64). This - // avoids failing to assign penalties to channels at the edges. - // - // If we used the bottom edge of buckets, we'd end up never assigning any penalty at - // all to such a channel when sending less than ~0.19% of the channel's capacity (e.g. - // ~200k sats for a 1 BTC channel!). 
- // - // If we used the middle of each bucket we'd never assign any penalty at all when - // sending less than 1/16th of a channel's capacity, or 1/8th if we used the top of the - // bucket. - let mut total_valid_points_tracked = 0; - for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { - for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) { - total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64); - } - } - if total_valid_points_tracked == 0 { - // If we don't have any valid points, redo the non-historical calculation with no - // liquidity bounds tracked and the historical penalty multipliers. + let required_decays = self.now.duration_since(*self.last_updated).as_secs() + .checked_div(params.historical_no_updates_half_life.as_secs()) + .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32); + let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat; + debug_assert!(payment_amt_64th_bucket <= 64); + if payment_amt_64th_bucket > 64 { return res; } + + let buckets = HistoricalMinMaxBuckets { + min_liquidity_offset_history: &self.min_liquidity_offset_history, + max_liquidity_offset_history: &self.max_liquidity_offset_history, + }; + if let Some(cumulative_success_prob_times_billion) = buckets + .calculate_success_probability_times_billion(required_decays, payment_amt_64th_bucket as u8) { + let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024); + res = res.saturating_add(Self::combined_penalty_msat(amount_msat, + historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat, + params.historical_liquidity_penalty_amount_multiplier_msat)); + } else { + // If we don't have any valid points (or, once decayed, we have less than a full + // point), redo the non-historical calculation with no liquidity bounds tracked and + // the historical penalty multipliers. 
let max_capacity = self.capacity_msat.saturating_sub(amount_msat).saturating_add(1); let negative_log10_times_2048 = approx::negative_log10_times_2048(max_capacity, self.capacity_msat.saturating_add(1)); res = res.saturating_add(Self::combined_penalty_msat(amount_msat, negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat, params.historical_liquidity_penalty_amount_multiplier_msat)); return res; } - - let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat; - debug_assert!(payment_amt_64th_bucket <= 64); - if payment_amt_64th_bucket > 64 { return res; } - - let mut cumulative_success_prob_times_billion = 0; - for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { - for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) { - let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64) - * 1024 * 1024 / total_valid_points_tracked; - let min_64th_bucket = min_idx as u64 * 9; - let max_64th_bucket = (7 - max_idx as u64) * 9 + 1; - if payment_amt_64th_bucket > max_64th_bucket { - // Success probability 0, the payment amount is above the max liquidity - } else if payment_amt_64th_bucket <= min_64th_bucket { - cumulative_success_prob_times_billion += bucket_prob_times_million * 1024; - } else { - cumulative_success_prob_times_billion += bucket_prob_times_million * - (max_64th_bucket - payment_amt_64th_bucket) * 1024 / - (max_64th_bucket - min_64th_bucket); - } - } - } - let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024); - res = res.saturating_add(Self::combined_penalty_msat(amount_msat, - historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat, - params.historical_liquidity_penalty_amount_multiplier_msat)); } res @@ -927,6 +983,12 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChanne fn update_history_buckets(&mut self) { + let half_decays = self.now.duration_since(*self.last_updated).as_secs() + .checked_div(self.params.historical_no_updates_half_life.as_secs()) + .map(|decays| cmp::min(decays, u32::max_value() as u64) as u32) + .unwrap_or(u32::max_value()); + self.min_liquidity_offset_history.time_decay_data(half_decays); + self.max_liquidity_offset_history.time_decay_data(half_decays); debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat); scorer.payment_path_failed(&payment_path_for_amount(1000).iter().collect::<Vec<_>>(), 43); assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 198); + + // Advance the time forward 16 half-lives (which the docs claim will ensure all data is + // gone), and check that we're back to where we started. + SinceEpoch::advance(Duration::from_secs(10 * 16)); + assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 47); + } + #[test]