Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PostCommit Java Nexmark Dataflow job #33979

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Amar3tto
Copy link
Collaborator

@Amar3tto Amar3tto commented Feb 13, 2025

Fixes #30606
Successful run: https://github.com/Amar3tto/beam/actions/runs/13314417715

Reason:
Instances of ServiceLoad class are not safe for use by multiple concurrent threads. And we had singleton private static final ServiceLoader<SchemaInformationProvider> INSTANCE.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@github-actions github-actions bot added the java label Feb 13, 2025
Copy link

codecov bot commented Feb 13, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 59.09%. Comparing base (fc43c12) to head (4979d65).
Report is 60 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #33979   +/-   ##
=========================================
  Coverage     59.08%   59.09%           
- Complexity     3238     3240    +2     
=========================================
  Files          1156     1156           
  Lines        176924   176934   +10     
  Branches       3391     3392    +1     
=========================================
+ Hits         104543   104555   +12     
+ Misses        69015    69014    -1     
+ Partials       3366     3365    -1     
Flag Coverage Δ
java 70.34% <ø> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Amar3tto
Copy link
Collaborator Author

R: @Abacn

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@Abacn
Copy link
Contributor

Abacn commented Feb 13, 2025

shall we instead make a lock? Initialize a ServiceLoader everytime on method call sounds expensive

@Amar3tto
Copy link
Collaborator Author

shall we instead make a lock? Initialize a ServiceLoader everytime on method call sounds expensive

Changed to lazy loading with AtomicReference<List>. What do you think?

SCHEMA_INFORMATION_PROVIDERS = new AtomicReference<>();

private static List<SchemaInformationProvider> getSchemaInformationProviders() {
return SCHEMA_INFORMATION_PROVIDERS.updateAndGet(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes List only update once. I'm not sure if it is possible if iterating over "SchemaInformationProviders.INSTANCE" would return different values overtime currently

forwarding to experts who I remember reviewed / fixed concurrency issues R: @scwhittle @kennknowles thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure when providers will be registered. From ServiceLoader docs it caches internally but looks for new things to load each time.

It seems like a smaller change to semantics to synchronize access to the loader.
Can you add a static Object to use as a lock, annotate INSTANCE as GuardedBy it and synchronize iteration?

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

}
} catch (Exception e) {
LOG.debug("No Schema information found for type {}", outputType, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message doesn't seem fully accurate, since we are not yet done checking for schemas, right?

@Amar3tto
Copy link
Collaborator Author

Run Java_IOs_Direct PreCommit

@Amar3tto
Copy link
Collaborator Author

Run Java PreCommit

@Amar3tto Amar3tto requested a review from Abacn February 14, 2025 06:31
SCHEMA_INFORMATION_PROVIDERS = new AtomicReference<>();

private static List<SchemaInformationProvider> getSchemaInformationProviders() {
return SCHEMA_INFORMATION_PROVIDERS.updateAndGet(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure when providers will be registered. From ServiceLoader docs it caches internally but looks for new things to load each time.

It seems like a smaller change to semantics to synchronize access to the loader.
Can you add a static Object to use as a lock, annotate INSTANCE as GuardedBy it and synchronize iteration?

@@ -85,57 +100,58 @@ public ConvertedSchemaInformation(
public static <T> ConvertedSchemaInformation<T> getConvertedSchemaInformation(
Schema inputSchema, TypeDescriptor<T> outputType, SchemaRegistry schemaRegistry) {

ConvertedSchemaInformation<T> schemaInformation = null;
ConvertedSchemaInformation<T> schemaInformation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this definition to where assigned

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, new classes would have to be loaded / added to classpath. I think the way we use service loaders is fixed when things are launched, unless someone is getting quite fancy.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

The PostCommit Java Nexmark Dataflow job is flaky
4 participants