-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQP input fixes and improvements #20270
base: master
Are you sure you want to change the base?
Conversation
This avoids leaking an executor for each input restart.
Use a loading cache with a 1 second expiry, so we don't have to manage a scheduled task.
Expose consumer metrics into the input's local registry.
Create a fixed size thread pool based on the "parallelQueues" setting. Previously, the client used "num processors" as thread pool size.
Hi @bernd! The changes make a lot of sense. Thanks for working on this! Besides some general smoke testing, I tested one case in particular not to cause a regression:
This bug was a motivation for previous work in #15401 and was reported via #12447 (comment). According to my local testing, it still seems fine after your refactoring. An initial connection is being retried and succeeds once RabbitMQ is up. |
This change avoids creating an additional connection every 5 seconds (or whatever the recovery interval has been set to). This could happen e.g. on queue creation errors.
Before: Could not launch AMQP consumer: (null) After (example): Error while opening new AMQP connection: (channel error; protocol method: #method<channel.close> (reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue "log-messages' in vhost'/': received none but current is the value '10' of type 'long', class-id=50, method-id=10))
@bernd Thanks again for working on this! I now added a few more changes to this PR:
I tested everything and think this is ready for review. |
I am experimenting with some fixes and improvements for our AMQP input.
BlockedListeter
to AMQP connections so we can log when the client is getting blocked by the broker.