When I first dealt with consuming Protobuf messages in Apache Flink, it took me some time to get everything working. In this post we will walk through a simple example from a project I worked on that demonstrates how to consume Protobuf messages in Apache Flink.
Setup: This example consumes Protobuf messages from an Apache Pulsar source, but of course your use case may differ; the setup for Protobuf should remain the same or similar. The compiled Protobuf Java class is called TelemetryBatch.
Since our Protobuf messages are a custom type to Flink, we need a serializer. Refer to Flink's documentation on 3rd party serializers for details.
Add the Protobuf serialization dependency (newer versions may be available and usable):
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-protobuf</artifactId>
    <version>0.7.6</version>
    <!-- exclusions for dependency convergence -->
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.7.0</version>
</dependency>
Register the TelemetryBatch Protobuf message with the serializer:
...
import com.twitter.chill.protobuf.ProtobufSerializer;
import ...TelemetryBatch;
...
// Apache Flink setup
Configuration config = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
...
env.getConfig().registerTypeWithKryoSerializer(TelemetryBatch.class, ProtobufSerializer.class);
Define a DeserializationSchema for our custom Protobuf messages:
package flink.io;

import ...TelemetryBatch;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class TelemetryBatchProtoDeserializer implements DeserializationSchema<TelemetryBatch> {

    @Override
    public TypeInformation<TelemetryBatch> getProducedType() {
        return TypeInformation.of(TelemetryBatch.class);
    }

    @Override
    public TelemetryBatch deserialize(byte[] message) throws IOException {
        return TelemetryBatch.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(TelemetryBatch nextElement) {
        return false;
    }
}
Finally, consume the TelemetryBatch Protobuf messages in Flink from the Pulsar source and process them as desired. In this example TelemetryBatchProcessedSplitter is just a class implementing FlatMapFunction with some logic to handle each TelemetryBatch message, and TelemetryMessageProcessed is a POJO for holding data about a processed message.
public class TelemetryBatchProcessedSplitter implements FlatMapFunction<TelemetryBatch, TelemetryMessageProcessed> {
...
}
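The splitter body is elided above, but its essence is emitting one output element per message in the batch. As a hedged sketch only: assuming the TelemetryBatch message declares a repeated field of TelemetryMessage entries (so the generated class exposes a getMessagesList() accessor) and TelemetryMessageProcessed has a constructor taking one such message, both of which are hypothetical since the .proto and POJO aren't shown, the flatMap could look like:

```java
// Hypothetical sketch: getMessagesList(), TelemetryMessage, and the
// TelemetryMessageProcessed constructor depend on your own .proto and POJO.
@Override
public void flatMap(TelemetryBatch batch, Collector<TelemetryMessageProcessed> out) {
    // Emit one processed POJO per telemetry message in the batch
    for (TelemetryMessage msg : batch.getMessagesList()) {
        out.collect(new TelemetryMessageProcessed(msg));
    }
}
```

Flink calls flatMap once per incoming batch, so downstream operators see a flat stream of TelemetryMessageProcessed elements.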
FlinkPulsarSource<TelemetryBatch> source = new FlinkPulsarSource<>(serviceUrl, adminUrl,
PulsarDeserializationSchema.valueOnly(new TelemetryBatchProtoDeserializer()), props)
.setStartFromLatest();
DataStream<TelemetryBatch> batchStream = env.addSource(source);
DataStream<TelemetryMessageProcessed> telemetryProcessedStream =
        batchStream.flatMap(new TelemetryBatchProcessedSplitter());