Skip to content

Commit

Permalink
Merge branch 'main' into mcp-sse
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo authored Mar 4, 2025
2 parents 149d749 + 296162d commit 4f427cf
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 76 deletions.
5 changes: 3 additions & 2 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ impl Service {
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
// sometimes for some reason keys isxx empty. investigate the issue
// keys is empty when an entity is updated with StoreUpdateRecord or Member but the entity
// has never been set before. In that case, we dont know the keys
let keys = entity
.keys
.trim_end_matches(SQL_FELT_DELIMITER)
.split(SQL_FELT_DELIMITER)
.map(Felt::from_str)
.filter_map(|key| if key.is_empty() { None } else { Some(Felt::from_str(key)) })
.collect::<Result<Vec<_>, _>>()
.map_err(ParseError::FromStr)?;

Expand Down
4 changes: 4 additions & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub(crate) fn match_entity_keys(

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
if clause_model.is_empty() {
return true;
}

let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
Expand Down
12 changes: 0 additions & 12 deletions crates/torii/server/src/handlers/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,18 +659,6 @@ impl Handler for McpHandler {
async fn handle(&self, req: Request<Body>, _: IpAddr) -> Response<Body> {
let uri_path = req.uri().path();

// Handle CORS preflight requests
if req.method() == hyper::Method::OPTIONS {
return Response::builder()
.status(StatusCode::OK)
.header("Access-Control-Allow-Origin", "*")
.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
.header("Access-Control-Allow-Headers", "Content-Type, Authorization")
.header("Access-Control-Max-Age", "86400")
.body(Body::empty())
.unwrap();
}

// Handle message endpoint (for SSE clients)
if uri_path == "/mcp/message" {
return self.handle_message_request(req).await;
Expand Down
17 changes: 7 additions & 10 deletions crates/torii/sqlite/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::constants::TOKENS_TABLE;
use crate::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -451,15 +451,12 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at,
created_at: entity_updated.created_at,
updated_at: entity_updated.updated_at,
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
if entity_updated.keys.is_empty() {
warn!(target: LOG_TARGET, "Entity has been updated without being set before. Keys are not known and non-updated values will be NULL.");
}

let optimistic_entity = unsafe {
std::mem::transmute::<EntityUpdated, OptimisticEntity>(entity_updated.clone())
};
SimpleBroker::publish(optimistic_entity);

Expand Down
66 changes: 16 additions & 50 deletions crates/torii/sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use crate::executor::{
use crate::types::Contract;
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
type IsStoreUpdate = bool;

pub mod cache;
pub mod constants;
pub mod erc;
Expand Down Expand Up @@ -362,13 +359,7 @@ impl Sql {
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
))?;

self.set_entity_model(
&namespaced_name,
event_id,
(&entity_id, false),
(&entity, keys_str.is_none()),
block_timestamp,
)?;
self.set_entity_model(&namespaced_name, event_id, &entity_id, &entity, block_timestamp)?;

Ok(())
}
Expand Down Expand Up @@ -425,8 +416,8 @@ impl Sql {
self.set_entity_model(
&namespaced_name,
event_id,
(&entity_id, true),
(&entity, false),
&format!("event:{}", entity_id),
&entity,
block_timestamp,
)?;

Expand Down Expand Up @@ -612,35 +603,28 @@ impl Sql {
&mut self,
model_name: &str,
event_id: &str,
entity_id: (&str, IsEventMessage),
entity: (&Ty, IsStoreUpdate),
entity_id: &str,
entity: &Ty,
block_timestamp: u64,
) -> Result<()> {
let (entity_id, is_event_message) = entity_id;
let (entity, is_store_update) = entity;

let mut columns = vec![
"internal_id".to_string(),
"internal_event_id".to_string(),
"internal_executed_at".to_string(),
"internal_updated_at".to_string(),
if is_event_message {
if entity_id.starts_with("event:") {
"internal_event_message_id".to_string()
} else {
"internal_entity_id".to_string()
},
];

let mut arguments = vec![
Argument::String(if is_event_message {
"event:".to_string() + entity_id
} else {
entity_id.to_string()
}),
Argument::String(entity_id.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(chrono::Utc::now().to_rfc3339()),
Argument::String(entity_id.to_string()),
Argument::String(entity_id.trim_start_matches("event:").to_string()),
];

fn collect_members(
Expand Down Expand Up @@ -706,33 +690,15 @@ impl Sql {
// Collect all columns and arguments recursively
collect_members("", entity, &mut columns, &mut arguments)?;

// Build the final query
// Build the final query - if an entity is updated, we insert the entity and default to NULL
// for non updated values.
let placeholders: Vec<&str> = arguments.iter().map(|_| "?").collect();
let statement = if is_store_update {
arguments.push(Argument::String(if is_event_message {
"event:".to_string() + entity_id
} else {
entity_id.to_string()
}));

format!(
"UPDATE [{}] SET {} WHERE internal_id = ?",
model_name,
columns
.iter()
.zip(placeholders.iter())
.map(|(column, placeholder)| format!("{} = {}", column, placeholder))
.collect::<Vec<String>>()
.join(", ")
)
} else {
format!(
"INSERT OR REPLACE INTO [{}] ({}) VALUES ({})",
model_name,
columns.join(","),
placeholders.join(",")
)
};
let statement = format!(
"INSERT OR REPLACE INTO [{}] ({}) VALUES ({})",
model_name,
columns.join(","),
placeholders.join(",")
);

// Execute the single query
self.executor.send(QueryMessage::other(statement, arguments))?;
Expand Down
4 changes: 2 additions & 2 deletions examples/spawn-and-move/manifest_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -3768,10 +3768,10 @@
]
},
{
"class_hash": "0x7b56130dc5843f68cf1624f8a6df7c9f21c37afbc6b02a5a374c8e4578cd068",
"class_hash": "0x342e7234ef0066eaf28438eb8e2013a711548ee1b8f4b286f6e2dad44fc5a0b",
"contract_name": "ERC1155Token",
"instance_name": "Rewards",
"address": "0x570ef5185f133d8075af38731091bf2f986f4b6955f615579862f4a4ef79c34",
"address": "0x357e29a60b3c5b809695dc8b2e0d43f07be8e7654dd1b7378989dd6c51bb798",
"abi": [
{
"type": "struct",
Expand Down

0 comments on commit 4f427cf

Please sign in to comment.