Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: leader schedule generation #4662

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 102 additions & 34 deletions ledger/src/leader_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
rand::distributions::{Distribution, WeightedIndex},
rand_chacha::{rand_core::SeedableRng, ChaChaRng},
solana_pubkey::Pubkey,
solana_sdk::clock::Epoch,
std::{collections::HashMap, convert::identity, ops::Index, sync::Arc},
};

Expand All @@ -22,35 +23,60 @@ pub struct LeaderSchedule {

impl LeaderSchedule {
// Note: passing in zero stakers will cause a panic.
pub fn new(ids_and_stakes: &[(Pubkey, u64)], seed: [u8; 32], len: u64, repeat: u64) -> Self {
let (ids, stakes): (Vec<_>, Vec<_>) = ids_and_stakes.iter().cloned().unzip();
let rng = &mut ChaChaRng::from_seed(seed);
/// Builds a leader schedule from a map of validator identity to stake.
///
/// Every `(identity, stake)` entry of `epoch_staked_nodes` is handed to
/// `stake_weighted_slot_leaders` together with `epoch`, `len` and `repeat`;
/// the slot leaders it returns are wrapped via `new_from_schedule`.
///
/// Note: that helper is documented to panic when given zero stakers.
pub fn new_keyed_by_validator_identity(
    epoch_staked_nodes: &HashMap<Pubkey, u64>,
    epoch: Epoch,
    len: u64,
    repeat: u64,
) -> Self {
    // Borrow the keys and copy the stakes out of the map.
    let mut stake_entries = Vec::with_capacity(epoch_staked_nodes.len());
    for (identity, stake) in epoch_staked_nodes {
        stake_entries.push((identity, *stake));
    }
    Self::new_from_schedule(Self::stake_weighted_slot_leaders(
        stake_entries,
        epoch,
        len,
        repeat,
    ))
}

// Note: passing in zero stakers will cause a panic.
fn stake_weighted_slot_leaders(
mut keyed_stakes: Vec<(&Pubkey, u64)>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could avoid the intermediate collection here by taking in an iter (flipped) Iterator<Item = (u64, &Pubkey)> and using the iter variants in sort_stakes:

stakes.sorted_unstable().dedup().reverse()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: Itertools often allocates internally in its helper methods, for example:

fn sorted_unstable(..) {
        let mut v = Vec::from_iter(self);
        v.sort_unstable();
        v.into_iter()
}

epoch: Epoch,
len: u64,
repeat: u64,
) -> Vec<Pubkey> {
sort_stakes(&mut keyed_stakes);
let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
let weighted_index = WeightedIndex::new(stakes).unwrap();
let mut current_node = Pubkey::default();
let slot_leaders = (0..len)
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
let rng = &mut ChaChaRng::from_seed(seed);
let mut current_slot_leader = Pubkey::default();
(0..len)
.map(|i| {
if i % repeat == 0 {
current_node = ids[weighted_index.sample(rng)];
current_slot_leader = keys[weighted_index.sample(rng)];
}
current_node
current_slot_leader
})
.collect();
Self::new_from_schedule(slot_leaders)
.collect()
}

pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
let index = slot_leaders
Self {
index: Self::index_from_slot_leaders(&slot_leaders),
slot_leaders,
}
}

fn index_from_slot_leaders(slot_leaders: &[Pubkey]) -> HashMap<Pubkey, Arc<Vec<usize>>> {
slot_leaders
.iter()
.enumerate()
.map(|(i, pk)| (*pk, i))
.into_group_map()
.into_iter()
.map(|(k, v)| (k, Arc::new(v)))
.collect();
Self {
slot_leaders,
index,
}
.collect()
}

pub fn get_slot_leaders(&self) -> &[Pubkey] {
Expand Down Expand Up @@ -97,6 +123,22 @@ impl Index<u64> for LeaderSchedule {
}
}

fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.
// Note: Use unstable sort, because we dedup right after to remove the equal elements.
stakes.sort_unstable_by(|(l_pubkey, l_stake), (r_pubkey, r_stake)| {
if r_stake == l_stake {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could compare the tuples directly:

(r_stake, r_pubkey).cmp(&(l_stake, l_pubkey))

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR just moves this sort_stakes function from another file, so I'm fine with leaving it as is.

r_pubkey.cmp(l_pubkey)
} else {
r_stake.cmp(l_stake)
}
});

// Now that it's sorted, we can do an O(n) dedup.
stakes.dedup();
}

#[cfg(test)]
mod tests {
use {super::*, rand::Rng, std::iter::repeat_with};
Expand All @@ -114,35 +156,34 @@ mod tests {
#[test]
fn test_leader_schedule_basic() {
let num_keys = 10;
let stakes: Vec<_> = (0..num_keys)
let stakes: HashMap<_, _> = (0..num_keys)
.map(|i| (solana_pubkey::new_rand(), i))
.collect();

let seed = solana_pubkey::new_rand();
let mut seed_bytes = [0u8; 32];
seed_bytes.copy_from_slice(seed.as_ref());
let epoch: Epoch = rand::random();
let len = num_keys * 10;
let leader_schedule = LeaderSchedule::new(&stakes, seed_bytes, len, 1);
let leader_schedule2 = LeaderSchedule::new(&stakes, seed_bytes, len, 1);
assert_eq!(leader_schedule.slot_leaders.len() as u64, len);
let leader_schedule =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
let leader_schedule2 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
assert_eq!(leader_schedule.num_slots() as u64, len);
// Check that the same schedule is reproducibly generated
assert_eq!(leader_schedule, leader_schedule2);
}

#[test]
fn test_repeated_leader_schedule() {
let num_keys = 10;
let stakes: Vec<_> = (0..num_keys)
let stakes: HashMap<_, _> = (0..num_keys)
.map(|i| (solana_pubkey::new_rand(), i))
.collect();

let seed = solana_pubkey::new_rand();
let mut seed_bytes = [0u8; 32];
seed_bytes.copy_from_slice(seed.as_ref());
let epoch = rand::random::<Epoch>();
let len = num_keys * 10;
let repeat = 8;
let leader_schedule = LeaderSchedule::new(&stakes, seed_bytes, len, repeat);
assert_eq!(leader_schedule.slot_leaders.len() as u64, len);
let leader_schedule =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, repeat);
assert_eq!(leader_schedule.num_slots() as u64, len);
let mut leader_node = Pubkey::default();
for (i, node) in leader_schedule.slot_leaders.iter().enumerate() {
if i % repeat as usize == 0 {
Expand All @@ -157,17 +198,17 @@ mod tests {
fn test_repeated_leader_schedule_specific() {
let alice_pubkey = solana_pubkey::new_rand();
let bob_pubkey = solana_pubkey::new_rand();
let stakes = vec![(alice_pubkey, 2), (bob_pubkey, 1)];
let stakes: HashMap<_, _> = [(alice_pubkey, 2), (bob_pubkey, 1)].into_iter().collect();

let seed = Pubkey::default();
let mut seed_bytes = [0u8; 32];
seed_bytes.copy_from_slice(seed.as_ref());
let epoch = 0;
let len = 8;
// What the schedule looks like without any repeats
let leaders1 = LeaderSchedule::new(&stakes, seed_bytes, len, 1).slot_leaders;
let leaders1 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1).slot_leaders;

// What the schedule looks like with repeats
let leaders2 = LeaderSchedule::new(&stakes, seed_bytes, len, 2).slot_leaders;
let leaders2 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 2).slot_leaders;
assert_eq!(leaders1.len(), leaders2.len());

let leaders1_expected = vec![
Expand Down Expand Up @@ -219,4 +260,31 @@ mod tests {
}
}
}

#[test]
fn test_sort_stakes_basic() {
    // Two distinct keys with different stakes: the larger stake must sort first.
    let low_stake_key = solana_pubkey::new_rand();
    let high_stake_key = solana_pubkey::new_rand();
    let mut stakes = vec![(&low_stake_key, 1), (&high_stake_key, 2)];

    sort_stakes(&mut stakes);

    let expected = vec![(&high_stake_key, 2), (&low_stake_key, 1)];
    assert_eq!(stakes, expected);
}

#[test]
fn test_sort_stakes_with_dup() {
    // The same (key, stake) pair listed twice must appear only once afterwards.
    let repeated_key = solana_pubkey::new_rand();
    let high_stake_key = solana_pubkey::new_rand();
    let mut stakes = vec![
        (&repeated_key, 1),
        (&high_stake_key, 2),
        (&repeated_key, 1),
    ];

    sort_stakes(&mut stakes);

    let expected = vec![(&high_stake_key, 2), (&repeated_key, 1)];
    assert_eq!(stakes, expected);
}

#[test]
fn test_sort_stakes_with_equal_stakes() {
    // With identical stakes the pubkey breaks the tie; the test expects the
    // random key to be placed ahead of the default key.
    let default_key = Pubkey::default();
    let random_key = solana_pubkey::new_rand();
    let mut stakes = vec![(&default_key, 1), (&random_key, 1)];

    sort_stakes(&mut stakes);

    let expected = vec![(&random_key, 1), (&default_key, 1)];
    assert_eq!(stakes, expected);
}
}
61 changes: 5 additions & 56 deletions ledger/src/leader_schedule_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,9 @@ use {
/// Return the leader schedule for the given epoch.
pub fn leader_schedule(epoch: Epoch, bank: &Bank) -> Option<LeaderSchedule> {
bank.epoch_staked_nodes(epoch).map(|stakes| {
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
let mut stakes: Vec<_> = stakes
.iter()
.map(|(pubkey, stake)| (*pubkey, *stake))
.collect();
sort_stakes(&mut stakes);
LeaderSchedule::new(
LeaderSchedule::new_keyed_by_validator_identity(
&stakes,
seed,
epoch,
bank.get_slots_in_epoch(epoch),
NUM_CONSECUTIVE_LEADER_SLOTS,
)
Expand Down Expand Up @@ -65,22 +58,6 @@ pub fn first_of_consecutive_leader_slots(slot: Slot) -> Slot {
(slot / NUM_CONSECUTIVE_LEADER_SLOTS) * NUM_CONSECUTIVE_LEADER_SLOTS
}

/// Sorts `stakes` in place from highest to lowest stake and removes
/// consecutive duplicate entries.
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
    // Primary key is the stake, descending. Entries with equal stake fall back
    // to the pubkey, also descending, so the final order is deterministic.
    // An unstable sort is used because the dedup below removes equal elements.
    stakes.sort_unstable_by(|lhs, rhs| {
        let (lhs_key, lhs_stake) = lhs;
        let (rhs_key, rhs_stake) = rhs;
        rhs_stake
            .cmp(lhs_stake)
            .then_with(|| rhs_key.cmp(lhs_key))
    });

    // Equal entries are adjacent after sorting, so one linear pass removes them.
    stakes.dedup();
}

#[cfg(test)]
mod tests {
use {
Expand All @@ -98,15 +75,14 @@ mod tests {
.genesis_config;
let bank = Bank::new_for_tests(&genesis_config);

let pubkeys_and_stakes: Vec<_> = bank
let pubkeys_and_stakes: HashMap<_, _> = bank
.current_epoch_staked_nodes()
.iter()
.map(|(pubkey, stake)| (*pubkey, *stake))
.collect();
let seed = [0u8; 32];
let leader_schedule = LeaderSchedule::new(
let leader_schedule = LeaderSchedule::new_keyed_by_validator_identity(
&pubkeys_and_stakes,
seed,
0,
genesis_config.epoch_schedule.slots_per_epoch,
NUM_CONSECUTIVE_LEADER_SLOTS,
);
Expand All @@ -125,31 +101,4 @@ mod tests {
let bank = Bank::new_for_tests(&genesis_config);
assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey);
}

#[test]
fn test_sort_stakes_basic() {
    // Two distinct keys with different stakes: the larger stake must sort first.
    let low_stake_key = solana_pubkey::new_rand();
    let high_stake_key = solana_pubkey::new_rand();
    let mut stakes = vec![(low_stake_key, 1), (high_stake_key, 2)];

    sort_stakes(&mut stakes);

    let expected = vec![(high_stake_key, 2), (low_stake_key, 1)];
    assert_eq!(stakes, expected);
}

#[test]
fn test_sort_stakes_with_dup() {
    // The same (key, stake) pair listed twice must appear only once afterwards.
    let repeated_key = solana_pubkey::new_rand();
    let high_stake_key = solana_pubkey::new_rand();
    let mut stakes = vec![
        (repeated_key, 1),
        (high_stake_key, 2),
        (repeated_key, 1),
    ];

    sort_stakes(&mut stakes);

    let expected = vec![(high_stake_key, 2), (repeated_key, 1)];
    assert_eq!(stakes, expected);
}

#[test]
fn test_sort_stakes_with_equal_stakes() {
    // With identical stakes the pubkey breaks the tie; the test expects the
    // random key to be placed ahead of the default key.
    let default_key = Pubkey::default();
    let random_key = solana_pubkey::new_rand();
    let mut stakes = vec![(default_key, 1), (random_key, 1)];

    sort_stakes(&mut stakes);

    let expected = vec![(random_key, 1), (default_key, 1)];
    assert_eq!(stakes, expected);
}
}
Loading