Skip to content

Commit

Permalink
refactor(papyrus_p2p_sync): change P2P to P2p everywhere (starkware-l…
Browse files Browse the repository at this point in the history
  • Loading branch information
noamsp-starkware authored Jan 12, 2025
1 parent b1e0c39 commit da8fd1d
Show file tree
Hide file tree
Showing 22 changed files with 121 additions and 121 deletions.
8 changes: 4 additions & 4 deletions crates/papyrus_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use papyrus_config::{ConfigError, ParamPath, ParamPrivacyInput, SerializedParam}
use papyrus_consensus::config::ConsensusConfig;
use papyrus_monitoring_gateway::MonitoringGatewayConfig;
use papyrus_network::NetworkConfig;
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientConfig};
use papyrus_p2p_sync::client::{P2pSyncClient, P2pSyncClientConfig};
#[cfg(feature = "rpc")]
use papyrus_rpc::RpcConfig;
use papyrus_storage::db::DbConfig;
Expand Down Expand Up @@ -60,9 +60,9 @@ pub struct NodeConfig {
/// None if the syncing should be disabled.
pub sync: Option<SyncConfig>,
/// One of p2p_sync or sync must be None.
/// If P2P sync is active, then network must be active too.
// TODO(yair): Change NodeConfig to have an option of enum of SyncConfig or P2PSyncConfig.
pub p2p_sync: Option<P2PSyncClientConfig>,
/// If p2p sync is active, then network must be active too.
// TODO(yair): Change NodeConfig to have an option of enum of SyncConfig or P2pSyncConfig.
pub p2p_sync: Option<P2pSyncClientConfig>,
pub consensus: Option<ConsensusConfig>,
// TODO(shahak): Make network non-optional once it's developed enough.
pub network: Option<NetworkConfig>,
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_node/src/config/pointers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use papyrus_config::ParamPrivacyInput;
use papyrus_config::{ConfigError, ParamPath, SerializedParam};
use papyrus_monitoring_gateway::MonitoringGatewayConfig;
use papyrus_network::NetworkConfig;
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientConfig};
use papyrus_p2p_sync::client::{P2pSyncClient, P2pSyncClientConfig};
#[cfg(feature = "rpc")]
use papyrus_rpc::RpcConfig;
use papyrus_storage::db::DbConfig;
Expand Down
18 changes: 9 additions & 9 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::client::{P2pSyncClient, P2pSyncClientChannels};
use papyrus_p2p_sync::server::{P2pSyncServer, P2pSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage};
#[cfg(feature = "rpc")]
Expand Down Expand Up @@ -294,13 +294,13 @@ async fn spawn_sync_client(
.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
let p2p_sync_client_channels = P2pSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);
let p2p_sync = P2PSyncClient::new(
let p2p_sync = P2pSyncClient::new(
p2p_sync_client_config,
storage_reader,
storage_writer,
Expand All @@ -317,7 +317,7 @@ fn spawn_p2p_sync_server(
storage_reader: StorageReader,
) -> JoinHandle<anyhow::Result<()>> {
let Some(network_manager) = network_manager else {
info!("P2P Sync is disabled.");
info!("P2p Sync is disabled.");
return tokio::spawn(future::pending());
};

Expand All @@ -332,15 +332,15 @@ fn spawn_p2p_sync_server(
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let p2p_sync_server_channels = P2PSyncServerChannels::new(
let p2p_sync_server_channels = P2pSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
let p2p_sync_server = P2pSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
tokio::spawn(async move {
p2p_sync_server.run().await;
Ok(())
Expand Down Expand Up @@ -396,7 +396,7 @@ async fn run_threads(
.await?
};

// P2P Sync Server task.
// P2p Sync Server task.
let p2p_sync_server_handle = if let Some(handle) = tasks.p2p_sync_server_handle {
handle
} else {
Expand Down Expand Up @@ -449,7 +449,7 @@ async fn run_threads(
res??
}
res = p2p_sync_server_handle => {
error!("P2P Sync server stopped");
error!("P2p Sync server stopped");
res??
}
res = network_handle => {
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::stream_builder::{
DataStreamBuilder,
ParseDataError,
};
use super::{P2PSyncClientError, NETWORK_DATA_TIMEOUT};
use super::{P2pSyncClientError, NETWORK_DATA_TIMEOUT};

impl BlockData for (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber) {
fn write_to_storage(
Expand Down Expand Up @@ -80,7 +80,7 @@ impl DataStreamBuilder<(ApiContractClass, ClassHash)> for ClassStreamBuilder {
let maybe_contract_class =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, classes_response_manager.next())
.await?
.ok_or(P2PSyncClientError::ReceiverChannelTerminated {
.ok_or(P2pSyncClientError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION,
})?;
let Some((api_contract_class, class_hash)) = maybe_contract_class?.0 else {
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::stream_builder::{
DataStreamBuilder,
ParseDataError,
};
use super::{P2PSyncClientError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT};
use super::{P2pSyncClientError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT};

impl BlockData for SignedBlockHeader {
#[allow(clippy::as_conversions)] // FIXME: use int metrics so `as f64` may be removed.
Expand Down Expand Up @@ -82,7 +82,7 @@ impl DataStreamBuilder<SignedBlockHeader> for HeaderStreamBuilder {
let maybe_signed_header =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, signed_headers_response_manager.next())
.await?
.ok_or(P2PSyncClientError::ReceiverChannelTerminated {
.ok_or(P2pSyncClientError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION,
})?;
let Some(signed_block_header) = maybe_signed_header?.0 else {
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/header_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn signed_headers_basic_flow() {
tokio::select! {
sync_result = p2p_sync.run() => {
sync_result.unwrap();
panic!("P2P sync aborted with no failure.");
panic!("P2p sync aborted with no failure.");
}
_ = parse_queries_future => {}
}
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn sync_sends_new_header_query_if_it_got_partial_responses() {
tokio::select! {
sync_result = p2p_sync.run() => {
sync_result.unwrap();
panic!("P2P sync aborted with no failure.");
panic!("P2p sync aborted with no failure.");
}
_ = parse_queries_future => {}
}
Expand Down
34 changes: 17 additions & 17 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const ALLOWED_SIGNATURES_LENGTH: usize = 1;
const NETWORK_DATA_TIMEOUT: Duration = Duration::from_secs(300);

#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct P2PSyncClientConfig {
pub struct P2pSyncClientConfig {
pub num_headers_per_query: u64,
pub num_block_state_diffs_per_query: u64,
pub num_block_transactions_per_query: u64,
Expand All @@ -65,7 +65,7 @@ pub struct P2PSyncClientConfig {
pub buffer_size: usize,
}

impl SerializeConfig for P2PSyncClientConfig {
impl SerializeConfig for P2pSyncClientConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
ser_param(
Expand Down Expand Up @@ -110,9 +110,9 @@ impl SerializeConfig for P2PSyncClientConfig {
}
}

impl Default for P2PSyncClientConfig {
impl Default for P2pSyncClientConfig {
fn default() -> Self {
P2PSyncClientConfig {
P2pSyncClientConfig {
num_headers_per_query: 10000,
// State diffs are split into multiple messages, so big queries can lead to a lot of
// messages in the network buffers.
Expand All @@ -127,7 +127,7 @@ impl Default for P2PSyncClientConfig {
}

#[derive(thiserror::Error, Debug)]
pub enum P2PSyncClientError {
pub enum P2pSyncClientError {
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error("Network returned more responses than expected for a query.")]
TooManyResponses,
Expand All @@ -152,14 +152,14 @@ type StateSqmrDiffSender = SqmrClientSender<StateDiffQuery, DataOrFin<StateDiffC
type TransactionSqmrSender = SqmrClientSender<TransactionQuery, DataOrFin<FullTransaction>>;
type ClassSqmrSender = SqmrClientSender<ClassQuery, DataOrFin<(ApiContractClass, ClassHash)>>;

pub struct P2PSyncClientChannels {
pub struct P2pSyncClientChannels {
header_sender: HeaderSqmrSender,
state_diff_sender: StateSqmrDiffSender,
transaction_sender: TransactionSqmrSender,
class_sender: ClassSqmrSender,
}

impl P2PSyncClientChannels {
impl P2pSyncClientChannels {
pub fn new(
header_sender: HeaderSqmrSender,
state_diff_sender: StateSqmrDiffSender,
Expand All @@ -171,7 +171,7 @@ impl P2PSyncClientChannels {
pub(crate) fn create_stream(
self,
storage_reader: StorageReader,
config: P2PSyncClientConfig,
config: P2pSyncClientConfig,
internal_blocks_receivers: InternalBlocksReceivers,
) -> impl Stream<Item = DataStreamResult> + Send + 'static {
let header_stream = HeaderStreamBuilder::create_stream(
Expand Down Expand Up @@ -210,34 +210,34 @@ impl P2PSyncClientChannels {
}
}

pub struct P2PSyncClient {
config: P2PSyncClientConfig,
pub struct P2pSyncClient {
config: P2pSyncClientConfig,
storage_reader: StorageReader,
storage_writer: StorageWriter,
p2p_sync_channels: P2PSyncClientChannels,
p2p_sync_channels: P2pSyncClientChannels,
internal_blocks_receiver: BoxStream<'static, SyncBlock>,
}

impl P2PSyncClient {
impl P2pSyncClient {
pub fn new(
config: P2PSyncClientConfig,
config: P2pSyncClientConfig,
storage_reader: StorageReader,
storage_writer: StorageWriter,
p2p_sync_channels: P2PSyncClientChannels,
p2p_sync_channels: P2pSyncClientChannels,
internal_blocks_receiver: BoxStream<'static, SyncBlock>,
) -> Self {
Self { config, storage_reader, storage_writer, p2p_sync_channels, internal_blocks_receiver }
}

#[instrument(skip(self), level = "debug", err)]
pub async fn run(self) -> Result<Never, P2PSyncClientError> {
info!("Starting P2P sync client");
pub async fn run(self) -> Result<Never, P2pSyncClientError> {
info!("Starting p2p sync client");

let InternalBlocksChannels {
receivers: internal_blocks_receivers,
senders: mut internal_blocks_senders,
} = InternalBlocksChannels::new();
let P2PSyncClient {
let P2pSyncClient {
config,
storage_reader,
mut storage_writer,
Expand Down
6 changes: 3 additions & 3 deletions crates/papyrus_p2p_sync/src/client/state_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::client::stream_builder::{
DataStreamBuilder,
ParseDataError,
};
use crate::client::{P2PSyncClientError, NETWORK_DATA_TIMEOUT};
use crate::client::{P2pSyncClientError, NETWORK_DATA_TIMEOUT};

impl BlockData for (ThinStateDiff, BlockNumber) {
#[latency_histogram("p2p_sync_state_diff_write_to_storage_latency_seconds", true)]
Expand Down Expand Up @@ -61,7 +61,7 @@ impl DataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
.get_block_header(block_number)?
.expect("A header with number lower than the header marker is missing")
.state_diff_length
.ok_or(P2PSyncClientError::OldHeaderInStorage {
.ok_or(P2pSyncClientError::OldHeaderInStorage {
block_number,
missing_field: "state_diff_length",
})?;
Expand All @@ -72,7 +72,7 @@ impl DataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
state_diff_chunks_response_manager.next(),
)
.await?
.ok_or(P2PSyncClientError::ReceiverChannelTerminated {
.ok_or(P2pSyncClientError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION,
})?;
let Some(state_diff_chunk) = maybe_state_diff_chunk?.0 else {
Expand Down
14 changes: 7 additions & 7 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use starknet_api::core::ClassHash;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tracing::{debug, info, warn};

use super::{P2PSyncClientError, STEP};
use super::{P2pSyncClientError, STEP};

pub type DataStreamResult = Result<Box<dyn BlockData>, P2PSyncClientError>;
pub type DataStreamResult = Result<Box<dyn BlockData>, P2pSyncClientError>;

pub(crate) trait BlockData: Send {
/// Write the block data to the storage.
Expand Down Expand Up @@ -211,8 +211,8 @@ where
Some(Ok(DataOrFin(None))) => {
debug!("Network query ending at block {} for {:?} finished", end_block_number, Self::TYPE_DESCRIPTION);
},
Some(_) => Err(P2PSyncClientError::TooManyResponses)?,
None => Err(P2PSyncClientError::ReceiverChannelTerminated {
Some(_) => Err(P2pSyncClientError::TooManyResponses)?,
None => Err(P2pSyncClientError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION
})?,
}
Expand Down Expand Up @@ -262,20 +262,20 @@ pub(crate) enum BadPeerError {
#[derive(thiserror::Error, Debug)]
pub(crate) enum ParseDataError {
#[error(transparent)]
Fatal(#[from] P2PSyncClientError),
Fatal(#[from] P2pSyncClientError),
#[error(transparent)]
BadPeer(#[from] BadPeerError),
}

impl From<StorageError> for ParseDataError {
fn from(err: StorageError) -> Self {
ParseDataError::Fatal(P2PSyncClientError::StorageError(err))
ParseDataError::Fatal(P2pSyncClientError::StorageError(err))
}
}

impl From<tokio::time::error::Elapsed> for ParseDataError {
fn from(err: tokio::time::error::Elapsed) -> Self {
ParseDataError::Fatal(P2PSyncClientError::NetworkTimeout(err))
ParseDataError::Fatal(P2pSyncClientError::NetworkTimeout(err))
}
}

Expand Down
Loading

0 comments on commit da8fd1d

Please sign in to comment.