Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream SyncState response #751

Open
wants to merge 13 commits into
base: tomyrd-sync-component-alt
Choose a base branch
from

Conversation

TomasArrachea
Copy link
Collaborator

@TomasArrachea TomasArrachea commented Feb 20, 2025

This PR updates the client's sync command to adapt to the new Streaming response introduced in 0xPolygonMiden/miden-node#685.

The client is no longer needed to execute the sync request multiple times until the chain tip is reached, it just needs to read the multiple updates from the response Stream.

@TomasArrachea TomasArrachea force-pushed the tomasarrachea-sync-component-with-streaming branch from 11fa97f to 986fb53 Compare February 21, 2025 17:57
@TomasArrachea TomasArrachea force-pushed the tomasarrachea-sync-component-with-streaming branch from 65140f6 to fee56fe Compare February 21, 2025 18:51
Copy link
Collaborator

@tomyrd tomyrd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a full review, I'll wait for the merge/rebase so the changes are a bit more clear.

Comment on lines 155 to 161
while let Some(res) = stream
.message()
.await
.map_err(|e| ClientError::RpcError(ConnectionError(e.message().to_string())))?
{
self.sync_state_step(
res.try_into().map_err(ClientError::RpcError)?,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not sure about having this logic in the client side vs the rpc side. We even have to transform between the rpc and domain types here which is something usually done inside the rpc client.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the problem is that moving this logic to the rpc side would mean that the rpc_api.sync_state call will need to wait until the stream is closed and return all the collected sync responses. Those would be processed on the client side once the stream is closed. I'm not convinced if that is the best way to go, doesn't seem to go along with the streaming logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a good solution would be that the rpc side returns an iterator that lazily maps the response into a domain type, but not sure if that is possible

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with both of you! I think a wrapper over the stream that converts the responses appropriatly could work out well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this wrapper as SyncStateStream on 9dcdcac

Comment on lines 169 to 176
// New nullfiers should be added for new untracked notes
nullifiers_tags.append(
&mut state_sync_update
.note_updates
.updated_input_notes()
.map(|note| get_nullifier_prefix(&note.nullifier()))
.collect::<Vec<_>>(),
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove the declaration of the nullifier_tags at the start of this function and directly build the array here with the values from the note_updates

}
let transaction_updates = TransactionUpdates::new(transactions, discarded_transactions);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove transactions if we don't use it:

Suggested change
let transaction_updates = TransactionUpdates::new(transactions, discarded_transactions);
let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions);

@@ -322,17 +331,14 @@ impl StateSync {
&mut self,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the The transaction updates might include: part of the doc comment above as we don't update the transactions here anymore. I would move each bullet point to the section in the code where we make each update.

@tomyrd tomyrd mentioned this pull request Feb 24, 2025
5 tasks
@TomasArrachea TomasArrachea force-pushed the tomasarrachea-sync-component-with-streaming branch from c2b31ab to e6dc2f1 Compare February 24, 2025 20:15
@TomasArrachea TomasArrachea changed the title WIP: stream SyncState response feat: stream SyncState response Feb 24, 2025
@TomasArrachea TomasArrachea force-pushed the tomasarrachea-sync-component-with-streaming branch from 97ee6db to cf5d48b Compare February 24, 2025 20:47
@TomasArrachea TomasArrachea marked this pull request as ready for review February 24, 2025 21:05
Copy link
Collaborator

@tomyrd tomyrd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left some minor comments

Comment on lines 151 to 154
let unspent_nullifiers = unspent_input_notes
.iter()
.map(InputNoteRecord::nullifier)
.chain(unspent_output_notes.iter().filter_map(OutputNoteRecord::nullifier));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't needed. The unspent nullifiers should already be included in the state_sync_update.note_updates. (This also means that we could remove the .clone() from these vecs)

Comment on lines 167 to 168
// * Transactions that were committed. Some of these might be tracked by the client and need
// to be marked as committed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the transaction update is note done here. This part only deals with discarded transactions. I would move this to line 214 where we add committed transactions.

@@ -332,25 +338,13 @@ impl StateSync {
note_updates.insert_updates(Some(public_note), None);
}

//
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an empty comment here

Copy link
Collaborator

@igamigo igamigo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Leaving some comments for now

CHANGELOG.md Outdated
@@ -10,6 +10,7 @@

* Add check for empty pay to id notes (#714).
* [BREAKING] Refactored authentication out of the `Client` and added new separate authenticators (#718).
* Read Sync State response from stream (#751).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's write this in past tense

Comment on lines 142 to 173
// Check for new nullifiers for input notes that were updated
let nullifiers_tags: Vec<u16> = state_sync_update
.note_updates
.updated_input_notes()
.map(|note| note.nullifier().prefix())
.collect();

let new_nullifiers = self
.rpc_api
.check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
.await?;

// Process nullifiers and track the updates of local tracked transactions that were
// discarded because the notes that they were processing were nullified by an
// another transaction.
let mut discarded_transactions = vec![];

for (nullifier, block_num) in new_nullifiers {
let nullifier_update = NullifierUpdate { nullifier, block_num };

let discarded_transaction =
state_sync_update.note_updates.apply_nullifiers_state_transitions(
&nullifier_update,
state_sync_update.transaction_updates.committed_transactions(),
)?;

if let Some(transaction_id) = discarded_transaction {
discarded_transactions.push(transaction_id);
}
}
let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions);
state_sync_update.transaction_updates.extend(transaction_updates);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could factor this out to its own fucntion (sometihng like sync_nullifiers())

Comment on lines 155 to 161
while let Some(res) = stream
.message()
.await
.map_err(|e| ClientError::RpcError(ConnectionError(e.message().to_string())))?
{
self.sync_state_step(
res.try_into().map_err(ClientError::RpcError)?,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with both of you! I think a wrapper over the stream that converts the responses appropriatly could work out well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants