-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[da-vinci] Enable compatibility of DVRT for BLOB transfer #1280
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for getting on this so quick!
Can we also add an integration test in DaVinciClient
test to validate that this does work with blob transfer?
|
||
// Reset the mock to clear previous interactions | ||
reset(storageEngine); | ||
|
||
// Execute the onRecovery method again to test the case where the classHash file exists | ||
when(storageEngine.getIterator(partitionNumber)).thenReturn(iterator); | ||
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor); | ||
verify(storageEngine, never()).clearPartitionOffset(partitionNumber); | ||
verify(storageEngine, times(1)).getIterator(partitionNumber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we set up the mocks so that this passes?
public Integer getTransformerClassHash() { | ||
return partitionState.getTransformerClassHash(); | ||
} | ||
|
||
public void setTransformerClassHash(Integer classHash) { | ||
this.partitionState.transformerClassHash = classHash; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "OffsetRecord{" + "localVersionTopicOffset=" + getLocalVersionTopicOffset() + ", upstreamOffset=" | ||
+ getPartitionUpstreamOffsetString() + ", leaderTopic=" + getLeaderTopic() + ", offsetLag=" + getOffsetLag() | ||
+ ", eventTimeEpochMs=" + getMaxMessageTimeInMs() + ", latestProducerProcessingTimeInMs=" | ||
+ getLatestProducerProcessingTimeInMs() + ", isEndOfPushReceived=" + isEndOfPushReceived() + ", databaseInfo=" | ||
+ getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + '}'; | ||
+ getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + ", transformerClassHash=" | ||
+ getTransformerClassHash() + '}'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can we make it explicit that this belongs to the record transformer? i.e. recordTransformerClassHash
"name": "transformerClassHash", | ||
"doc": "An integer hash code of the DaVinci record transformer for compatibility checks during BLOB transfer", | ||
"type": ["null", "int"], | ||
"default": null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: DaVinci record transformer -> DaVinciRecordTransformer
This will also be utilized when blob transfer isn't enabled. Can we change the phrasing to "during bootstrapping?"
Summary
Added logic to store the DaVinci record transformer class hashcode as a field in OffsetRecord and comparing these hashcodes in the transformer utility to ensure compatibility of new transformer with the current one persisted by the storage engine
How was this PR tested?
Changes to current record transformer unit tests have been made to reflect this new logic
Does this PR introduce any user-facing changes?