-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: stream SyncState response #751
base: tomyrd-sync-component-alt
Are you sure you want to change the base?
feat: stream SyncState response #751
Conversation
11fa97f
to
986fb53
Compare
65140f6
to
fee56fe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a full review, I'll wait for the merge/rebase so the changes are a bit more clear.
while let Some(res) = stream | ||
.message() | ||
.await | ||
.map_err(|e| ClientError::RpcError(ConnectionError(e.message().to_string())))? | ||
{ | ||
self.sync_state_step( | ||
res.try_into().map_err(ClientError::RpcError)?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not sure about having this logic in the client side vs the rpc side. We even have to transform between the rpc and domain types here which is something usually done inside the rpc client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the problem is that moving this logic to the rpc side would mean that the rpc_api.sync_state
call will need to wait until the stream is closed and return all the collected sync responses. Those would be processed on the client side once the stream is closed. I'm not convinced if that is the best way to go, doesn't seem to go along with the streaming logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a good solution would be that the rpc side returns an iterator that lazily maps the response into a domain type, but not sure if that is possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with both of you! I think a wrapper over the stream that converts the responses appropriatly could work out well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented this wrapper as SyncStateStream
on 9dcdcac
// New nullfiers should be added for new untracked notes | ||
nullifiers_tags.append( | ||
&mut state_sync_update | ||
.note_updates | ||
.updated_input_notes() | ||
.map(|note| get_nullifier_prefix(¬e.nullifier())) | ||
.collect::<Vec<_>>(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove the declaration of the nullifier_tags at the start of this function and directly build the array here with the values from the note_updates
} | ||
let transaction_updates = TransactionUpdates::new(transactions, discarded_transactions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove transactions
if we don't use it:
let transaction_updates = TransactionUpdates::new(transactions, discarded_transactions); | |
let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions); |
@@ -322,17 +331,14 @@ impl StateSync { | |||
&mut self, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove the The transaction updates might include:
part of the doc comment above as we don't update the transactions here anymore. I would move each bullet point to the section in the code where we make each update.
c2b31ab
to
e6dc2f1
Compare
97ee6db
to
cf5d48b
Compare
…onent-with-streaming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, left some minor comments
let unspent_nullifiers = unspent_input_notes | ||
.iter() | ||
.map(InputNoteRecord::nullifier) | ||
.chain(unspent_output_notes.iter().filter_map(OutputNoteRecord::nullifier)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this isn't needed. The unspent nullifiers should already be included in the state_sync_update.note_updates
. (This also means that we could remove the .clone() from these vecs)
// * Transactions that were committed. Some of these might be tracked by the client and need | ||
// to be marked as committed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part of the transaction update is note done here. This part only deals with discarded transactions. I would move this to line 214 where we add committed transactions.
@@ -332,25 +338,13 @@ impl StateSync { | |||
note_updates.insert_updates(Some(public_note), None); | |||
} | |||
|
|||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an empty comment here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Leaving some comments for now
CHANGELOG.md
Outdated
@@ -10,6 +10,7 @@ | |||
|
|||
* Add check for empty pay to id notes (#714). | |||
* [BREAKING] Refactored authentication out of the `Client` and added new separate authenticators (#718). | |||
* Read Sync State response from stream (#751). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Let's write this in past tense
// Check for new nullifiers for input notes that were updated | ||
let nullifiers_tags: Vec<u16> = state_sync_update | ||
.note_updates | ||
.updated_input_notes() | ||
.map(|note| note.nullifier().prefix()) | ||
.collect(); | ||
|
||
let new_nullifiers = self | ||
.rpc_api | ||
.check_nullifiers_by_prefix(&nullifiers_tags, current_block_num) | ||
.await?; | ||
|
||
// Process nullifiers and track the updates of local tracked transactions that were | ||
// discarded because the notes that they were processing were nullified by an | ||
// another transaction. | ||
let mut discarded_transactions = vec![]; | ||
|
||
for (nullifier, block_num) in new_nullifiers { | ||
let nullifier_update = NullifierUpdate { nullifier, block_num }; | ||
|
||
let discarded_transaction = | ||
state_sync_update.note_updates.apply_nullifiers_state_transitions( | ||
&nullifier_update, | ||
state_sync_update.transaction_updates.committed_transactions(), | ||
)?; | ||
|
||
if let Some(transaction_id) = discarded_transaction { | ||
discarded_transactions.push(transaction_id); | ||
} | ||
} | ||
let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions); | ||
state_sync_update.transaction_updates.extend(transaction_updates); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could factor this out to its own fucntion (sometihng like sync_nullifiers()
)
while let Some(res) = stream | ||
.message() | ||
.await | ||
.map_err(|e| ClientError::RpcError(ConnectionError(e.message().to_string())))? | ||
{ | ||
self.sync_state_step( | ||
res.try_into().map_err(ClientError::RpcError)?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with both of you! I think a wrapper over the stream that converts the responses appropriatly could work out well
This PR updates the client's sync command to adapt to the new Streaming response introduced in 0xPolygonMiden/miden-node#685.
The client is no longer needed to execute the sync request multiple times until the chain tip is reached, it just needs to read the multiple updates from the response Stream.