Continuously reading CSV files

Reading CSV files in Apache Flink

To get started with your first event processing application, you will need to read data from one or more sources.

In this recipe, you are going to continuously read CSV-formatted files from a folder and transform this data into your data model.

This Apache Flink recipe is self-contained: you can copy it and run it directly from your favorite editor. There is no need to download Apache Flink or Apache Kafka.

The CSV input data

The recipe will generate one or more comma-separated values (CSV) files in a temporary directory. The files are encoded in UTF-8.

1,Shadowfax,Sauerfort
2,Quickbeam,Tremblayside
3,Galadriel,Latriciamouth
4,Denethor,Koeppshire
5,Théoden,Larsonland
6,Tom Bombadil,Hegmannton
7,Barliman Butterbur,Madelinechester

The data model

You want to consume these records in your Apache Flink application and make them available in the data model. The data model is defined in the following POJO:

@JsonPropertyOrder({"id", "character", "location"})
public class Event {

    /** A Flink POJO must have public fields, or getters and setters */
    public long id;

    public String character;
    public String location;

    /** A Flink POJO must have a public no-argument constructor */
    public Event() {}
}

You have to explicitly define the order of fields using the @JsonPropertyOrder annotation.

Set up a FileSource

You are using the Apache Flink FileSource connector in the application to connect to your local file system. You can use Flink's pluggable file systems to connect to other file systems, such as S3.
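For example, with the flink-s3-fs-hadoop (or flink-s3-fs-presto) plugin on the classpath and credentials configured, you could point the same source at an S3 bucket instead of a local folder. The bucket and prefix below are placeholders, not part of this recipe:

    // Hypothetical example: read from S3 instead of the local file system.
    // Requires a Flink S3 file system plugin and configured credentials.
    Path s3Folder = new Path("s3://my-bucket/csv-input/");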

You can specify a location by providing the argument --inputFolder.

    public static void main(String[] args) throws Exception {
        final ParameterTool parameters = ParameterTool.fromArgs(args);

        Path inputFolder = new Path(parameters.getRequired("inputFolder"));

        runJob(inputFolder);
    }

Then you configure the CsvReaderFormat to use the defined POJO to parse the CSV files.

        CsvReaderFormat<Event> csvFormat = CsvReaderFormat.forPojo(Event.class);

To complete the setup, you configure the FileSource connector with the defined csvFormat and the directory that you want to monitor. The connector is configured to check the directory for new files every 5 seconds. Because you are monitoring this directory continuously, the connector runs in streaming (unbounded) mode.

        FileSource<Event> source =
                FileSource.forRecordStreamFormat(csvFormat, dataDirectory)
                        .monitorContinuously(Duration.of(5, SECONDS))
                        .build();

        final DataStreamSource<Event> file =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "File");
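The snippets above come from the job's runJob method. As a rough, minimal sketch of how the pieces could be wired together (the print() sink and the job name are illustrative and not necessarily the recipe's exact code):

    // Minimal sketch: source -> print sink, assuming a print() sink for illustration.
    static void runJob(Path dataDirectory) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parse each CSV line into an Event POJO
        CsvReaderFormat<Event> csvFormat = CsvReaderFormat.forPojo(Event.class);

        // Continuously monitor the directory for new files every 5 seconds
        FileSource<Event> source =
                FileSource.forRecordStreamFormat(csvFormat, dataDirectory)
                        .monitorContinuously(Duration.of(5, SECONDS))
                        .build();

        DataStreamSource<Event> file =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "File");

        // Illustrative sink: write the parsed events to stdout
        file.print();

        env.execute("Continuously reading CSV files");
    }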

The full recipe

This recipe is self-contained. You can run the ContinuousFileReadingTest#testProductionJob class to see the full recipe in action. The test generates CSV files in a temporary directory and outputs the data to the console. You can run it directly via Maven or in your favorite editor, such as IntelliJ IDEA or Visual Studio Code.
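If you run it via Maven with the standard Surefire plugin, an invocation along these lines should work; the exact command depends on the project's module layout and plugin configuration:

    mvn test -Dtest=ContinuousFileReadingTest#testProductionJob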

See the comments included in the code for more details.