Use Case: I have a Google Cloud Dataflow pipeline that processes a stream coming from Pub/Sub. One of the transformations in the pipeline needs to read from a file that has key-value pairs. The file is 130.5 MB and works as a lookup table, where one column serves as the lookup key. This is a Pub/Sub-to-BigQuery streaming pipeline.

My current implementation: I have created an API that is hosted on App Engine. The API takes in a key as a parameter, searches the file, and returns the information associated with that key.

My Dataflow pipeline has a transformation that calls this API for every element of the stream I get from Pub/Sub. This currently amounts to an average of 1,000,000 calls to the API from the Dataflow job each day, and I expect that number to grow significantly.

Problem: This transformation has broken down multiple times. I consistently get network issues in the Dataflow VMs, which cause the Dataflow job to stop processing data and get stuck. The transformation has become a bottleneck for the whole pipeline. As a safeguard, I have set a timeout of 5 seconds on each call to the API; even so, the API times out about 5-6 times each hour.
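Roughly, the transformation looks like this (the class name, endpoint, and field names below are illustrative, not my exact code):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.time.Duration;

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.DoFn;

    public class ApiLookupFn extends DoFn<TableRow, TableRow> {
        private transient HttpClient client;

        @Setup
        public void setup() {
            client = HttpClient.newHttpClient();
        }

        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            TableRow row = context.element().clone();
            String key = String.valueOf(row.get("ip"));
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://my-lookup-api.appspot.com/lookup?key=" + key))
                    .timeout(Duration.ofSeconds(5)) // the 5-second safeguard timeout
                    .build();
            // One blocking HTTP round trip per element; a slow or failed call
            // stalls the whole bundle, which is where the pipeline gets stuck.
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            row.set("lookup_value", response.body());
            context.output(row);
        }
    }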

I believe this is not the best way to implement this transformation, and I am wondering if there is a better way to do this lookup from a Dataflow job.

Follow-up:

Trying to implement the use case with a side input:

The file that I am trying to read is the GeoIP2-City.mmdb file. Based on the MaxMind documentation, I am using the following code to create a reader object for the file. I want to pass the DatabaseReader object in as a side input because I don't want to create this object for each element that I get from the stream.

    PCollectionView<DatabaseReader> reader = pipeline
        .apply(Create.of(0L)) // a ParDo needs an input PCollection to hang off
        .apply(ParDo.of(new DoFn<Long, DatabaseReader>() {
            @ProcessElement
            public void process(@Element Long input, OutputReceiver<DatabaseReader> o)
                    throws IOException {
                // Following the MaxMind documentation to read the file
                File database = new File("GeoIP2-City.mmdb");
                DatabaseReader reader = new DatabaseReader
                        .Builder(database)
                        .fileMode(com.maxmind.db.Reader.FileMode.MEMORY)
                        .withCache(new CHMCache())
                        .build();
                o.output(reader);
            }
        }))
        .apply(View.asSingleton());

Here is the code where this side input is used:

    PCollection<TableRow> newTableRow = tableRowBeforeTransformation
        .apply("mmdb lookup",
            ParDo.of(new MmdbReaderFn(reader)).withSideInputs(reader));

I have created an MmdbReaderFn DoFn class that applies the lookup logic using the side input.

    DatabaseReader reader;
    PCollectionView<DatabaseReader> readerView;

    public MmdbReaderFn(PCollectionView<DatabaseReader> readerView) {
        this.readerView = readerView;
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws IOException {
        TableRow tableRow = context.element();
        reader = context.sideInput(readerView);
        String ip = String.valueOf(tableRow.get("ip"));
        try {
            InetAddress ipAddress = InetAddress.getByName(ip);
            CityResponse response = reader.city(ipAddress);
            if (response == null) {
                log.info("Invalid IP address");
            } else {
                if (response.getCountry() != null && response.getCountry().getName() != null) {
                    tableRow.set(COUNTRY_NAME, response.getCountry().getName()); // e.g. "United States"
                } else {
                    tableRow.set(COUNTRY_NAME, null);
                }

                if (response.getCity() != null && response.getCity().getName() != null) {
                    tableRow.set(CITY_NAME, response.getCity().getName());
                } else {
                    tableRow.set(CITY_NAME, null);
                }
            }
        } catch (Exception e) {
            log.error("Error processing GeoIP2 lookup", e);
        }
        context.output(tableRow);
    }

However, I am getting the following error when trying to create the PCollectionView for the reader.

java.lang.IllegalStateException: Unable to return a default Coder for ParDo(Anonymous)8/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.maxmind.geoip2.DatabaseReader. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.

Comment: ah that's an interesting use case. thanks for sharing! Is the side input a good option here? – Pablo

1 Answer

If the 130.5 MB file is not expected to grow in the future, you can make use of a side input, which avoids the RPC call out of the pipeline for every element.
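The IllegalStateException above happens because Beam has no Coder for DatabaseReader, so the reader object itself cannot travel through a PCollection. One way around it, as a minimal sketch, is to make the side input the raw .mmdb bytes instead (byte[] is encodable out of the box; the reading here uses Beam's FileSystems API and Guava's ByteStreams, and the GCS path is hypothetical):

    PCollectionView<byte[]> dbBytesView = pipeline
        .apply(Create.of("gs://my-bucket/GeoIP2-City.mmdb")) // hypothetical staging path
        .apply(ParDo.of(new DoFn<String, byte[]>() {
            @ProcessElement
            public void process(@Element String path, OutputReceiver<byte[]> out)
                    throws IOException {
                ReadableByteChannel channel =
                        FileSystems.open(FileSystems.matchNewResource(path, false));
                out.output(ByteStreams.toByteArray(Channels.newInputStream(channel)));
            }
        }))
        .apply(View.asSingleton());

The consuming DoFn (with dbBytesView passed in through its constructor, the same way readerView was) can then build the reader lazily instead of asking Beam to encode it:

    private transient DatabaseReader reader;

    @ProcessElement
    public void processElement(ProcessContext context) throws IOException {
        if (reader == null) {
            // The Builder(InputStream) overload loads the database fully into
            // memory, so no local file is needed on the worker.
            reader = new DatabaseReader.Builder(
                            new ByteArrayInputStream(context.sideInput(dbBytesView)))
                    .withCache(new CHMCache())
                    .build();
        }
        // ... existing country/city lookup logic from the question ...
    }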

Note that a side input does need to fit in memory on the workers; some good analysis of the memory needs is available in this article.
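If the default workers are too small for that, you can also launch the job on larger machines using Dataflow's worker machine type option (the machine type below is just an example):

    --workerMachineType=n1-highmem-4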

If the file is periodically changing, you can make use of the slowly updating side input pattern.
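That pattern boils down to a periodically refreshed singleton view, roughly like this (following the Beam documentation's slowly-updating global window side inputs example; the refresh interval and the loadLookupFile() helper are placeholders):

    PCollectionView<Map<String, String>> lookupView = pipeline
        // Emit a tick every 5 minutes (org.joda.time.Duration).
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        .apply(Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
            @ProcessElement
            public void process(@Element Long tick, OutputReceiver<Map<String, String>> out) {
                // Placeholder: re-read the lookup file and emit the fresh contents.
                out.output(loadLookupFile());
            }
        }))
        .apply(Latest.globally())
        .apply(View.asSingleton());

Your main stream then reads the view with .withSideInputs(lookupView) exactly as before, and each pane refresh replaces the lookup data without restarting the pipeline.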