Skip to content

Commit

Permalink
shares recovered Merkle data shred payloads (#4789)
Browse files Browse the repository at this point in the history
Recovered data shreds are concurrently inserted into blockstore while
their payload is sent to retransmit-stage. Using a shared payload
between the two concurrent paths will reduce allocations and memcopies.
  • Loading branch information
behzadnouri authored Feb 7, 2025
1 parent d4cafdb commit 6a95359
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
4 changes: 4 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,10 +1010,14 @@ impl Blockstore {
// are not stored in blockstore.
match shred.shred_type() {
ShredType::Code => {
// Coding shreds are not stored in blockstore (this arm returns
// `None` below), so their payload is never shared with another
// path; assert it stayed uniquely owned — no `Arc` needed.
debug_assert_matches!(shred.payload(), shred::Payload::Unique(_));
recovered_shreds.push(shred.into_payload());
None
}
ShredType::Data => {
// Assert the payload is `Arc`-shared so the `clone()` on the next
// line is a cheap refcount bump rather than a byte copy.
debug_assert_matches!(shred.payload(), shred::Payload::Shared(_));
recovered_shreds.push(shred.payload().clone());
Some(shred)
}
Expand Down
15 changes: 14 additions & 1 deletion ledger/src/shred/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,11 @@ fn make_stub_shred(
Shred::ShredData(ShredData {
common_header,
data_header,
payload: Payload::from(payload),
// Recovered data shreds are concurrently inserted into blockstore
// while their payload is sent to retransmit-stage. Using a shared
// payload between the two concurrent paths will reduce allocations
// and memcopies.
payload: Payload::from(std::sync::Arc::new(payload)),
})
};
if let Some(chained_merkle_root) = chained_merkle_root {
Expand Down Expand Up @@ -1768,6 +1772,12 @@ mod test {
}
});
assert_eq!(recovered_shreds, removed_shreds);
for shred in recovered_shreds {
match shred.shred_type() {
ShredType::Code => assert_matches!(shred.payload(), Payload::Unique(_)),
ShredType::Data => assert_matches!(shred.payload(), Payload::Shared(_)),
}
}
}
}

Expand Down Expand Up @@ -2095,6 +2105,9 @@ mod test {
})
.collect();
assert_eq!(recovered_data_shreds.len(), data_shreds.len());
for shred in &recovered_data_shreds {
assert_matches!(shred.payload(), Payload::Shared(_));
}
for (shred, other) in recovered_data_shreds.into_iter().zip(data_shreds) {
match shred {
Shred::ShredCode(_) => panic!("Invalid shred type!"),
Expand Down
9 changes: 8 additions & 1 deletion ledger/src/shred/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::Arc,
};

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq)]
pub enum Payload {
Shared(Arc<Vec<u8>>),
Unique(Vec<u8>),
Expand Down Expand Up @@ -87,6 +87,13 @@ pub(crate) mod serde_bytes_payload {
}
}

// Manual `PartialEq` (instead of `#[derive]`): equality is defined on the
// underlying bytes only, so a `Shared` and a `Unique` payload holding the
// same bytes compare equal even though their variants differ. Byte
// equality is reflexive, so the derived `Eq` marker remains sound.
impl PartialEq for Payload {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        // NOTE(review): relies on `as_ref` (impl not visible in this hunk)
        // yielding the payload bytes for both variants — confirm.
        self.as_ref() == other.as_ref()
    }
}

impl From<Vec<u8>> for Payload {
#[inline]
fn from(bytes: Vec<u8>) -> Self {
Expand Down

0 comments on commit 6a95359

Please sign in to comment.