Consuming Protobuf Messages in Apache Flink Example

7 August 2022

When I first had to consume Protobuf messages in Apache Flink, it took me some time to get everything working. In this post we will go over a simple example, taken from a project I worked on, that demonstrates how to consume Protobuf messages in Apache Flink.

Setup: This example consumes Protobuf messages from an Apache Pulsar source, but of course your use case may differ; the Protobuf setup should remain the same or similar. The compiled Protobuf Java class is called TelemetryBatch.

Since our Protobuf messages are a custom type to Flink, we need a serializer for them. Refer to the Flink documentation on third-party serializers.
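Why is TelemetryBatch a custom type as far as Flink is concerned? Flink serializes a class with its own efficient serializer only when the class qualifies as a POJO. Among other rules, the class must be public and have a public no-argument constructor. Generated Protobuf message classes are immutable and have private constructors (instances are created through builders), so Flink treats them as generic types and hands them to Kryo. The sketch below checks just those two rules with plain Java reflection. It is a simplified illustration, not Flink's actual type extraction, and PlainReading and ProtoStyleReading are hypothetical stand-in classes.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

class PojoCheckSketch {

    // Simplified: tests only two of Flink's POJO rules (public class, public no-arg constructor).
    // Flink's real type extraction also checks every field for public access or getters/setters.
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false; // no no-arg constructor at all
        }
    }

    // POJO-shaped (hypothetical): public class, public no-arg constructor, public mutable field
    public static class PlainReading {
        public String deviceId;
        public PlainReading() {}
    }

    // Shaped like generated Protobuf code (hypothetical): private constructors, immutable field,
    // instances created through a static factory instead of `new`
    public static final class ProtoStyleReading {
        private final String deviceId;
        private ProtoStyleReading() { this(""); }
        private ProtoStyleReading(String deviceId) { this.deviceId = deviceId; }
        public static ProtoStyleReading of(String deviceId) { return new ProtoStyleReading(deviceId); }
        public String getDeviceId() { return deviceId; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(PlainReading.class));      // true
        System.out.println(looksLikeFlinkPojo(ProtoStyleReading.class)); // false
    }
}
```

A class that fails these checks still works in Flink, but only through the generic Kryo path, which is why registering a Protobuf-aware Kryo serializer matters.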

Add the Protobuf serialization dependencies (newer versions may be available and usable):

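<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.7.6</version>
	<!-- exclusions for dependency conversion -->
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.7.0</version>
</dependency>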

Register the TelemetryBatch Protobuf class with Kryo, using chill-protobuf's ProtobufSerializer:

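import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ...TelemetryBatch;

// Apache Flink setup (local environment with web UI; needs flink-runtime-web on the classpath)
Configuration config = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
// Have Kryo use chill-protobuf's ProtobufSerializer whenever it encounters a TelemetryBatch
env.getConfig().registerTypeWithKryoSerializer(TelemetryBatch.class, ProtobufSerializer.class);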

Define a DeserializationSchema for our custom Protobuf messages:

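import ...TelemetryBatch;
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Turns the raw bytes of each incoming message into a TelemetryBatch
public class TelemetryBatchProtoDeserializer implements DeserializationSchema<TelemetryBatch> {

    @Override
    public TypeInformation<TelemetryBatch> getProducedType() {
        // Not a POJO, so this yields a generic type handled by Kryo (and the registered ProtobufSerializer)
        return TypeInformation.of(TelemetryBatch.class);
    }

    @Override
    public TelemetryBatch deserialize(byte[] message) throws IOException {
        // parseFrom throws InvalidProtocolBufferException, a subclass of IOException
        return TelemetryBatch.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(TelemetryBatch nextElement) {
        // The telemetry stream is unbounded, so no element ever ends it
        return false;
    }
}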
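Returning false from isEndOfStream tells the source that the stream never ends on its own, which suits an unbounded telemetry feed. Flink's Javadoc for isEndOfStream states that an element for which it returns true is not emitted. The sketch below models that contract in plain Java. SimpleSchema, SourceLoopSketch and the "END" marker are hypothetical, and real connectors (including the Pulsar one) run their own fetch loops, which may treat end of stream differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's DeserializationSchema; not the real interface
interface SimpleSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
}

class SourceLoopSketch {

    // Mimics a source: decode each record and stop once the schema reports end of stream.
    // The end-of-stream element itself is not emitted.
    static <T> List<T> consume(List<byte[]> records, SimpleSchema<T> schema) {
        List<T> emitted = new ArrayList<>();
        for (byte[] record : records) {
            T element = schema.deserialize(record);
            if (schema.isEndOfStream(element)) {
                break;
            }
            emitted.add(element);
        }
        return emitted;
    }

    // Like TelemetryBatchProtoDeserializer: isEndOfStream always returns false, so every record is emitted
    static final SimpleSchema<String> NEVER_ENDS = new SimpleSchema<>() {
        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
    };

    // Alternative: treat a hypothetical "END" record as a marker that finishes the stream
    static final SimpleSchema<String> UNTIL_END_MARKER = new SimpleSchema<>() {
        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return "END".equals(nextElement);
        }
    };

    // Helper: encode strings as UTF-8 byte arrays, standing in for raw message payloads
    static List<byte[]> utf8(String... values) {
        List<byte[]> records = new ArrayList<>();
        for (String v : values) {
            records.add(v.getBytes(StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        List<byte[]> records = utf8("a", "b", "END", "c");
        System.out.println(consume(records, NEVER_ENDS));       // [a, b, END, c]
        System.out.println(consume(records, UNTIL_END_MARKER)); // [a, b]
    }
}
```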

Finally, consume TelemetryBatch messages from the Pulsar source and process them as desired. In this example, TelemetryBatchProcessedSplitter is a class implementing FlatMapFunction with logic for handling each TelemetryBatch message, and TelemetryMessageProcessed is a POJO holding data about a processed message.

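public class TelemetryBatchProcessedSplitter implements FlatMapFunction<TelemetryBatch, TelemetryMessageProcessed> {
    @Override
    public void flatMap(TelemetryBatch batch, Collector<TelemetryMessageProcessed> out) {
        // project-specific logic: call out.collect(...) for each processed message
    }
}

// serviceUrl, adminUrl and props (a java.util.Properties) are defined elsewhere
FlinkPulsarSource<TelemetryBatch> source = new FlinkPulsarSource<>(serviceUrl, adminUrl,
                PulsarDeserializationSchema.valueOnly(new TelemetryBatchProtoDeserializer()), props);

DataStream<TelemetryBatch> batchStream = env.addSource(source);

DataStream<TelemetryMessageProcessed> telemetryProcessedStream =
        batchStream.flatMap(new TelemetryBatchProcessedSplitter());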
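The actual logic inside TelemetryBatchProcessedSplitter is project-specific. As a rough, Flink-free illustration of the flatMap shape it follows (one TelemetryBatch in, zero or more TelemetryMessageProcessed out), the sketch below uses hypothetical record types with invented fields and a java.util.function.Consumer in place of Flink's Collector. The rule that skips messages without a device ID is made up purely to show that a batch can yield fewer outputs than inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SplitterSketch {

    // Hypothetical stand-ins: the real TelemetryBatch is generated Protobuf code and
    // TelemetryMessageProcessed is a project POJO; these fields are invented for illustration
    record TelemetryMessage(String deviceId, double value) {}
    record Batch(List<TelemetryMessage> messages) {}
    record Processed(String deviceId, double value) {}

    // Same shape as FlatMapFunction.flatMap(in, out): one input may produce zero or more outputs.
    // The Consumer plays the role of Flink's Collector.
    static void flatMap(Batch batch, Consumer<Processed> out) {
        for (TelemetryMessage m : batch.messages()) {
            // Hypothetical rule: skip messages without a device ID
            if (m.deviceId() == null || m.deviceId().isEmpty()) {
                continue;
            }
            out.accept(new Processed(m.deviceId(), m.value()));
        }
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(
                new Batch(List.of(new TelemetryMessage("sensor-1", 20.5),
                                  new TelemetryMessage("sensor-2", 21.0))),
                new Batch(List.of(new TelemetryMessage("", 0.0))));

        List<Processed> results = new ArrayList<>();
        for (Batch b : batches) {
            flatMap(b, results::add);
        }
        System.out.println(results.size()); // 2
    }
}
```

flatMap rather than map fits here because a single batch fans out into many processed messages, and an empty or fully filtered batch simply emits nothing.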
