Use Case: I have a Google Cloud Dataflow pipeline that processes a stream coming from Pub/Sub. One of the transformations in the pipeline needs to read from a file that has key-value pairs. The file is 130.5 MB and serves as a lookup file, with one column acting as the key for the lookup. This is a Pub/Sub-to-BigQuery streaming pipeline.
My current implementation: I have created an API that is hosted on App Engine. This API takes a key as a parameter, searches the file, and returns the information for that key.
My Dataflow pipeline has a transformation that calls this API for every element of the stream that I get from Pub/Sub. This currently amounts to an average of 1,000,000 calls to the API from the Dataflow job each day, and I expect this number to grow significantly.
Problem: This transformation has broken down multiple times. I consistently get network issues on the Dataflow VMs, which cause the Dataflow job to stop processing data and get stuck. This transformation has become a bottleneck for the pipeline. As a safeguard, I have set a timeout of 5 seconds on each call to the API. I notice that the API call times out about 5-6 times each hour.
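To be concrete about the safeguard, here is a minimal sketch of how a per-call timeout with a fallback value behaves. It is plain Java rather than my actual pipeline code: `TimedLookup` and `callWithTimeout` are hypothetical names, and the `Callable` stands in for the HTTP request to the App Engine API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds a blocking lookup call with a timeout. */
final class TimedLookup {
    // Daemon threads so a stuck call cannot keep the JVM alive.
    private final ExecutorService executor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    /** Runs the call; returns the fallback if it fails or exceeds timeoutMillis. */
    <T> T callWithTimeout(Callable<T> call, long timeoutMillis, T fallback) {
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the call that is still running
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the call itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }
}
```

With a 5-second limit this would be called as `callWithTimeout(call, 5000, fallback)`. Note that the element still waits the full timeout before the fallback is used, which is part of why the step stalls under network trouble.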
I believe that this is not the best way to implement this transformation. I am wondering if there is a better way to do this lookup from a dataflow job.
Follow up:
Trying to implement the use case as a side input:
The file that I am trying to read is a GeoIP2-City.mmdb file. Based on the MaxMind documentation, I am using the following code to create a reader object for the file. I want to pass the DatabaseReader object in as a side input because I don't want to create this object for each element that I get from the stream.
> PCollectionView<DatabaseReader> reader = pipeline.apply(
>     ParDo.of(
>         new DoFn<Long, DatabaseReader>() {
>           @ProcessElement
>           public void process(
>               @Element Long input,
>               OutputReceiver<DatabaseReader> o) throws IOException {
>             // Following the MaxMind documentation to read the file
>             File database = new File("GeoIP2-City.mmdb");
>             DatabaseReader reader = new DatabaseReader
>                 .Builder(database)
>                 .fileMode(com.maxmind.db.Reader.FileMode.MEMORY)
>                 .withCache(new CHMCache())
>                 .build();
>             o.output(reader);
>           }
>         })).apply(View.asSingleton());
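The behavior I am after is "build the reader once, then reuse it for every element". As a minimal sketch of that idea in plain Java (no Beam involved; `OncePerJvm` is a hypothetical name, and the `Supplier` stands in for the `DatabaseReader.Builder` call above):

```java
import java.util.function.Supplier;

/** Hypothetical holder that builds an expensive object once and then reuses it. */
final class OncePerJvm<T> {
    private final Supplier<T> factory;
    private volatile T instance; // volatile so other threads see the built object

    OncePerJvm(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, building it on the first call only. */
    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) { // re-check under the lock
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

Every call to `get()` after the first returns the same object, so the cost of loading the 130.5 MB file would be paid once per JVM rather than once per element.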
Here is the code where this side input is used:
> PCollection<TableRow> newTableRow =
>     tableRowBeforeTransformation.apply("mmdb lookup",
>         ParDo.of(new MmdbReaderFn(reader)).withSideInputs(reader));
I have created an MmdbReaderFn DoFn class that applies the lookup logic using the side input.
> DatabaseReader reader;
> PCollectionView<DatabaseReader> readerView;
>
> public MmdbReaderFn(PCollectionView<DatabaseReader> readerView) {
>   this.readerView = readerView;
> }
>
> @ProcessElement
> public void processElement(ProcessContext context) throws IOException {
>   TableRow tableRow = context.element();
>   // Fetch the reader from the side input
>   reader = context.sideInput(readerView);
>   String ip = String.valueOf(tableRow.get("ip"));
>   try {
>     InetAddress ipAddress = InetAddress.getByName(ip);
>
>     CityResponse response = reader.city(ipAddress);
>     if (response == null) {
>       log.info("Invalid IP address");
>     } else {
>       if (response.getCountry() != null && response.getCountry().getName() != null) {
>         tableRow.set(COUNTRY_NAME, response.getCountry().getName()); // e.g. 'United States'
>       } else {
>         tableRow.set(COUNTRY_NAME, null);
>       }
>
>       if (response.getCity() != null && response.getCity().getName() != null) {
>         tableRow.set(CITY_NAME, response.getCity().getName());
>       } else {
>         tableRow.set(CITY_NAME, null);
>       }
>     }
>   } catch (Exception e) {
>     log.error("Error process GeoIP2", e);
>   }
>   context.output(tableRow);
> }
However, I am getting the following error when trying to create the PCollectionView for the reader.
java.lang.IllegalStateException: Unable to return a default Coder for ParDo(Anonymous)8/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.maxmind.geoip2.DatabaseReader.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
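My reading of the message is that Beam needs a way to encode every element of a PCollection, and it has none for DatabaseReader. The sketch below is only an analogy in plain Java: it uses Java serialization as a stand-in for a Beam Coder (they are not the same mechanism), and `FakeReader`, `EagerFn`, `LazyFn` and `SerializationCheck` are hypothetical names. It shows that an object holding a live, non-serializable resource cannot be turned into bytes, while one that keeps the resource `transient` and rebuilds it on first use can.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for a live resource such as a database reader; not Serializable. */
final class FakeReader {
    String lookup(String key) {
        return "value-for-" + key;
    }
}

/** Holds the reader as ordinary state, so serializing this object fails. */
final class EagerFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeReader reader = new FakeReader();
}

/** Holds the reader as transient state and rebuilds it on first use. */
final class LazyFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient FakeReader reader; // skipped by serialization

    String process(String key) {
        if (reader == null) {
            reader = new FakeReader(); // also runs after deserialization
        }
        return reader.lookup(key);
    }
}

final class SerializationCheck {
    /** Serializes and deserializes a value through an in-memory buffer. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

`SerializationCheck.roundTrip(new EagerFn())` throws `java.io.NotSerializableException`, whereas a round-tripped `LazyFn` still answers `process("a")` because it recreates its reader. If the same reasoning applies here, the reader would have to be created on the worker instead of being emitted as a PCollection element, but I have not confirmed that this is the recommended approach.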