Consuming Protobuf Messages in Apache Flink Example

7 August 2022

When I first had to consume Protobuf messages in Apache Flink, it took me some time to get everything working. In this post we will go over a simple example, taken from a project I worked on, that shows how to consume Protobuf messages in Apache Flink.

Setup: This example consumes Protobuf messages from an Apache Pulsar source, but your use case may differ; the Protobuf-specific setup should stay the same or very similar for other sources. The compiled Protobuf Java class is called TelemetryBatch.

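Flink has no built-in serializer for our Protobuf messages, so it treats them as a generic type and falls back to Kryo. We therefore need to register a Protobuf-aware Kryo serializer. Refer to the Flink documentation on third-party serializers for details.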

Add the Protobuf serialization dependencies (newer versions may be available and should work as well):

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.7.6</version>
	<!-- exclusions for dependency conversion -->
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.7.0</version>
</dependency>

Register the TelemetryBatch Protobuf class with Kryo, using chill's ProtobufSerializer:

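...
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ...TelemetryBatch;
...
// Apache Flink setup: a local environment with the web UI
// (createLocalEnvironmentWithWebUI needs flink-runtime-web on the classpath)
Configuration config = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
...
// Serialize TelemetryBatch through Kryo with chill's Protobuf-aware serializer
env.getConfig().registerTypeWithKryoSerializer(TelemetryBatch.class, ProtobufSerializer.class);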
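Why does registering ProtobufSerializer help? As far as I understand chill's implementation, instead of letting Kryo walk the generated class's fields, it writes the message's own Protobuf bytes with a length prefix and reads them back by calling the class's static parseFrom(byte[]), which it finds via reflection and caches per class. The plain-Java sketch below imitates that idea without Kryo or protobuf-java. FakeMessage is a hypothetical stand-in for a generated class like TelemetryBatch, and the fixed 4-byte length prefix is a simplification (as far as I know, chill writes a variable-length int through Kryo).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ParseFromFramingSketch {

    // Hypothetical stand-in for a generated Protobuf class such as TelemetryBatch:
    // like generated code, it offers toByteArray() and a static parseFrom(byte[]).
    public static final class FakeMessage {
        private final String payload;

        public FakeMessage(String payload) { this.payload = payload; }

        public String payload() { return payload; }

        public byte[] toByteArray() { return payload.getBytes(StandardCharsets.UTF_8); }

        public static FakeMessage parseFrom(byte[] bytes) {
            return new FakeMessage(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // One reflective parseFrom(byte[]) lookup per message class, then cached.
    private static final Map<Class<?>, Method> PARSE_CACHE = new HashMap<>();

    static Method parseMethod(Class<?> cls) throws NoSuchMethodException {
        Method m = PARSE_CACHE.get(cls);
        if (m == null) {
            m = cls.getMethod("parseFrom", byte[].class);
            PARSE_CACHE.put(cls, m);
        }
        return m;
    }

    // Write side: length prefix (fixed 4 bytes here), then the message's own bytes.
    static void write(DataOutputStream out, FakeMessage msg) throws IOException {
        byte[] bytes = msg.toByteArray();
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Read side: length prefix, exactly that many bytes, then the static parseFrom.
    static <T> T read(DataInputStream in, Class<T> cls) throws Exception {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return cls.cast(parseMethod(cls).invoke(null, (Object) bytes));
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        write(out, new FakeMessage("first"));
        write(out, new FakeMessage("second"));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(read(in, FakeMessage.class).payload()); // prints "first"
        System.out.println(read(in, FakeMessage.class).payload()); // prints "second"
    }
}
```

The length prefix is what lets the reader know where one message ends and the next begins, since Protobuf bytes are not self-delimiting.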

Define a DeserializationSchema that turns the raw bytes of each message into a TelemetryBatch:

package flink.io;

import ...TelemetryBatch;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

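public class TelemetryBatchProtoDeserializer implements DeserializationSchema<TelemetryBatch> {
    // Tell Flink which type this schema produces
    @Override
    public TypeInformation<TelemetryBatch> getProducedType() {
        return TypeInformation.of(TelemetryBatch.class);
    }

    // Parse the raw bytes with the generated Protobuf parser;
    // InvalidProtocolBufferException is a subclass of IOException
    @Override
    public TelemetryBatch deserialize(byte[] message) throws IOException {
        return TelemetryBatch.parseFrom(message);
    }

    // The stream is unbounded: no element marks its end
    @Override
    public boolean isEndOfStream(TelemetryBatch nextElement) {
        return false;
    }
}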
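If you have several Protobuf message types, one DeserializationSchema per type gets repetitive. A generic variant can look up parseFrom(byte[]) reflectively, with one catch: Flink Java-serializes the schema (DeserializationSchema extends Serializable) when it ships the job, and java.lang.reflect.Method is not serializable, so a cached Method has to be transient and resolved lazily. The plain-Java sketch below shows that pattern without Flink on the classpath. ReflectiveDeserializer only mirrors the deserialize/isEndOfStream shape of Flink's interface, FakeMessage is a hypothetical stand-in for a generated class, and javaRoundTrip is a hypothetical helper imitating the schema being shipped to a task manager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

public class GenericProtoDeserializerSketch {

    // Hypothetical stand-in for a generated Protobuf class; its parseFrom throws an
    // IOException on bad input, much as the real one throws InvalidProtocolBufferException.
    public static final class FakeMessage {
        private final String payload;

        public FakeMessage(String payload) { this.payload = payload; }

        public String payload() { return payload; }

        public static FakeMessage parseFrom(byte[] bytes) throws IOException {
            if (bytes.length == 0) {
                throw new IOException("empty message");
            }
            return new FakeMessage(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // Mirrors the deserialize/isEndOfStream shape of Flink's DeserializationSchema,
    // which is also Serializable.
    public static final class ReflectiveDeserializer<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        private final Class<T> messageClass; // Class objects serialize fine
        private transient Method parseFrom;  // Method does not, so resolve it lazily

        public ReflectiveDeserializer(Class<T> messageClass) {
            this.messageClass = messageClass;
        }

        public T deserialize(byte[] message) throws IOException {
            try {
                if (parseFrom == null) {
                    parseFrom = messageClass.getMethod("parseFrom", byte[].class);
                }
                return messageClass.cast(parseFrom.invoke(null, (Object) message));
            } catch (InvocationTargetException e) {
                // Unwrap so callers see the parser's own IOException.
                if (e.getCause() instanceof IOException) {
                    throw (IOException) e.getCause();
                }
                throw new IOException(e.getCause());
            } catch (ReflectiveOperationException e) {
                throw new IOException("no usable static parseFrom(byte[]) on " + messageClass, e);
            }
        }

        public boolean isEndOfStream(T nextElement) {
            return false; // unbounded stream
        }
    }

    // Java-serializes and deserializes an object, roughly what happens when a job is shipped.
    @SuppressWarnings("unchecked")
    static <T> T javaRoundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ReflectiveDeserializer<FakeMessage> schema = new ReflectiveDeserializer<>(FakeMessage.class);
        schema.deserialize("warm-up".getBytes(StandardCharsets.UTF_8)); // caches the Method
        ReflectiveDeserializer<FakeMessage> shipped = javaRoundTrip(schema); // Method field is null again
        System.out.println(shipped.deserialize("hello".getBytes(StandardCharsets.UTF_8)).payload()); // prints "hello"
    }
}
```

In a real job the class would implement DeserializationSchema<T> instead and return TypeInformation.of(messageClass) from getProducedType().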

Finally, consume TelemetryBatch messages from the Pulsar source and process them as needed. In this example, TelemetryBatchProcessedSplitter is a class implementing FlatMapFunction that holds the logic for handling each TelemetryBatch message, and TelemetryMessageProcessed is a POJO that holds the data for a processed message.

public class TelemetryBatchProcessedSplitter implements FlatMapFunction<TelemetryBatch, TelemetryMessageProcessed> {
    ...
}

// Pulsar source whose message values are decoded by our Protobuf DeserializationSchema,
// starting from the latest messages on the topic
FlinkPulsarSource<TelemetryBatch> source = new FlinkPulsarSource<>(serviceUrl, adminUrl,
        PulsarDeserializationSchema.valueOnly(new TelemetryBatchProtoDeserializer()), props)
        .setStartFromLatest();

DataStream<TelemetryBatch> batchStream = env.addSource(source);

// Apply the splitter to each incoming TelemetryBatch
DataStream<TelemetryMessageProcessed> telemetryProcessedStream =
        batchStream.flatMap(new TelemetryBatchProcessedSplitter())
        ...
