Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce artificial random storage service failures in storage-service for simulations. #3304

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions linera-storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ repository.workspace = true
version.workspace = true

[features]
artificial_random_read_error = ["dep:rand"]
artificial_random_write_error = ["dep:rand"]
metrics = ["linera-views/metrics"]
rocksdb = ["linera-views/rocksdb"]
storage-service = []
Expand All @@ -31,6 +33,7 @@ linera-base.workspace = true
linera-version.workspace = true
linera-views.workspace = true
prost.workspace = true
rand = { workspace = true, optional = true, features = ["small_rng"] }
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
Expand Down
119 changes: 113 additions & 6 deletions linera-storage-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@ use std::{
},
};

#[cfg(any(
feature = "artificial_random_read_error",
feature = "artificial_random_write_error"
))]
use {
linera_views::random::{make_deterministic_rng, DeterministicRng},
rand::Rng,
std::{ops::DerefMut, sync::Mutex},
};

#[cfg(feature = "artificial_random_read_error")]
const READ_ERROR_FREQUENCY: usize = 10;

#[cfg(feature = "artificial_random_write_error")]
const WRITE_ERROR_FREQUENCY: usize = 100;

use async_lock::{Semaphore, SemaphoreGuard};
use futures::future::join_all;
use linera_base::ensure;
Expand Down Expand Up @@ -73,6 +89,11 @@ pub struct ServiceStoreClientInternal {
prefix_len: usize,
start_key: Vec<u8>,
root_key_written: Arc<AtomicBool>,
#[cfg(any(
feature = "artificial_random_read_error",
feature = "artificial_random_write_error"
))]
rng: Arc<Mutex<DeterministicRng>>,
}

impl WithError for ServiceStoreClientInternal {
Expand All @@ -89,6 +110,9 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
}

async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ServiceStoreError> {
#[cfg(feature = "artificial_random_read_error")]
self.trigger_read_error()?;

ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
let mut full_key = self.start_key.clone();
full_key.extend(key);
Expand All @@ -112,6 +136,9 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
}

async fn contains_key(&self, key: &[u8]) -> Result<bool, ServiceStoreError> {
#[cfg(feature = "artificial_random_read_error")]
self.trigger_read_error()?;

ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
let mut full_key = self.start_key.clone();
full_key.extend(key);
Expand All @@ -127,6 +154,9 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
}

async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ServiceStoreError> {
#[cfg(feature = "artificial_random_read_error")]
self.trigger_read_error()?;

let mut full_keys = Vec::new();
for key in keys {
ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
Expand All @@ -149,6 +179,9 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
&self,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Option<Vec<u8>>>, ServiceStoreError> {
#[cfg(feature = "artificial_random_read_error")]
self.trigger_read_error()?;

let mut full_keys = Vec::new();
for key in keys {
ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
Expand Down Expand Up @@ -180,6 +213,9 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
&self,
key_prefix: &[u8],
) -> Result<Vec<Vec<u8>>, ServiceStoreError> {
#[cfg(feature = "artificial_random_read_error")]
self.trigger_read_error()?;

ensure!(
key_prefix.len() <= MAX_KEY_SIZE,
ServiceStoreError::KeyTooLong
Expand Down Expand Up @@ -211,6 +247,9 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
&self,
key_prefix: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServiceStoreError> {
#[cfg(feature = "artificial_random_read_error")]
self.trigger_read_error()?;

ensure!(
key_prefix.len() <= MAX_KEY_SIZE,
ServiceStoreError::KeyTooLong
Expand Down Expand Up @@ -247,6 +286,9 @@ impl WritableKeyValueStore for ServiceStoreClientInternal {
const MAX_VALUE_SIZE: usize = usize::MAX;

async fn write_batch(&self, batch: Batch) -> Result<(), ServiceStoreError> {
#[cfg(feature = "artificial_random_write_error")]
self.trigger_write_error()?;

if batch.operations.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -406,6 +448,73 @@ impl ServiceStoreClientInternal {
}
Ok(bcs::from_bytes(&value)?)
}

#[cfg(any(
feature = "artificial_random_read_error",
feature = "artificial_random_write_error"
))]
fn build(
channel: Channel,
semaphore: Option<Arc<Semaphore>>,
max_stream_queries: usize,
prefix_len: usize,
start_key: Vec<u8>,
) -> Self {
let root_key_written = Arc::new(AtomicBool::new(false));
let rng = make_deterministic_rng();
let rng = Arc::new(Mutex::new(rng));
Self {
channel,
semaphore,
max_stream_queries,
prefix_len,
start_key,
root_key_written,
rng,
}
}

#[cfg(not(any(
feature = "artificial_random_read_error",
feature = "artificial_random_write_error"
)))]
fn build(
channel: Channel,
semaphore: Option<Arc<Semaphore>>,
max_stream_queries: usize,
prefix_len: usize,
start_key: Vec<u8>,
) -> Self {
let root_key_written = Arc::new(AtomicBool::new(false));
Self {
channel,
semaphore,
max_stream_queries,
prefix_len,
start_key,
root_key_written,
}
}

#[cfg(feature = "artificial_random_read_error")]
fn trigger_read_error(&self) -> Result<(), ServiceStoreError> {
let mut rng = self.rng.lock().expect("The rng");
let rng = rng.deref_mut();
match rng.gen_range(0..READ_ERROR_FREQUENCY) {
0 => Err(ServiceStoreError::ArtificialReadError),
_ => Ok(()),
}
}

#[cfg(feature = "artificial_random_write_error")]
fn trigger_write_error(&self) -> Result<(), ServiceStoreError> {
let mut rng = self.rng.lock().expect("The rng");
let rng = rng.deref_mut();
match rng.gen_range(0..WRITE_ERROR_FREQUENCY) {
0 => Err(ServiceStoreError::ArtificialWriteBatchError),
_ => Ok(()),
}
}
}

impl AdminKeyValueStore for ServiceStoreClientInternal {
Expand Down Expand Up @@ -433,14 +542,13 @@ impl AdminKeyValueStore for ServiceStoreClientInternal {
let endpoint = config.http_address();
let endpoint = Endpoint::from_shared(endpoint)?;
let channel = endpoint.connect_lazy();
Ok(Self {
Ok(Self::build(
channel,
semaphore,
max_stream_queries,
prefix_len,
start_key,
root_key_written: Arc::new(AtomicBool::new(false)),
})
))
}

fn clone_with_root_key(&self, root_key: &[u8]) -> Result<Self, ServiceStoreError> {
Expand All @@ -450,14 +558,13 @@ impl AdminKeyValueStore for ServiceStoreClientInternal {
let max_stream_queries = self.max_stream_queries;
let mut start_key = self.start_key[..prefix_len].to_vec();
start_key.extend(root_key);
Ok(Self {
Ok(Self::build(
channel,
semaphore,
max_stream_queries,
prefix_len,
start_key,
root_key_written: Arc::new(AtomicBool::new(false)),
})
))
}

async fn list_all(config: &Self::Config) -> Result<Vec<String>, ServiceStoreError> {
Expand Down
10 changes: 10 additions & 0 deletions linera-storage-service/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ pub enum ServiceStoreError {
/// An error occurred during BCS serialization
#[error(transparent)]
BcsError(#[from] bcs::Error),

#[cfg(feature = "artificial_random_read_error")]
/// An artificial read error occurred
#[error("An artificial read error occurred")]
ArtificialReadError,

#[cfg(feature = "artificial_random_write_error")]
/// An artificial write batch error occurred
#[error("An artificial write batch error occurred")]
ArtificialWriteBatchError,
}

impl KeyValueStoreError for ServiceStoreError {
Expand Down
1 change: 0 additions & 1 deletion linera-views/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ pub mod metrics;
mod graphql;

/// Functions for random generation
#[cfg(with_testing)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you must have this, please add #[doc(hidden)]

pub mod random;

/// Helper types for tests.
Expand Down