Skip to content

Commit

Permalink
fix substreams info endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 25, 2025
1 parent b27502c commit b627dd3
Show file tree
Hide file tree
Showing 9 changed files with 1,390 additions and 116 deletions.
1 change: 1 addition & 0 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
metrics,
false,
));

loop {
Expand Down
1 change: 1 addition & 0 deletions chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
Arc::new(endpoint_metrics),
true,
));

let client = Arc::new(ChainClient::new_firehose(FirehoseEndpoints::for_testing(
Expand Down
1 change: 1 addition & 0 deletions graph/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn main() {
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".sf.substreams.v1", "crate::substreams")
.extern_path(".sf.firehose.v2", "crate::firehose")
.out_dir("src/substreams_rpc")
.compile(&["proto/substreams-rpc.proto"], &["proto"])
.expect("Failed to compile Substreams RPC proto(s)");
Expand Down
157 changes: 87 additions & 70 deletions graph/proto/substreams-rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,46 @@ package sf.substreams.rpc.v2;

import "google/protobuf/any.proto";
import "substreams.proto";
import "firehose.proto";

service Stream {
rpc Blocks(Request) returns (stream Response);
service EndpointInfo {
rpc Info(sf.firehose.v2.InfoRequest) returns (sf.firehose.v2.InfoResponse);
}

service Stream { rpc Blocks(Request) returns (stream Response); }

message Request {
int64 start_block_num = 1;
string start_cursor = 2;
uint64 stop_block_num = 3;

// With final_block_only, you only receive blocks that are irreversible:
// 'final_block_height' will be equal to current block and no 'undo_signal' will ever be sent
// 'final_block_height' will be equal to current block and no 'undo_signal'
// will ever be sent
bool final_blocks_only = 4;

// Substreams has two mode when executing your module(s) either development mode or production
// mode. Development and production modes impact the execution of Substreams, important aspects
// of execution include:
// Substreams has two mode when executing your module(s) either development
// mode or production mode. Development and production modes impact the
// execution of Substreams, important aspects of execution include:
// * The time required to reach the first byte.
// * The speed that large ranges get executed.
// * The module logs and outputs sent back to the client.
//
// By default, the engine runs in developer mode, with richer and deeper output. Differences
// between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in development mode
// * The time required to reach the first byte in development mode is faster than in production mode.
// By default, the engine runs in developer mode, with richer and deeper
// output. Differences between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in
// development mode
// * The time required to reach the first byte in development mode is faster
// than in production mode.
//
// Specific attributes of development mode include:
// * The client will receive all of the executed module's logs.
// * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`).
// * It's possible to request specific store snapshots in the execution tree
// (via `debug_initial_store_snapshot_for_modules`).
// * Multiple module's output is possible.
//
// With production mode`, however, you trade off functionality for high speed enabling forward
// parallel execution of module ahead of time.
// With production mode`, however, you trade off functionality for high speed
// enabling forward parallel execution of module ahead of time.
bool production_mode = 5;

string output_module = 6;
Expand All @@ -47,23 +54,24 @@ message Request {
repeated string debug_initial_store_snapshot_for_modules = 10;
}


message Response {
oneof message {
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events.
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before
// sending in the stream of `data` events.
BlockScopedData block_scoped_data = 3;
BlockUndoSignal block_undo_signal = 4;
Error fatal_error = 5;

// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
// Available only in developer mode, and only if
// `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotData debug_snapshot_data = 10;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
// Available only in developer mode, and only if
// `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotComplete debug_snapshot_complete = 11;
}
}


// BlockUndoSignal informs you that every bit of data
// with a block number above 'last_valid_block' has been reverted
// on-chain. Delete that data and restart from 'last_valid_cursor'
Expand All @@ -84,16 +92,14 @@ message BlockScopedData {
repeated StoreModuleOutput debug_store_outputs = 11;
}

message SessionInit {
message SessionInit {
string trace_id = 1;
uint64 resolved_start_block = 2;
uint64 linear_handoff_block = 3;
uint64 max_parallel_workers = 4;
}

message InitialSnapshotComplete {
string cursor = 1;
}
message InitialSnapshotComplete { string cursor = 1; }

message InitialSnapshotData {
string module_name = 1;
Expand All @@ -110,9 +116,9 @@ message MapModuleOutput {
}

// StoreModuleOutput are produced for store modules in development mode.
// It is not possible to retrieve store models in production, with parallelization
// enabled. If you need the deltas directly, write a pass through mapper module
// that will get them down to you.
// It is not possible to retrieve store models in production, with
// parallelization enabled. If you need the deltas directly, write a pass
// through mapper module that will get them down to you.
message StoreModuleOutput {
string name = 1;
repeated StoreDelta debug_store_deltas = 2;
Expand All @@ -121,16 +127,18 @@ message StoreModuleOutput {

message OutputDebugInfo {
repeated string logs = 1;
// LogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
// LogsTruncated is a flag that tells you if you received all the logs or if
// they were truncated because you logged too much (fixed limit currently is
// set to 128 KiB).
bool logs_truncated = 2;
bool cached = 3;
}

// ModulesProgress is a message that is sent every 500ms
message ModulesProgress {
// previously: repeated ModuleProgress modules = 1;
// these previous `modules` messages were sent in bursts and are not sent anymore.
// these previous `modules` messages were sent in bursts and are not sent
// anymore.
reserved 1;
// List of jobs running on tier2 servers
repeated Job running_jobs = 2;
Expand All @@ -147,73 +155,82 @@ message ProcessedBytes {
uint64 total_bytes_written = 2;
}


message Error {
string module = 1;
string reason = 2;
repeated string logs = 3;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
// FailureLogsTruncated is a flag that tells you if you received all the logs
// or if they were truncated because you logged too much (fixed limit
// currently is set to 128 KiB).
bool logs_truncated = 4;
}


message Job {
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
}

message Stage {
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
}

// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2
// All the 'count' and 'time_ms' values may include duplicate for each stage going over that module
// ModuleStats gathers metrics and statistics from each module, running on tier1
// or tier2 All the 'count' and 'time_ms' values may include duplicate for each
// stage going over that module
message ModuleStats {
// name of the module
string name = 1;
// name of the module
string name = 1;

// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module code
uint64 total_processing_time_ms = 3;
// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module
// code
uint64 total_processing_time_ms = 3;

//// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;
//// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;

// total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called from that module code
uint64 total_store_read_count = 6;
// total_store_operation_time_ms is the sum of all time spent running that
// module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called
// from that module code
uint64 total_store_read_count = 6;

// total_store_write_count is the sum of all store Write operations called from that module code (store-only)
uint64 total_store_write_count = 10;
// total_store_write_count is the sum of all store Write operations called
// from that module code (store-only)
uint64 total_store_write_count = 10;

// total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only)
// note that DeletePrefix can be a costly operation on large stores
uint64 total_store_deleteprefix_count = 11;
// total_store_deleteprefix_count is the sum of all store DeletePrefix
// operations called from that module code (store-only) note that DeletePrefix
// can be a costly operation on large stores
uint64 total_store_deleteprefix_count = 11;

// store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;
// store_size_bytes is the uncompressed size of the full KV store for that
// module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;

// total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;
// total_store_merging_time_ms is the time spent merging partial stores into a
// full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;

// store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way.
bool store_currently_merging = 14;
// store_currently_merging is true if there is a merging operation (partial
// store to full KV store) on the way.
bool store_currently_merging = 14;

// highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only)
uint64 highest_contiguous_block = 15;
// highest_contiguous_block is the highest block in the highest merged full KV
// store of that module (store-only)
uint64 highest_contiguous_block = 15;
}

message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}

message StoreDelta {
Expand Down
Loading

0 comments on commit b627dd3

Please sign in to comment.