
Support batching before processing #172

Closed
victorolinasc opened this issue Mar 30, 2020 · 12 comments

Comments

@victorolinasc

Thank you all for the amazing work on Broadway and its adapters. It is really incredible!

I have a feature request: the ability to run batchers BEFORE processors (as well as after).

The use case is simple: auditing every message for idempotency checks. What I want is to run a check on a batch of messages and filter out the ones I've already processed. Some adapters will publish the same message for several reasons: a restart of the messaging server, a re-partitioning of topics, at-least-once delivery semantics (which may deliver the same message multiple times), and so on.

Suppose I keep an audit of messages in a DB. Checking each message individually would mean one query per message, each needing a connection, which might hurt performance badly. So, I'd like to run an idempotency check on a batch of messages, using a single connection per batch. Then I'd process each message individually.
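For illustration, here is a rough sketch of the batched check I have in mind: one round-trip covers the whole batch. The Audit schema, Repo, and the metadata key are all hypothetical and adapter-dependent:

```elixir
import Ecto.Query

# Hypothetical sketch: Audit is an imagined Ecto schema with a message_id
# column recording already-processed messages; Repo is the app's Ecto repo.
# One query covers the whole batch instead of one query per message.
def filter_processed(messages) do
  ids = Enum.map(messages, & &1.metadata.message_id)

  query = from a in Audit, where: a.message_id in ^ids, select: a.message_id
  processed = MapSet.new(Repo.all(query))

  Enum.reject(messages, &MapSet.member?(processed, &1.metadata.message_id))
end
```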

To put it graphically:

                    [producer_1]
                         |
                         |
                      [batcher_1]  <- filters incoming messages in batch
                         / \
                        /   \
                       /     \
                      /       \
               [processor_1] [processor_2]   <- process each message
                      /\     /\
                     /  \   /  \
                    /    \ /    \
                   /      x      \
                  /      / \      \
                 /      /   \      \
                /      /     \      \
           [batcher_sqs]   [batcher_s3]
                /\              \
               /  \               \
              /    \               \
             /      \                \
[batch_sqs_1] [batch_sqs_2]    [batch_s3_1] <- process each batch

Is this a "wanted" use-case?

Regards!

@josevalim
Member

> So, I'd like to run an idempotency check on a batch of messages, using a single connection per batch.

Do you want to use a single connection or do you want to do a single query?

@victorolinasc
Author

A single query. Sorry!

@josevalim
Member

So we could make this work trivially, because the processors already receive a batch of messages. The problem is how to introduce this without adding extra complexity for users. One possible idea is to introduce a handle_messages callback that by default calls handle_message/3 for each message. But I need to think about the long-term impact on the API. You can probably fork the project and make this change trivially, though, while we figure things out. The benefit is that it doesn't add any new processes; it is just the same infrastructure.
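A minimal sketch of that default delegation under the proposal (handle_messages is only a proposed callback name here, not a shipped API):

```elixir
# Proposed default (not a shipped API): the batch-level callback simply
# maps the existing per-message callback over the batch, so pipelines
# that don't override it behave exactly as before.
def handle_messages(processor, messages, context) do
  Enum.map(messages, &handle_message(processor, &1, context))
end
```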

@victorolinasc
Author

That would work wonders for my use case! I agree with the added benefit of no new processes.

I believe this should be a somewhat common case. I'll wait for your thoughts on the API impact. I just can't have a fork going into production, unfortunately.

@josevalim
Member

josevalim commented Mar 30, 2020 via email

@victorolinasc
Author

All right! I'll try a PR. I can't commit to a schedule, though.

I'll try to look at it this week.

Thanks!

@victorolinasc
Author

I've given some thought to how to name this.

First I thought about prepare_messages, but it doesn't quite describe the use case: we need the ability to exclude some messages from the batch (if they have been processed before). Then I thought about filter_messages (or filter_incoming_messages). That is closer, but too tied to the use case I have in mind right now, and it does not indicate that we could (if we wanted) add things to the messages (like a start_time tag or whatever). Next was intercept_messages, which is closer to an interceptor pattern, but we would only intercept incoming messages, so that might communicate the intent badly. So maybe, in a more functional style, we could use reduce_messages, which covers the use cases of:

  • filtering out messages
  • adding things to messages
  • returning the batch of messages to be processed only

Implementation-wise, I was thinking of making the recursive private function handle_messages non-recursive, having it call this reduce_messages, and then delegating to a do_handle_messages or something similar, roughly as sketched below.
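Something along these lines, purely illustrative (the names are the ones proposed above):

```elixir
# Illustrative only: split the current recursive handle_messages into a
# batch-level hook plus the existing per-message recursion.
defp handle_messages(messages, state) do
  messages
  |> reduce_messages(state)     # user-overridable batch step (proposed name)
  |> do_handle_messages(state)  # the former recursive per-message handling
end
```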

If this seems fine to you I'll start a PR.

@josevalim
Member

reduce usually implies coming up with an aggregated value. However, I don't think we should sweat the name too much for now. Let's make sure it works; then we can discuss the best name. :) So I think shipping with prepare_messages is good enough as a starting point.
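For reference, a sketch of how a prepare_messages-based idempotency filter could look in a pipeline module. The already_processed_ids/1 helper stands in for the single batched query, and using message.data as the identifier is a simplification; duplicates are marked as failed rather than dropped:

```elixir
defmodule MyApp.Pipeline do
  use Broadway

  alias Broadway.Message

  # prepare_messages/2 receives the whole batch before handle_message/3,
  # so a single query can cover every message. Duplicates are marked as
  # failed rather than processed.
  @impl true
  def prepare_messages(messages, _context) do
    processed = MapSet.new(already_processed_ids(Enum.map(messages, & &1.data)))

    Enum.map(messages, fn message ->
      if MapSet.member?(processed, message.data) do
        Message.failed(message, :duplicate)
      else
        message
      end
    end)
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # normal per-message processing
    message
  end

  # Hypothetical helper wrapping the single batched idempotency query.
  defp already_processed_ids(_ids), do: []
end
```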

@andrewhr

@josevalim isn't this a use-case for #39?

If we had a mechanism to connect Broadway pipelines as suggested in that issue, this requirement could be met with something like a "filtering pipeline" that pushes messages forward conditionally, with the filtering work done in batchers. The second pipeline would just handle the happy path.

I'm commenting here just as a means to explore what's been discussed so far on the topic of Broadway topologies. If that makes sense, we can move the conversation there. Thanks in advance for your time!

@josevalim
Member

Not really. Adding new processes to the pipeline is expensive, and we want to avoid that as much as possible. Having different processing needs is not a reason for new processors. The only reason for new processes would be repartitioning, which is not necessary when filtering.

@3duard0
Contributor

3duard0 commented Jun 4, 2020

I have added a pull request showing the code changes required.

#185

@victorolinasc
Author

Thanks @3duard0 and @josevalim !

I'll close this now that it has been merged.

Any chance of this slipping into 0.6.2?

Once again, thanks to you all!
