Skip to content

Commit

Permalink
removes trailing bytes before de-duplicating shreds (#4769)
Browse files Browse the repository at this point in the history
For backward compatibility we need to allow trailing bytes in the
packet after the shred payload.

The extra bytes are truncated in Shred{Code,Data}::from_payload:
https://github.com/anza-xyz/agave/blob/744482070/ledger/src/shred/merkle.rs#L513
https://github.com/anza-xyz/agave/blob/744482070/ledger/src/shred/merkle.rs#L574

but that happens late — after the shred deduper — which might allow duplicate
shreds to pass through.

The commit updates shred::wire::get_shred{,_mut} to remove trailing
bytes and excludes them from the deduper.
  • Loading branch information
behzadnouri authored Feb 5, 2025
1 parent 3b213aa commit a3be625
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 32 deletions.
4 changes: 3 additions & 1 deletion ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ impl From<ShredVariant> for ShredType {
}

impl From<ShredVariant> for u8 {
#[inline]
fn from(shred_variant: ShredVariant) -> u8 {
match shred_variant {
ShredVariant::LegacyCode => u8::from(ShredType::Code),
Expand Down Expand Up @@ -723,6 +724,7 @@ impl From<ShredVariant> for u8 {

impl TryFrom<u8> for ShredVariant {
type Error = Error;
#[inline]
fn try_from(shred_variant: u8) -> Result<Self, Self::Error> {
if shred_variant == u8::from(ShredType::Code) {
Ok(ShredVariant::LegacyCode)
Expand Down Expand Up @@ -1261,7 +1263,7 @@ mod tests {
|_| false, // drop_unchained_merkle_shreds
&mut stats
));
assert_eq!(stats.bad_parent_offset, 1);
assert_eq!(stats.index_overrun, 5);
}
{
let mut stats = ShredFetchStats::default();
Expand Down
84 changes: 60 additions & 24 deletions ledger/src/shred/wire.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Helper methods to extract pieces of the shred from the payload without
// deserializing the entire payload.
#![deny(clippy::indexing_slicing)]
use {
crate::shred::{
self, merkle::SIZE_OF_MERKLE_ROOT, Error, Nonce, ShredFlags, ShredId, ShredType,
ShredVariant, SignedData, SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_NONCE,
self, merkle::SIZE_OF_MERKLE_ROOT, traits::Shred, Error, Nonce, ShredFlags, ShredId,
ShredType, ShredVariant, SignedData, SIZE_OF_COMMON_SHRED_HEADER,
},
solana_perf::packet::Packet,
solana_sdk::{
Expand All @@ -21,36 +22,39 @@ use {
};

#[inline]
fn get_shred_size(packet: &Packet) -> Option<usize> {
let size = packet.data(..)?.len();
if packet.meta().repair() {
size.checked_sub(SIZE_OF_NONCE)
} else {
Some(size)
}
/// Returns the size of the shred payload at the start of `shred`, excluding
/// any trailing bytes after the payload (trailing bytes are tolerated for
/// backward compatibility and must be stripped before deduplication).
/// Returns `None` if the shred variant byte cannot be parsed.
fn get_shred_size(shred: &[u8]) -> Option<usize> {
// Legacy data shreds have zero padding at the end which might have been
// trimmed. Other variants do not have any trailing zeros.
Some(match get_shred_variant(shred).ok()? {
ShredVariant::LegacyCode => shred::legacy::ShredCode::SIZE_OF_PAYLOAD,
// Legacy data payloads may have been trimmed, so cap at the slice length.
ShredVariant::LegacyData => shred::legacy::ShredData::SIZE_OF_PAYLOAD.min(shred.len()),
ShredVariant::MerkleCode { .. } => shred::merkle::ShredCode::SIZE_OF_PAYLOAD,
ShredVariant::MerkleData { .. } => shred::merkle::ShredData::SIZE_OF_PAYLOAD,
})
}

#[inline]
pub fn get_shred(packet: &Packet) -> Option<&[u8]> {
let size = get_shred_size(packet)?;
packet.data(..size)
let data = packet.data(..)?;
data.get(..get_shred_size(data)?)
}

#[inline]
pub fn get_shred_mut(packet: &mut Packet) -> Option<&mut [u8]> {
let size = get_shred_size(packet)?;
packet.buffer_mut().get_mut(..size)
let buffer = packet.buffer_mut();
buffer.get_mut(..get_shred_size(buffer)?)
}

#[inline]
pub fn get_shred_and_repair_nonce(packet: &Packet) -> Option<(&[u8], Option<Nonce>)> {
let data = packet.data(..)?;
let shred = data.get(..get_shred_size(data)?)?;
if !packet.meta().repair() {
return Some((data, None));
return Some((shred, None));
}
let offset = data.len().checked_sub(4)?;
let (shred, nonce) = data.split_at(offset);
let nonce = u32::from_le_bytes(<[u8; 4]>::try_from(nonce).unwrap());
let nonce = <[u8; 4]>::try_from(data.get(offset..)?).ok()?;
let nonce = u32::from_le_bytes(nonce);
Some((shred, Some(nonce)))
}

Expand Down Expand Up @@ -353,6 +357,7 @@ pub fn resign_shred(shred: &mut [u8], keypair: &Keypair) -> Result<(), Error> {

// Minimally corrupts the packet so that the signature no longer verifies.
#[cfg(test)]
#[allow(clippy::indexing_slicing)]
pub(crate) fn corrupt_packet<R: Rng>(
rng: &mut R,
packet: &mut Packet,
Expand Down Expand Up @@ -391,7 +396,10 @@ pub(crate) fn corrupt_packet<R: Rng>(
let size = shred.len() - if resigned { SIGNATURE_BYTES } else { 0 };
size - offset..size
})
.or_else(|| get_signed_data_offsets(shred));
.or_else(|| {
let Range { start, end } = get_signed_data_offsets(shred)?;
Some(start + 1..end) // +1 to exclude ShredVariant.
});
modify_packet(rng, packet, offsets.unwrap());
}
// Assert that the signature no longer verifies.
Expand Down Expand Up @@ -434,6 +442,36 @@ mod tests {
Signature::from(signature)
}

/// Test helper: writes `shred` into `packet`, followed by a random number of
/// random trailing bytes, and finally the repair `nonce` (if any) at the very
/// end of the packet — mimicking packets that carry extra bytes after the
/// shred payload for backward compatibility.
///
/// Sets `packet.meta.size` to the total number of bytes written.
fn write_shred<R: Rng>(
rng: &mut R,
shred: impl AsRef<[u8]>,
nonce: Option<Nonce>,
packet: &mut Packet,
) {
let buffer = packet.buffer_mut();
let capacity = buffer.len();
let mut cursor = Cursor::new(buffer);
cursor.write_all(shred.as_ref()).unwrap();
// Write a random number of random bytes trailing the shred payload.
let mut bytes = {
// Leave room for the nonce (if any) at the end of the packet buffer.
let size = capacity
- cursor.position() as usize
- if nonce.is_some() {
std::mem::size_of::<Nonce>()
} else {
0
};
vec![0u8; rng.gen_range(0..=size)]
};
rng.fill(&mut bytes[..]);
cursor.write_all(&bytes).unwrap();
// Write nonce after random trailing bytes.
if let Some(nonce) = nonce {
cursor.write_all(&nonce.to_le_bytes()).unwrap();
}
packet.meta_mut().size = usize::try_from(cursor.position()).unwrap();
}

#[test_case(false, false, false)]
#[test_case(false, false, true)]
#[test_case(false, true, false)]
Expand Down Expand Up @@ -466,13 +504,11 @@ mod tests {
}
for shred in &shreds {
let nonce = repaired.then(|| rng.gen::<Nonce>());
let mut cursor = Cursor::new(packet.buffer_mut());
cursor.write_all(shred.payload()).unwrap();
if let Some(nonce) = nonce {
cursor.write_all(&nonce.to_le_bytes()).unwrap();
}
packet.meta_mut().size = usize::try_from(cursor.position()).unwrap();
assert_eq!(get_shred_size(&packet).unwrap(), shred.payload().len());
write_shred(&mut rng, shred.payload(), nonce, &mut packet);
assert_eq!(
packet.data(..).map(get_shred_size).unwrap().unwrap(),
shred.payload().len()
);
assert_eq!(get_shred(&packet).unwrap(), shred.payload().as_ref());
assert_eq!(
get_shred_mut(&mut packet).unwrap(),
Expand Down
17 changes: 10 additions & 7 deletions turbine/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,25 +146,28 @@ fn run_shred_sigverify<const K: usize>(
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
// Repair shreds include a randomly generated u32 nonce, so it does not
// make sense to deduplicate them (i.e. they are not duplicate of any other
// shred).
// make sense to deduplicate the entire packet payload (i.e. they are not
// duplicates of any other packet.data(..)).
// If the nonce is excluded from the deduper then false positives might
// prevent us from repairing a block until the deduper is reset after
// DEDUPER_RESET_CYCLE. A workaround is to also repair "coding" shreds to
// add some redundancy but that is not implemented at the moment.
// Because the repair nonce is already verified in shred-fetch-stage we can
// exclude repair shreds from the deduper.
// exclude repair shreds from the deduper, but we still need to pass the
// repair shred to the deduper to filter out duplicates from the turbine
// path once a shred is repaired.
// For backward compatibility we need to allow trailing bytes in the packet
// after the shred payload, but have to exclude them here from the deduper.
stats.num_duplicates += thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| {
!packet.meta().discard()
&& !packet.meta().repair()
&& packet
.data(..)
.map(|data| deduper.dedup(data))
&& shred::wire::get_shred(packet)
.map(|shred| deduper.dedup(shred))
.unwrap_or(true)
&& !packet.meta().repair()
})
.map(|packet| packet.meta_mut().set_discard(true))
.count()
Expand Down

0 comments on commit a3be625

Please sign in to comment.