16 votes

I have a use case where I initialise a HashMap containing a set of lookup data (information about the physical location, etc. of IoT devices). This lookup data serves as reference data for a second dataset, a PCollection containing the data that the IoT devices record. The PCollection is produced by an Apache Beam pipeline that runs on Google Dataflow and reads from Google Cloud Pub/Sub.

When I process the PCollection (the device data), I link the Google Cloud pub/sub data to the related lookup entry in the HashMap.
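Conceptually, the lookup during processing looks something like this (a sketch only; extractDeviceId is a hypothetical parser and DeviceReference stands in for my lookup model):

// Sketch: enrich each incoming device message with its reference data.
public static class EnrichFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        HashMap<String, DeviceReference> referenceData =
            MyData.getInstance().referenceData;
        String deviceId = extractDeviceId(c.element()); // hypothetical parser
        DeviceReference ref = referenceData.get(deviceId);
        if (ref != null) {
            c.output(c.element() + " | " + ref);
        }
    }
}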

I need to update the HashMap based on a second Pub/Sub topic that pushes changes to its data. Here's how I'm getting a PCollection so far and doing a lookup using the HashMap:

HashMap -> contains pre-loaded lookup data (information about the IoT devices)

PCollection -> contains data from a pipeline dataflow (the data recorded by the IoT devices)

I'm generating a HashMap for the IoT device lookup data as a singleton:

public class MyData {

    private static final MyData instance = new MyData();

    private MyData() {
        HashMap<String, DeviceReference> myDataMap = new HashMap<>();
        // ... logic to populate the map

        this.referenceData = myDataMap;
    }

    public HashMap<String, DeviceReference> referenceData;

    public static MyData getInstance() {
        return instance;
    }
}
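(A note on thread safety: since pipeline worker threads will read this map while the subscription updates it, I understand a plain HashMap isn't safe for concurrent modification. A ConcurrentHashMap variant of the field, sketched here as an assumption rather than what I currently have, would look like this:)

// Sketch: thread-safe variant of the reference-data field, so worker
// threads can read it while the subscription handler updates it.
public final ConcurrentHashMap<String, DeviceReference> referenceData =
    new ConcurrentHashMap<>();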

I then use the HashMap in a different class where I subscribe to updates to the data (these are messages that, e.g., give me new data relating to the entities already stored in the HashMap). I'm subscribing to changes via Google Cloud Pub/Sub, using Apache Beam:

HashMap<String, DeviceReference> referenceData = MyData.getInstance().referenceData;

Pipeline pipeLine = Pipeline.create(options);           

// subscribe to changes in data

PCollection<String> myDataUpdates;

myDataUpdates = pipeLine.begin()
    .apply("Subscribe to data updates",
        PubsubIO.readStrings().fromTopic("myPubSubPath"));

What I want to do is to efficiently apply the data updates to the singleton HashMap (i.e. manipulate the HashMap based on my data subscription). How can I do this?

I have a limited understanding of Apache Beam, and I only know how to do transforms on pipeline data to create another, separate PCollection. I think that this is the point of Beam: it's for transforming large data sets from one form into another. Is there a way of achieving what I need (updating a dataset based on a Pub/Sub subscription) using Apache Beam, or is there another way I can update the HashMap using Pub/Sub? (I can't poll for the data, as that creates too much latency and cost; I need to update the HashMap via a subscription.)


The Google Cloud docs show a way of subscribing directly to a Google Cloud Pub/Sub topic without an Apache Beam pipeline. This is promising as a potential solution, and relies on the following Maven dependency:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.53.0</version>
</dependency>
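For reference, the subscriber pattern from those docs looks roughly like this (a sketch; the project and subscription names are placeholders):

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

// Sketch: a direct (non-Beam) Pub/Sub subscriber that could update the
// singleton HashMap as each message arrives.
ProjectSubscriptionName subscription =
    ProjectSubscriptionName.of("my-project", "my-lookup-updates-sub");

MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
    // parse message.getData() and update the singleton HashMap here
    consumer.ack();
};

Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.startAsync().awaitRunning();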

I'm getting a conflict, though, with the following Maven dependency for Apache Beam:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
</dependency>

The issue is documented in a separate question here: Maven conflict in Java app with google-cloud-core-grpc dependency. From what I can tell, it doesn't matter which version of the google-cloud-pubsub Maven artifact I use; the Beam 2.5.0 dependency (and earlier) appears to conflict with every current version of the Google dependency.
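A possible workaround (untested; the conflicting artifact name is taken from the linked question) would be to exclude the transitive dependency from the Beam artifact:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
  <exclusions>
    <exclusion>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-core-grpc</artifactId>
    </exclusion>
  </exclusions>
</dependency>

I haven't confirmed whether this actually resolves the runtime conflict, though.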

(I've raised this as an issue in the Beam Jira - https://issues.apache.org/jira/browse/BEAM-6118)


I'm currently investigating side inputs and combine as a way to achieve updating of the HashMap:

https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine

Example 10 shows a way that .getSideInputsMap() can be applied to a payload. I'm wondering whether I can apply this somehow to my subscription to the lookup-data changes. If I get a PCollection like the one below, I can't directly chain .getSideInputsMap() onto it:

deviceReferenceDataUpdates = pipeLine.begin()
    .apply("Get changes to the IoT device lookup data",
        PubsubIO.readMessages().fromTopic("IoT device lookup data"));

I've asked a separate question specifically about how I might be able to use .getSideInputsMap() - Apache Beam - how can I apply .getSideInputsMap to a subscription to a Google pub/sub?
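For context, my understanding is that a map-shaped side input would normally be built something like this (a sketch; ParseToKvFn is a hypothetical ParDo that turns each message into a KV<String, DeviceReference> pair, and the fixed windowing is just illustrative):

// Sketch: turn the update stream into a map-shaped side input.
PCollectionView<Map<String, DeviceReference>> lookupView =
    deviceReferenceDataUpdates
        .apply("Key updates by device id",
            ParDo.of(new ParseToKvFn())) // hypothetical parser DoFn
        .apply(Window.<KV<String, DeviceReference>>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        .apply(View.asMap());

// A downstream ParDo could then be given .withSideInputs(lookupView)
// and read the map via c.sideInput(lookupView).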

I'm a bit confused by this question. Cloud Pub/Sub is generally used for queuing messages in a distributed system, but a singleton HashMap is the opposite of a distributed system. Do you have a distributed system or don't you? If you want to scale to a distributed solution, you are probably better off abandoning the idea of a single HashMap and switching to some kind of string-keyed object storage instead. – Daniel Pryden
Hi @Daniel. Thanks, you prompted me to better explain the question. I've added an intro to the OP to help clarify my use case. I'm not really understanding the benefit of having string-keyed object storage in another part of the system. Now that I've better explained the use case, does that change your assessment? I suspect that you've maybe highlighted a gap in my understanding of best practice in handling distributed data. Any references you can point me to are much appreciated :) – Chris Halcrow
I still think you're fixated on the idea of a system where you can keep important state in a single in-memory data structure (a HashMap in this case). If you are dealing with a distributed system, you shouldn't assume that you have only one of anything. Rather than trying to listen to all mutations everywhere and update a single process's memory on a single machine, you want a distributed object storage system that serves the same purpose as your HashMap. You tagged this question "bigdata", but if you can fit all your data in memory on one machine, then you don't actually have Big Data. – Daniel Pryden
Thanks @Daniel. I'm relatively new to Big Data and distributed-system considerations. The idea of the HashMap is to enable rapid lookup of reference data that's related to pipeline data. The reference data may be tens of thousands of records at most, and I need to rapidly link it to the pipeline data. I can't poll for the reference data each time pipeline data is processed because of Google Cloud Platform cost constraints and scalability concerns, so I'm not clear what you're suggesting as an approach. I'm subscribing to the reference data updates via Google Cloud Platform Pub/Sub. – Chris Halcrow
Take into account that a plain Java HashMap is not thread-safe. Any attempt at concurrent modification is doomed up front. Even if you use a ConcurrentHashMap (or any other type of synchronization), you face a thread-contention performance problem, because memory is faster than disk but can still serve only a limited number of clients. As suggested in other comments, I would first consider the possibility of a fully distributed system. Why do you need a singleton Map? Each data collector will probably read data only from a limited number of devices, right? Can you consolidate that data later? – Serg M Ten

1 Answer

1 vote

I found a way of doing this within the Apache Beam framework, as follows (not fully tested).

Note - take into account the comment on the OP from @Serg M Ten that a better approach may be to consolidate the data later, instead of trying to join the lookup data as part of the transformation processing.


Singleton HashMap

See my answer here - Accessing a HashMap from a different class


Pipeline (on single thread, implemented in main)

// initialise the singleton HashMap containing the lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();

PCollection<MyLookupData> lookupDataUpdateMessage;

lookupDataUpdateMessage = pipeLine.begin()
    .apply("Extract lookup update data",
        PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
    .apply("Transform lookup update data",
        ParDo.of(new TransformLookupData.TransformFn()));

Transform

import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;

import com.fasterxml.jackson.databind.ObjectMapper;

// import your lookup singleton (LookupData) and lookup model (MyLookupData) here


public class TransformLookupData {

    public static class TransformFn extends DoFn<String, MyLookupData> {

        @ProcessElement
        public void processElement(ProcessContext c)
        {
            LookupData lookupData = LookupData.getInstance();

            try
            {
                // normalise the raw Pub/Sub payload to a JSON string
                String myLookupDataJson = new JSONObject(c.element()).toString();

                // deserialise the JSON into the lookup model
                ObjectMapper mapper = new ObjectMapper();
                MyLookupData myLookupDataUpdate =
                    mapper.readValue(myLookupDataJson, MyLookupData.class);

                String updatedLookupDataId = myLookupDataUpdate.id;

                // logic for HashMap updating, e.g. (the 'removed' flag is a
                // placeholder for however your updates signal a deletion):
                if (myLookupDataUpdate.removed) {
                    lookupData.myHashMap.remove(updatedLookupDataId);
                }
                else {
                    lookupData.myHashMap.put(updatedLookupDataId, myLookupDataUpdate);
                }

                // emit the parsed update so downstream transforms can use it
                c.output(myLookupDataUpdate);
            }
            catch (Exception ex) {
                System.out.println("Error " + ex.getMessage());
            }
        }
    }
}

MyLookupData is the class that forms the model for the lookup data.
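For reference, a minimal sketch of what that model might look like (the field names here, including the 'removed' deletion flag, are assumptions rather than the actual schema):

import java.io.Serializable;

// Minimal sketch of the lookup-data model; field names are assumed.
public class MyLookupData implements Serializable {
    public String id;        // key used for the HashMap
    public String name;      // example reference attribute
    public boolean removed;  // placeholder flag marking the entry as deleted
}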