A common use case is where you will need to split a stream of events or route specific events to a specific location.
In this recipe, you are going to consume events from Apache Kafka which have a Priority
value in the payload.
All events with a Critical
or Major
priority will be routed to different outputs.
This recipe for Apache Flink is a self-contained recipe that you can directly copy and run from your favorite editor. There is no need to download Apache Flink or Apache Kafka.
The recipe uses Kafka topic input
, containing JSON-encoded records.
{"id":1,"data":"NL31PJMQ9001080613","priority":"MAJOR"}
{"id":2,"data":"AD5934360143N91217Gfa7hA","priority":"MINOR"}
{"id":3,"data":"ST35710189607894938826568","priority":"CRITICAL"}
{"id":4,"data":"IT90R5678911215DpWA54OpHM17","priority":"CRITICAL"}
{"id":5,"data":"RS93089245532951564296","priority":"MAJOR"}
You are going to use Flink's Side Output to split the stream of events. Side Outputs can be used to split a stream in n-ways, into streams of different types with excellent performance.
To use Side Outputs, you first define an OutputTag
to identify your side output stream.
final Map<Event.Priority, OutputTag<Event>> tagsByPriority =
Map.ofEntries(
entry(Event.Priority.CRITICAL, new OutputTag<>("critical") {}),
entry(Event.Priority.MAJOR, new OutputTag<>("major") {}),
entry(Event.Priority.MINOR, new OutputTag<>("minor") {}));
You are going to emit to your defined side output using Flink's ProcessFunction
.
By using the Context
parameter you are emitting data to the previously defined side output.
final SingleOutputStreamOperator<Event> process =
source.process(
new ProcessFunction<>() {
@Override
public void processElement(
Event value,
ProcessFunction<Event, Event>.Context ctx,
Collector<Event> out) {
final OutputTag<Event> selectedOutput =
tagsByPriority.get(value.priority);
ctx.output(selectedOutput, value);
}
});
This recipe is self-contained. You can run the SplitStreamTest#testProductionJob
class to see the full recipe
in action. That test uses an embedded Apache Kafka and Apache Flink setup, so you can run it directly via Maven or in
your favorite editor such as IntelliJ IDEA or Visual Studio Code.
See the comments included in the code for more details.