Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap entity with VIDs as a separate field #5853

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::data::store::{EntityV, EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::{CacheWeight, ENV_VARS};
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
Expand All @@ -33,8 +33,8 @@ pub enum GetScope {
#[derive(Debug, Clone)]
enum EntityOp {
Remove,
Update(Entity),
Overwrite(Entity),
Update(EntityV),
Overwrite(EntityV),
}

impl EntityOp {
Expand All @@ -45,10 +45,10 @@ impl EntityOp {
use EntityOp::*;
match (self, entity) {
(Remove, _) => Ok(None),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new)),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new.e)),
(Update(updates), Some(entity)) => {
let mut e = entity.borrow().clone();
e.merge_remove_null_fields(updates)?;
e.merge_remove_null_fields(updates.e)?;
Ok(Some(e))
}
}
Expand All @@ -69,7 +69,7 @@ impl EntityOp {
match self {
// This is how `Overwrite` is constructed, by accumulating `Update` onto `Remove`.
Remove => *self = Overwrite(update),
Update(current) | Overwrite(current) => current.merge(update),
Update(current) | Overwrite(current) => current.e.merge(update.e),
}
}
}
Expand Down Expand Up @@ -304,9 +304,9 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
if query.matches(key, &entity.e) =>
{
Ok(Some(entity.clone()))
Ok(Some(entity.e.clone()))
}
EntityOp::Remove => Ok(None),
_ => Ok(None),
Expand Down Expand Up @@ -400,19 +400,19 @@ impl EntityCache {
// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let mut entity = entity;
let old_vid = entity.set_vid(vid).expect("the vid should be set");
// Make sure that there was no VID previously set for this entity.
if let Some(ovid) = old_vid {
bail!(
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
ovid,
key.entity_type,
entity.id()
);
}

self.entity_op(key.clone(), EntityOp::Update(entity));
// let mut entity = entity;
// let old_vid = entity.set_vid(vid).expect("the vid should be set");
// // Make sure that there was no VID previously set for this entity.
// if let Some(ovid) = old_vid {
// bail!(
// "VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
// ovid,
// key.entity_type,
// entity.id()
// );
// }

self.entity_op(key.clone(), EntityOp::Update(EntityV::new(entity, vid)));

// The updates we were given are not valid by themselves; force a
// lookup in the database and check again with an entity that merges
Expand Down Expand Up @@ -517,20 +517,23 @@ impl EntityCache {
// Entity was created
(None, EntityOp::Update(mut updates))
| (None, EntityOp::Overwrite(mut updates)) => {
updates.remove_null_fields();
let data = Arc::new(updates);
let vid = updates.vid;
updates.e.remove_null_fields();
let data = Arc::new(updates.e.clone());
self.current.insert(key.clone(), Some(data.cheap_clone()));
Some(Insert {
key,
data,
block,
end: None,
vid,
})
}
// Entity may have been changed
(Some(current), EntityOp::Update(updates)) => {
let vid = updates.vid;
let mut data = current.as_ref().clone();
data.merge_remove_null_fields(updates)
data.merge_remove_null_fields(updates.e)
.map_err(|e| key.unknown_attribute(e))?;
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.cheap_clone()));
Expand All @@ -540,21 +543,24 @@ impl EntityCache {
data,
block,
end: None,
vid,
})
} else {
None
}
}
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
let vid = data.vid;
let data = Arc::new(data.e.clone());
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
data,
block,
end: None,
vid,
})
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::constraint_violation;
use crate::data::store::scalar::Bytes;
use crate::data::store::{Id, IdList, Value};
use crate::data::store::{EntityV, Id, IdList, Value};
use crate::data::value::Word;
use crate::data_source::CausalityRegion;
use crate::derive::CheapClone;
Expand Down Expand Up @@ -829,7 +829,7 @@ where
pub enum EntityOperation {
/// Locates the entity specified by `key` and sets its attributes according to the contents of
/// `data`. If no entity exists with this key, creates a new entity.
Set { key: EntityKey, data: Entity },
Set { key: EntityKey, data: EntityV },

/// Removes an entity with the specified key, if one exists.
Remove { key: EntityKey },
Expand Down
15 changes: 13 additions & 2 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ pub enum EntityModification {
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Update the entity by overwriting it
Overwrite {
key: EntityKey,
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Remove the entity
Remove { key: EntityKey, block: BlockNumber },
Expand All @@ -67,6 +69,7 @@ pub struct EntityWrite<'a> {
// The end of the block range for which this write is valid. The value
// of `end` itself is not included in the range
pub end: Option<BlockNumber>,
vid: i64,
}

impl std::fmt::Display for EntityWrite<'_> {
Expand All @@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> {
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),
EntityModification::Overwrite {
key,
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: &data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),

EntityModification::Remove { .. } => Err(()),
Expand Down Expand Up @@ -213,11 +220,13 @@ impl EntityModification {
data,
block,
end,
vid,
} => Ok(Insert {
key,
data,
block,
end,
vid,
}),
Remove { key, .. } => {
return Err(constraint_violation!(
Expand Down Expand Up @@ -271,21 +280,23 @@ impl EntityModification {
}

impl EntityModification {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Insert {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Overwrite {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

Expand Down
13 changes: 13 additions & 0 deletions graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,19 @@ impl std::fmt::Debug for Entity {
}
}

/// An entity wrapper that has VID too.
#[derive(Debug, Clone, CacheWeight, PartialEq, Eq, Serialize)]
pub struct EntityV {
pub e: Entity,
pub vid: i64,
}

impl EntityV {
pub fn new(e: Entity, vid: i64) -> Self {
Self { e, vid }
}
}

/// An object that is returned from a query. It's a an `r::Value` which
/// carries the attributes of the object (`__typename`, `id` etc.) and
/// possibly a pointer to its parent if the query that constructed it is one
Expand Down
2 changes: 1 addition & 1 deletion server/index-node/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ fn entity_changes_to_graphql(entity_changes: Vec<EntityOperation>) -> r::Value {
.push(key.entity_id);
}
EntityOperation::Set { key, data } => {
updates.entry(key.entity_type).or_default().push(data);
updates.entry(key.entity_type).or_default().push(data.e);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use crate::{
},
};
use graph::components::store::{AttributeNames, DerivedEntityQuery};
use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR};
use graph::data::store::{EntityV, Id, IdList, IdType, BYTES_SCALAR};
use graph::data::subgraph::schema::POI_TABLE;
use graph::prelude::{
anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityOperation, Logger,
Expand Down Expand Up @@ -731,9 +731,10 @@ impl Layout {
let entity_id = data.id();
processed_entities.insert((entity_type.clone(), entity_id.clone()));

let vid = data.vid();
changes.push(EntityOperation::Set {
key: entity_type.key_in(entity_id, CausalityRegion::from_entity(&data)),
data,
data: EntityV::new(data, vid),
});
}

Expand Down
Loading