Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP input fixes and improvements #20270

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open

AMQP input fixes and improvements #20270

wants to merge 19 commits into from

Conversation

bernd
Copy link
Member

@bernd bernd commented Aug 27, 2024

I am experimenting with some fixes and improvements for our AMQP input.

  • Fix executor leak by removing the "AMQP Executor" named binding and manually start and stop an executor.
  • Replace the bytes-per-second refresh runnable with a memorized supplier to avoid the scheduling overhead.
  • Expose AMQP some client metrics into our metrics registry.
  • Provide a AMQP connection name to help with debugging connection issues.
  • Add a BlockedListeter to AMQP connections so we can log when the client is getting blocked by the broker.
  • Use a custom executor service for the AMQP client that uses "parallelQueues" number of threads instead of relying on the default executor that uses a thread count of num-cpus per AMQP input.

bernd added 8 commits August 26, 2024 19:06
This avoids leaking an executor for each input restart.
Use a loading cache with a 1 second expiry, so we don't have to manage
a scheduled task.
Expose consumer metrics into the input's local registry.
Create a fixed size thread pool based on the "parallelQueues" setting.
Previously, the client used "num processors" as thread pool size.
@bernd bernd changed the title AMQP input fixes AMQP input fixes and improvements Aug 28, 2024
@boosty boosty self-requested a review January 10, 2025 13:03
@boosty
Copy link
Contributor

boosty commented Jan 28, 2025

Hi @bernd! The changes make a lot of sense. Thanks for working on this!

Besides some general smoke testing, I tested one case in particular not to cause a regression:

If Graylog stands up first - it fails to start the input and never retries

This bug was a motivation for previous work in #15401 and was reported via #12447 (comment).

According to my local testing, it still seems fine after your refactoring. An initial connection is being retried and succeeds once RabbitMQ is up.

@boosty boosty removed their request for review January 28, 2025 14:47
This change avoids creating an additional connection every 5 seconds (or whatever the recovery interval has been set to). This could happen e.g. on queue creation errors.
Before:
Could not launch AMQP consumer: (null)

After (example):
Error while opening new AMQP connection: (channel error; protocol method: #method<channel.close> (reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue "log-messages' in vhost'/': received none but current is the value '10' of type 'long', class-id=50,
method-id=10))
@boosty boosty marked this pull request as ready for review February 4, 2025 16:19
@boosty
Copy link
Contributor

boosty commented Feb 4, 2025

@bernd Thanks again for working on this!

I now added a few more changes to this PR:

  • Support passive queue declaration. This new option will allow users to create a queue in RabbitMQ using any available setting (e.g. x-max-length). This will avoid errors when Graylog tries to create a queue using incompatible settings, as described in these issues:
  • Clarify behavior of parallelQueues option in the input form. The support for a %d placeholder in the queue name was not documented, and neither was the difference in behavior when using it.
  • Prevent connection leak on initial connection failures. Prior to this change, the retry behavior could lead to accumulated connections in RabbitMQ, e.g., on a queue creation error.
  • Improve error message in IOException cases, e.g., on a queue creation error.

I tested everything and think this is ready for review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
2 participants