Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VID changes #5856

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/src/subgraph/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use graph::{
store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore},
subgraph::ProofOfIndexingVersion,
},
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion, SPEC_VERSION_1_3_0},
data_source::DataSourceTemplate,
prelude::BlockNumber,
semver::Version,
};
use std::collections::BTreeSet;
use std::sync::Arc;
Expand All @@ -28,6 +29,7 @@ pub struct IndexingInputs<C: Blockchain> {
pub static_filters: bool,
pub poi_version: ProofOfIndexingVersion,
pub network: String,
pub spec_version: Version,

/// Whether to instrument trigger processing and log additional,
/// possibly expensive and noisy, information
Expand All @@ -53,6 +55,7 @@ impl<C: Blockchain> IndexingInputs<C> {
static_filters,
poi_version,
network,
spec_version,
instrument,
} = self;
IndexingInputs {
Expand All @@ -72,7 +75,14 @@ impl<C: Blockchain> IndexingInputs<C> {
static_filters: *static_filters,
poi_version: *poi_version,
network: network.clone(),
spec_version: spec_version.clone(),
instrument: *instrument,
}
}

// Whether to use strict vid order for the subgraph
// This is true for all subgraphs with spec version 1.3.0 or greater
pub fn strict_vid_order(&self) -> bool {
self.spec_version >= SPEC_VERSION_1_3_0
}
}
1 change: 1 addition & 0 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
static_filters: self.static_filters,
poi_version,
network: network.to_string(),
spec_version: manifest.spec_version.clone(),
instrument,
};

Expand Down
8 changes: 7 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ where
let mut block_state = BlockState::new(
self.inputs.store.clone(),
std::mem::take(&mut self.state.entity_lfu_cache),
self.inputs.strict_vid_order(),
);

let _section = self
Expand Down Expand Up @@ -795,6 +796,7 @@ where
let block_state = BlockState::new(
self.inputs.store.clone(),
std::mem::take(&mut self.state.entity_lfu_cache),
self.inputs.strict_vid_order(),
);

self.ctx
Expand Down Expand Up @@ -1156,7 +1158,11 @@ where
// Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to
// get causality region isolation.
let schema = ReadStore::input_schema(&self.inputs.store);
let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new());
let mut block_state = BlockState::new(
EmptyStore::new(schema),
LfuCache::new(),
self.inputs.strict_vid_order(),
);

// PoI ignores offchain events.
// See also: poi-ignores-offchain
Expand Down
53 changes: 29 additions & 24 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub struct EntityCache {
// Sequence number of the next VID value for this block. The value written
// in the database consist of a block number and this SEQ number.
pub vid_seq: u32,

/// The spec version of the subgraph being processed
pub strict_vid_order: bool,
}

impl Debug for EntityCache {
Expand Down Expand Up @@ -141,6 +144,7 @@ impl EntityCache {
store,
seq: 0,
vid_seq: RESERVED_VIDS,
strict_vid_order: false,
}
}

Expand All @@ -152,7 +156,11 @@ impl EntityCache {
self.schema.make_entity(iter)
}

pub fn with_current(store: Arc<dyn s::ReadStore>, current: EntityLfuCache) -> EntityCache {
pub fn with_current(
store: Arc<dyn s::ReadStore>,
current: EntityLfuCache,
strict_vid_order: bool,
) -> EntityCache {
EntityCache {
current,
updates: HashMap::new(),
Expand All @@ -162,6 +170,7 @@ impl EntityCache {
store,
seq: 0,
vid_seq: RESERVED_VIDS,
strict_vid_order,
}
}

Expand Down Expand Up @@ -213,16 +222,8 @@ impl EntityCache {
// always creates it in a new style.
debug_assert!(match scope {
GetScope::Store => {
// Release build will never call this function and hence it's OK
// when that implementation is not correct.
fn remove_vid(entity: Option<Arc<Entity>>) -> Option<Entity> {
entity.map(|e| {
#[allow(unused_mut)]
let mut entity = (*e).clone();
#[cfg(debug_assertions)]
entity.remove("vid");
entity
})
entity.map(|e| e.clone_no_vid())
}
remove_vid(entity.clone()) == remove_vid(self.store.get(key).unwrap().map(Arc::new))
}
Expand Down Expand Up @@ -397,19 +398,23 @@ impl EntityCache {
*write_capacity_remaining -= weight;
}

// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let is_object = key.entity_type.is_object_type();

let mut entity = entity;
let old_vid = entity.set_vid(vid).expect("the vid should be set");
// Make sure that there was no VID previously set for this entity.
if let Some(ovid) = old_vid {
bail!(
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
ovid,
key.entity_type,
entity.id()
);
if self.strict_vid_order && is_object {
// Make sure that there was no VID previously set for this entity.
if let Some(ovid) = entity.vid() {
bail!(
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
ovid,
key.entity_type,
entity.id()
);
}
// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
entity.set_vid(vid);
}

self.entity_op(key.clone(), EntityOp::Update(entity));
Expand Down Expand Up @@ -534,7 +539,7 @@ impl EntityCache {
.map_err(|e| key.unknown_attribute(e))?;
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
if current.clone_no_vid() != data.clone_no_vid() {
Some(Overwrite {
key,
data,
Expand All @@ -549,7 +554,7 @@ impl EntityCache {
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
if current.clone_no_vid() != data.clone_no_vid() {
Some(Overwrite {
key,
data,
Expand Down
4 changes: 2 additions & 2 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ pub struct BlockState {
}

impl BlockState {
pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache) -> Self {
pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache, strict_vid_order: bool) -> Self {
BlockState {
entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache),
entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache, strict_vid_order),
deterministic_errors: Vec::new(),
created_data_sources: Vec::new(),
persisted_data_sources: Vec::new(),
Expand Down
Loading
Loading