Reading Avro files using Apache Flink

Jyoti Sachdeva
Jul 1, 2020

In this blog, we will see how to read Avro files using Flink.

Before reading the files, let’s get an overview of Flink.

There are two types of processing — batch and real-time.

  • Batch Processing: Processing based on the data collected over time.
  • Real-time Processing: Processing based on immediate data for an instant result.

Real-time processing is in high demand, and Apache Flink is a popular real-time processing tool.

Some of Flink's features include:

  • High processing speed
  • Support for Scala and Java
  • Low-latency
  • Fault-tolerance
  • Scalability

Let’s get started.

Step 1:

Add the required dependencies in build.sbt:

name := "flink-demo"
version := "0.1"
scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.10.0",
  "org.apache.flink" % "flink-avro" % "1.10.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.0"
)

Step 2:

The next step is to create a pointer to the environment on which this program runs. In Spark, the analogous concept is the SparkContext.

import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

Step 3:

Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.

I am using 1 as it is a demo application.

env.setParallelism(1)

Step 4:

Defining the Input Format.

public AvroInputFormat(Path filePath, Class<E> type)

It takes two parameters: the first one is the path to the Avro file and the second one is the class type. We will read the file as a GenericRecord.

Later, if we want, we can convert the records to a specific type using case classes (a sketch follows Step 5).

import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.AvroInputFormat

val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path("path to avro file"), classOf[GenericRecord])

Step 5:

Creating input data stream with a specific input format.

def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]

The createInput function takes the input format as a parameter; we will pass avroInputFormat to it. It also requires an implicit TypeInformation.

import org.apache.flink.api.common.typeinfo.TypeInformation

implicit val typeInfo: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord])

val recordStream: DataStream[GenericRecord] = env.createInput(avroInputFormat)
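As mentioned in Step 4, we can convert each GenericRecord to a specific type. Here is a minimal sketch of that conversion, assuming a hypothetical schema with a name (string) field and an age (int) field, and a hypothetical User case class:

case class User(name: String, age: Int)

// GenericRecord#get returns AnyRef (Avro strings arrive as Utf8), so we convert explicitly.
val userStream: DataStream[User] = recordStream.map { record =>
  User(record.get("name").toString, record.get("age").toString.toInt)
}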

Step 6:

Let’s print the data that we read from the Avro files. The print function will act as a sink.

recordStream.print()
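If we want to persist the records instead of printing them, a simple alternative sink is writeAsText (the output path below is just a placeholder):

// Alternative sink: write each record's string form to a text file.
recordStream.map(_.toString).writeAsText("path to output file")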

Step 7:

Streams are lazy. Let’s now trigger the program execution using execute.

env.execute(jobName = "flink-avro-demo")
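Putting all the steps together, here is a minimal end-to-end sketch (the input path is a placeholder, and the object name FlinkAvroDemo is just an assumption):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat
import org.apache.flink.streaming.api.scala._

object FlinkAvroDemo {
  def main(args: Array[String]): Unit = {
    // Step 2: get the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Step 3: run all operators with a single parallel instance (demo only).
    env.setParallelism(1)

    // Step 4: read the file as GenericRecord.
    val avroInputFormat = new AvroInputFormat[GenericRecord](
      new Path("path to avro file"), classOf[GenericRecord])

    // Step 5: create the input stream; createInput needs TypeInformation.
    implicit val typeInfo: TypeInformation[GenericRecord] =
      TypeInformation.of(classOf[GenericRecord])
    val recordStream: DataStream[GenericRecord] = env.createInput(avroInputFormat)

    // Step 6: print acts as the sink.
    recordStream.print()

    // Step 7: streams are lazy; execute triggers the job.
    env.execute("flink-avro-demo")
  }
}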

To download the complete code, visit flink-avro-demo

Thanks for reading.

Conclusion:
I hope that after reading this blog, you understand how to read Avro files using Flink.
