Reading Avro files using Apache Flink

In this blog, we will see how to read Avro files using Flink.

Before reading the files, let’s get an overview of Flink.

There are two broad types of data processing: batch and real-time.

  • Batch processing: processing data that has been collected over a period of time.
  • Real-time processing: processing data as it arrives, to produce immediate results.

Real-time processing is in high demand, and Apache Flink is a popular real-time processing framework.

Some of Flink's features include:

  • High speed
  • Support for Scala and Java
  • Low latency
  • Fault tolerance
  • Scalability

Let’s get started.

Step 1:

Add the required dependencies in build.sbt:

name := "flink-demo"version := "0.1"scalaVersion := "2.12.8"libraryDependencies ++= Seq("org.apache.flink" %% "flink-scala" % "1.10.0","org.apache.flink" % "flink-avro" % "1.10.0","org.apache.flink" %% "flink-streaming-scala" % "1.10.0")

Step 2:

The next step is to get a reference to the execution environment in which this program runs. In Spark, the equivalent is the SparkContext.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

Step 3:

Setting the parallelism to x here causes all operators (such as join, map, and reduce) to run with x parallel instances.

I am using 1 as it is a demo application.

env.setParallelism(1)
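
If needed, this environment-level setting can also be overridden for a single operator by calling setParallelism on that operator. A minimal sketch, where someStream and the identity map are hypothetical placeholders:

// hypothetical: this map alone runs with 2 parallel instances,
// overriding the environment-level parallelism of 1
someStream.map(record => record).setParallelism(2)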

Step 4:

Defining the Input Format.

public AvroInputFormat(Path filePath, Class type)

It takes two parameters: the path to the Avro file and the class type. We will read the file as a GenericRecord.

Later, if we want, we can map the generic records to a specific type using a case class (a sketch of this appears after Step 5 below).

val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path("path to avro file"), classOf[GenericRecord])

Step 5:

Creating an input data stream with a specific input format.

def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]

The createInput function takes the input format as a parameter; we will pass avroInputFormat to it. It also requires an implicit TypeInformation.

implicit val typeInfo: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord])

val recordStream: scala.DataStream[GenericRecord] = env.createInput(avroInputFormat)
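
As mentioned in Step 4, the generic records can be mapped to a case class. For example, assuming the Avro records contain an id field of type long and a name field of type string (hypothetical fields chosen just for illustration), a minimal sketch looks like this:

// the wildcard import provides the implicit TypeInformation derivation
// needed by the Scala DataStream API
import org.apache.flink.streaming.api.scala._

// hypothetical case class matching the assumed schema
case class User(id: Long, name: String)

val userStream: DataStream[User] = recordStream.map { record =>
  User(
    record.get("id").asInstanceOf[Long], // Avro long arrives as java.lang.Long
    record.get("name").toString          // Avro strings are Utf8, so convert via toString
  )
}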

Step 6:

Let’s print the data that we will read from Avro files. The print function will act as a sink.

recordStream.print()

Step 7:

Flink programs are executed lazily: the steps above only build the dataflow graph. Let's now trigger program execution using execute.

env.execute(jobName = "flink-avro-demo")
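
Putting it all together, a minimal sketch of the complete program looks roughly like this (the file path is a placeholder, and the import locations assume Flink 1.10 as declared in build.sbt):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.AvroInputFormat
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkAvroDemo {
  def main(args: Array[String]): Unit = {
    // pointer to the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // input format that reads the Avro file as GenericRecord
    val avroInputFormat = new AvroInputFormat[GenericRecord](
      new org.apache.flink.core.fs.Path("path to avro file"), // replace with the real path
      classOf[GenericRecord]
    )

    // TypeInformation required by createInput
    implicit val typeInfo: TypeInformation[GenericRecord] =
      TypeInformation.of(classOf[GenericRecord])

    val recordStream: DataStream[GenericRecord] = env.createInput(avroInputFormat)

    // print acts as the sink
    recordStream.print()

    // execution is lazy, so trigger it explicitly
    env.execute("flink-avro-demo")
  }
}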

To download the complete code, visit flink-avro-demo

Thanks for reading.

Conclusion:
I hope that after reading this blog, you understand how to read Avro files using Flink.
