Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix substreams info endpoint #5851

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
metrics,
false,
));

loop {
Expand Down
1 change: 1 addition & 0 deletions chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
Arc::new(endpoint_metrics),
true,
));

let client = Arc::new(ChainClient::new_firehose(FirehoseEndpoints::for_testing(
Expand Down
1 change: 1 addition & 0 deletions graph/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn main() {
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".sf.substreams.v1", "crate::substreams")
.extern_path(".sf.firehose.v2", "crate::firehose")
.out_dir("src/substreams_rpc")
.compile(&["proto/substreams-rpc.proto"], &["proto"])
.expect("Failed to compile Substreams RPC proto(s)");
Expand Down
157 changes: 87 additions & 70 deletions graph/proto/substreams-rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,46 @@ package sf.substreams.rpc.v2;

import "google/protobuf/any.proto";
import "substreams.proto";
import "firehose.proto";

service Stream {
rpc Blocks(Request) returns (stream Response);
service EndpointInfo {
rpc Info(sf.firehose.v2.InfoRequest) returns (sf.firehose.v2.InfoResponse);
}

service Stream { rpc Blocks(Request) returns (stream Response); }

message Request {
int64 start_block_num = 1;
string start_cursor = 2;
uint64 stop_block_num = 3;

// With final_block_only, you only receive blocks that are irreversible:
// 'final_block_height' will be equal to current block and no 'undo_signal' will ever be sent
// 'final_block_height' will be equal to current block and no 'undo_signal'
// will ever be sent
bool final_blocks_only = 4;

// Substreams has two modes when executing your module(s): either development mode or production
// mode. Development and production modes impact the execution of Substreams; important aspects
// of execution include:
// Substreams has two modes when executing your module(s): either development
// mode or production mode. Development and production modes impact the
// execution of Substreams; important aspects of execution include:
// * The time required to reach the first byte.
// * The speed that large ranges get executed.
// * The module logs and outputs sent back to the client.
//
// By default, the engine runs in developer mode, with richer and deeper output. Differences
// between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in development mode
// * The time required to reach the first byte in development mode is faster than in production mode.
// By default, the engine runs in developer mode, with richer and deeper
// output. Differences between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in
// development mode
// * The time required to reach the first byte in development mode is faster
// than in production mode.
//
// Specific attributes of development mode include:
// * The client will receive all of the executed module's logs.
// * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`).
// * It's possible to request specific store snapshots in the execution tree
// (via `debug_initial_store_snapshot_for_modules`).
// * Multiple module's output is possible.
//
// With production mode, however, you trade off functionality for high speed, enabling forward
// parallel execution of modules ahead of time.
// With production mode, however, you trade off functionality for high speed,
// enabling forward parallel execution of modules ahead of time.
bool production_mode = 5;

string output_module = 6;
Expand All @@ -47,23 +54,24 @@ message Request {
repeated string debug_initial_store_snapshot_for_modules = 10;
}


message Response {
oneof message {
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events.
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before
// sending in the stream of `data` events.
BlockScopedData block_scoped_data = 3;
BlockUndoSignal block_undo_signal = 4;
Error fatal_error = 5;

// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
// Available only in developer mode, and only if
// `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotData debug_snapshot_data = 10;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
// Available only in developer mode, and only if
// `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotComplete debug_snapshot_complete = 11;
}
}


// BlockUndoSignal informs you that every bit of data
// with a block number above 'last_valid_block' has been reverted
// on-chain. Delete that data and restart from 'last_valid_cursor'
Expand All @@ -84,16 +92,14 @@ message BlockScopedData {
repeated StoreModuleOutput debug_store_outputs = 11;
}

message SessionInit {
message SessionInit {
string trace_id = 1;
uint64 resolved_start_block = 2;
uint64 linear_handoff_block = 3;
uint64 max_parallel_workers = 4;
}

message InitialSnapshotComplete {
string cursor = 1;
}
message InitialSnapshotComplete { string cursor = 1; }

message InitialSnapshotData {
string module_name = 1;
Expand All @@ -110,9 +116,9 @@ message MapModuleOutput {
}

// StoreModuleOutput are produced for store modules in development mode.
// It is not possible to retrieve store models in production, with parallelization
// enabled. If you need the deltas directly, write a pass through mapper module
// that will get them down to you.
// It is not possible to retrieve store models in production, with
// parallelization enabled. If you need the deltas directly, write a pass
// through mapper module that will get them down to you.
message StoreModuleOutput {
string name = 1;
repeated StoreDelta debug_store_deltas = 2;
Expand All @@ -121,16 +127,18 @@ message StoreModuleOutput {

message OutputDebugInfo {
repeated string logs = 1;
// LogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
// LogsTruncated is a flag that tells you if you received all the logs or if
// they were truncated because you logged too much (fixed limit currently is
// set to 128 KiB).
bool logs_truncated = 2;
bool cached = 3;
}

// ModulesProgress is a message that is sent every 500ms
message ModulesProgress {
// previously: repeated ModuleProgress modules = 1;
// these previous `modules` messages were sent in bursts and are not sent anymore.
// these previous `modules` messages were sent in bursts and are not sent
// anymore.
reserved 1;
// List of jobs running on tier2 servers
repeated Job running_jobs = 2;
Expand All @@ -147,73 +155,82 @@ message ProcessedBytes {
uint64 total_bytes_written = 2;
}


message Error {
string module = 1;
string reason = 2;
repeated string logs = 3;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
// FailureLogsTruncated is a flag that tells you if you received all the logs
// or if they were truncated because you logged too much (fixed limit
// currently is set to 128 KiB).
bool logs_truncated = 4;
}


message Job {
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
}

message Stage {
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
}

// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2.
// All the 'count' and 'time_ms' values may include duplicates for each stage going over that module.
// ModuleStats gathers metrics and statistics from each module, running on tier1
// or tier2. All the 'count' and 'time_ms' values may include duplicates for each
// stage going over that module.
message ModuleStats {
// name of the module
string name = 1;
// name of the module
string name = 1;

// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module code
uint64 total_processing_time_ms = 3;
// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module
// code
uint64 total_processing_time_ms = 3;

//// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;
//// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;

// total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called from that module code
uint64 total_store_read_count = 6;
// total_store_operation_time_ms is the sum of all time spent running that
// module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called
// from that module code
uint64 total_store_read_count = 6;

// total_store_write_count is the sum of all store Write operations called from that module code (store-only)
uint64 total_store_write_count = 10;
// total_store_write_count is the sum of all store Write operations called
// from that module code (store-only)
uint64 total_store_write_count = 10;

// total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only).
// Note that DeletePrefix can be a costly operation on large stores.
// total_store_deleteprefix_count is the sum of all store DeletePrefix
// operations called from that module code (store-only). Note that DeletePrefix
// can be a costly operation on large stores.
uint64 total_store_deleteprefix_count = 11;

// store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;
// store_size_bytes is the uncompressed size of the full KV store for that
// module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;

// total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;
// total_store_merging_time_ms is the time spent merging partial stores into a
// full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;

// store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way.
bool store_currently_merging = 14;
// store_currently_merging is true if there is a merging operation (partial
// store to full KV store) on the way.
bool store_currently_merging = 14;

// highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only)
uint64 highest_contiguous_block = 15;
// highest_contiguous_block is the highest block in the highest merged full KV
// store of that module (store-only)
uint64 highest_contiguous_block = 15;
}

message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}

message StoreDelta {
Expand Down
Loading
Loading