SQL query interface (very experimental) #5799

Draft · wants to merge 12 commits into base: master
16 changes: 15 additions & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -57,7 +57,7 @@ serde_json = { version = "1.0", features = ["arbitrary_precision"] }
serde_regex = "1.1.0"
serde_yaml = "0.9.21"
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
sqlparser = "0.46.0"
sqlparser = { version = "0.46.0", features = ["visitor"] }
strum = { version = "0.26", features = ["derive"] }
syn = { version = "2.0.66", features = ["full"] }
test-store = { path = "./store/test-store" }
82 changes: 82 additions & 0 deletions docs/implementation/sql-interface.md
@@ -0,0 +1,82 @@
# SQL Queries

**This interface is extremely experimental. There is no guarantee that it
will ever be brought to production use; it exists solely to help evaluate
the utility of such an interface.**

SQL queries can be issued by posting a JSON document to
`/subgraphs/sql`. The server responds with a JSON document containing the
records matching the query.

The body of the request must contain the following keys:

* `deployment`: the hash of the deployment against which the query should
be run
* `query`: the SQL query
* `mode`: either `info` or `data`. With `info`, only summary information
  about the result is reported; with `data`, the full query result is sent
  in the response

The SQL query can use all the tables of the given subgraph. Table and
attribute names are snake-cased from their form in the GraphQL schema, so
that data for `SomeDailyStuff` is stored in a table `some_daily_stuff`.

The query can use fairly arbitrary SQL, including aggregations and most
functions built into PostgreSQL.
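For instance, an aggregation can be sent in the same request shape; the `block` table and deployment hash here are the placeholders used elsewhere in this document:

```json
{
  "query": "select count(*) as blocks, max(number) as latest from block",
  "deployment": "QmSoMeThInG",
  "mode": "data"
}
```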

## Example

For a subgraph whose schema defines an entity `Block`, the following query
```json
{
"query": "select number, hash, parent_hash, timestamp from block order by number desc limit 2",
"deployment": "QmSoMeThInG",
"mode": "data"
}
```

might result in this response
```json
{
"data": [
{
"hash": "\\x5f91e535ee4d328725b869dd96f4c42059e3f2728dfc452c32e5597b28ce68d6",
"number": 5000,
"parent_hash": "\\x82e95c1ee3a98cd0646225b5ae6afc0b0229367b992df97aeb669c898657a4bb",
"timestamp": "2015-07-30T20:07:44+00:00"
},
{
"hash": "\\x82e95c1ee3a98cd0646225b5ae6afc0b0229367b992df97aeb669c898657a4bb",
"number": 4999,
"parent_hash": "\\x875c9a0f8215258c3b17fd5af5127541121cca1f594515aae4fbe5a7fbef8389",
"timestamp": "2015-07-30T20:07:36+00:00"
}
]
}
```

## Limitations/Ideas/Disclaimers

Most of these are fairly easy to address:

* queries must finish within `GRAPH_SQL_STATEMENT_TIMEOUT` (unlimited by
default)
* queries are always executed at the subgraph head. It would be easy to add
a way to specify a block at which the query should be executed
* the interface right now pretty much exposes the raw SQL schema for a
subgraph, though system columns like `vid` or `block_range` are made
inaccessible.
* it is not possible to join across subgraphs, though it would be possible
  to add that. Implementing it would require some additional plumbing that
  hides the effects of sharding.
* JSON as the response format is pretty terrible, and we should change that
to something that isn't so inefficient
* the response contains data that's pretty raw; as the example shows,
binary data uses Postgres' notation for hex strings
* because of how broad the supported SQL is, it is pretty easy to issue
queries that take a very long time. It will therefore not be hard to take
down a `graph-node`, especially when no query timeout is set
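Since binary columns come back in Postgres' hex notation, as noted above, a client has to decode them itself. A minimal sketch in Python, assuming the value arrives as a JSON string carrying a literal `\x` prefix:

```python
def decode_bytea(value: str) -> bytes:
    """Decode a Postgres hex-format bytea string like '\\x5f91...' into bytes."""
    if not value.startswith("\\x"):
        raise ValueError(f"not a Postgres hex string: {value!r}")
    return bytes.fromhex(value[2:])

# The hash from the example response, as decoded from the JSON payload:
h = decode_bytea("\\x5f91e535ee4d328725b869dd96f4c42059e3f2728dfc452c32e5597b28ce68d6")
print("0x" + h.hex())  # the conventional 0x-prefixed form
```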

Most importantly: while quite a bit of effort has been put into making this
interface safe, in particular making sure that it is not possible to write
through it, there is no guarantee that it is free of bugs.
10 changes: 8 additions & 2 deletions graph/src/components/graphql.rs
@@ -1,7 +1,8 @@
use crate::data::query::QueryResults;
use crate::data::query::{Query, QueryTarget};
use crate::data::query::{QueryResults, SqlQueryReq};
use crate::data::store::SqlQueryObject;
use crate::data::subscription::{Subscription, SubscriptionError, SubscriptionResult};
use crate::prelude::DeploymentHash;
use crate::prelude::{DeploymentHash, QueryExecutionError};

use async_trait::async_trait;
use futures01::Future;
@@ -41,6 +42,11 @@ pub trait GraphQlRunner: Send + Sync + 'static {
) -> Result<SubscriptionResult, SubscriptionError>;

fn metrics(&self) -> Arc<dyn GraphQLMetrics>;

async fn run_sql_query(
self: Arc<Self>,
req: SqlQueryReq,
) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;
}

pub trait GraphQLMetrics: Send + Sync + 'static {
4 changes: 3 additions & 1 deletion graph/src/components/store/traits.rs
@@ -15,7 +15,7 @@ use crate::components::transaction_receipt;
use crate::components::versions::ApiVersion;
use crate::data::query::Trace;
use crate::data::store::ethereum::call;
use crate::data::store::QueryObject;
use crate::data::store::{QueryObject, SqlQueryObject};
use crate::data::subgraph::{status, DeploymentFeatures};
use crate::data::{query::QueryTarget, subgraph::schema::*};
use crate::prelude::{DeploymentState, NodeId, QueryExecutionError, SubgraphName};
@@ -643,6 +643,8 @@ pub trait QueryStore: Send + Sync {
query: EntityQuery,
) -> Result<(Vec<QueryObject>, Trace), QueryExecutionError>;

fn execute_sql(&self, sql: &str) -> Result<Vec<SqlQueryObject>, QueryExecutionError>;

async fn is_deployment_synced(&self) -> Result<bool, Error>;

async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
3 changes: 3 additions & 0 deletions graph/src/data/query/error.rs
@@ -75,6 +75,7 @@ pub enum QueryExecutionError {
InvalidSubgraphManifest,
ResultTooBig(usize, usize),
DeploymentNotFound(String),
SqlError(String),
IdMissing,
IdNotString,
ConstraintViolation(String),
@@ -139,6 +140,7 @@ impl QueryExecutionError {
| IdMissing
| IdNotString
| ConstraintViolation(_) => false,
SqlError(_) => false,
}
}
}
@@ -289,6 +291,7 @@ impl fmt::Display for QueryExecutionError {
IdMissing => write!(f, "entity is missing an `id` attribute"),
IdNotString => write!(f, "entity `id` attribute is not a string"),
ConstraintViolation(msg) => write!(f, "internal constraint violated: {}", msg),
SqlError(e) => write!(f, "sql error: {}", e),
}
}
}
2 changes: 1 addition & 1 deletion graph/src/data/query/mod.rs
@@ -6,6 +6,6 @@ mod trace;

pub use self::cache_status::CacheStatus;
pub use self::error::{QueryError, QueryExecutionError};
pub use self::query::{Query, QueryTarget, QueryVariables};
pub use self::query::{Query, QueryTarget, QueryVariables, SqlQueryMode, SqlQueryReq};
pub use self::result::{LatestBlockInfo, QueryResult, QueryResults};
pub use self::trace::Trace;
26 changes: 25 additions & 1 deletion graph/src/data/query/query.rs
@@ -1,7 +1,8 @@
use serde::de::Deserializer;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::hash::{DefaultHasher, Hash as _, Hasher as _};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

@@ -165,3 +166,26 @@ impl Query {
}
}
}

#[derive(Copy, Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SqlQueryMode {
Data,
Info,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SqlQueryReq {
pub deployment: DeploymentHash,
pub query: String,
pub mode: SqlQueryMode,
}

impl SqlQueryReq {
pub fn query_hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
self.deployment.hash(&mut hasher);
self.query.hash(&mut hasher);
hasher.finish()
}
}
4 changes: 4 additions & 0 deletions graph/src/data/store/mod.rs
@@ -1108,6 +1108,10 @@ pub struct QueryObject {
pub entity: r::Object,
}

/// An object that is returned from a SQL query. It wraps an `r::Value`
#[derive(CacheWeight, Serialize)]
pub struct SqlQueryObject(pub r::Value);

impl CacheWeight for QueryObject {
fn indirect_weight(&self) -> usize {
self.parent.indirect_weight() + self.entity.indirect_weight()
43 changes: 42 additions & 1 deletion graphql/src/runner.rs
@@ -5,8 +5,10 @@ use crate::metrics::GraphQLMetrics;
use crate::prelude::{QueryExecutionOptions, StoreResolver, SubscriptionExecutionOptions};
use crate::query::execute_query;
use crate::subscription::execute_prepared_subscription;
use graph::data::query::{CacheStatus, SqlQueryReq};
use graph::data::store::SqlQueryObject;
use graph::futures03::future;
use graph::prelude::MetricsRegistry;
use graph::prelude::{ApiVersion, MetricsRegistry};
use graph::{
components::store::SubscriptionManager,
prelude::{
@@ -312,4 +314,43 @@ where
fn metrics(&self) -> Arc<dyn GraphQLMetricsTrait> {
self.graphql_metrics.clone()
}

async fn run_sql_query(
self: Arc<Self>,
req: SqlQueryReq,
) -> Result<Vec<SqlQueryObject>, QueryExecutionError> {
let store = self
.store
.query_store(
QueryTarget::Deployment(req.deployment.clone(), ApiVersion::default()),
false,
)
.await?;

let query_hash = req.query_hash();
self.load_manager
.decide(
&store.wait_stats().map_err(QueryExecutionError::from)?,
store.shard(),
store.deployment_id(),
query_hash,
&req.query,
)
.to_result()?;

let query_start = Instant::now();
let result = store
.execute_sql(&req.query)
.map_err(QueryExecutionError::from);

self.load_manager.record_work(
store.shard(),
store.deployment_id(),
query_hash,
query_start.elapsed(),
CacheStatus::Miss,
);

result
}
}