I need to implement a pipeline using apache beam API and is going to run the pipeline on Google Cloud Dataflow. The pipeline logic is like this:
- read real-time ingested unbounded events from Kafka (call it "RawEvent")
- load a reference table from Google BigQuery (this table is updated daily, so everyday at some time point the pipeline need to load it in, call it "RefTable")
- For every RawEvent, if the id show in RefTable then discard the RawEvent, otherwise add it into final output.
So basically I need to have the RefTable stay there as kind of "static reference" in the pipeline for a day and then reload it on a daily basis.
I wonder if load RefTable as side-input is the right solution. There are several questions I cannot answer which really confused me. I wonder if anybody can point me to the right direction, especially with regarding to what happened under the hood of the pipeline. :
will the side-input stay and available through out the pipeline life time? Particularly in my case, if I load my RefTable via
BigQueryIO, make it a side-input, will it be available to all unbounded RawEvent? Or it's like if the window passed, then the RefTable's just gone? (Plus I don't even know if I need to apply windowing/triggering on RefTable since it's bounded data)the other side of the input is unbounded RawEvent, windowing/triggering is required. Then, is the windowing/triggering to the RefTable required too?
How I can specify in the pipeline that the RefTable need to be reload once a day?
Update: Upon @jkff's hacking way, I figured out the following working version. What it can do is: load the RefTable every 1 minute, which is very close to my goal, but not yet.
I couldn't feature out a way of using BigQueryIO in my "Load Ref Table" ParDo. So that I'd have to somehow re-invent the wheel by using BigQuery client library. and then it added more headaches to worry about exception handling and etc.
by using fixed window to generate "ticks", my "Filter" transform will only be triggered at the end of the window. In the reality, what I want is to reload RefTable only every 1 hour (or longer) and each time when RefTable is completely loaded (it usually only take less than 1 minute to finish the loading), the filter should start and be applied to any raw data which is with a later timestamp. Right now, with my code, if I change the ticks window from 1 minute to 1 hour, I need to wait for 1 hour for the "Filter" transform to be triggered, even it only took less than a minute for the RefTable to be actually loaded in.
Assuming there's a way to early trigger "Filter" before the end of a "ticks" window, how I can define the trigger based off the complishment of RefTable load? (I mean, trigger "Filter" whenever the RefTable is all loaded and also)
4, Last but not least import thing is : when next reload is about to start, I need to hold new raw data on waiting until reloading finishes. because all new data comes in the next hour is suppposed to be filtered against the refreshed RefTable.
Could you privide any insights on this. Really appreciated!
PCollection<Long> ticks = p
// Produce 1 "tick" per 1 second
.apply(GenerateSequence.from(0).withRate(1L,Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))
)
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
// Produce a collection of maps, 1 per each 1-minute window
PCollectionView<Map<String,String>> banedDeviceMapView = ticks
.apply("Load Ref Table"
,ParDo.of(new DoFn<Long,KV<String,String>>(){
@ProcessElement
public void processElement(ProcessContext c)
{
TableId table = TableId.of("project","dataset","RefTable");
TableResult tableData =
BIGQUERY_CLIENT.listTableData(table,GetSchema());
Map<String,String> resultMap = new HashMap();
for (FieldValueList row : tableData.iterateAll()) {
Object key = row.get("HardwareId").getValue();
if(key!=null)
{
String hardwareId = (String)key;
resultMap.putIfAbsent(hardwareId,hardwareId);
}
}
int num = 0;
for (Map.Entry<String, String> entry : resultMap.entrySet()) {
c.output(KV.of(entry.getKey(),entry.getValue()));
num++;
}
System.out.println(num + " banded devices have been loaded.");
}
})
)
.apply(View.asMap());
PCollection<KafkaRecord<String, GPS>> rawLoad = p.apply("Read Events from Kafka"
, KafkaIO.<String, GPS>read()
.withBootstrapServers("localhost:9092")
.withTopic(SOURCE_GPS_TOPIC_NAME)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(GPSEventAvroDeserializer.class)
.updateConsumerProperties(ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
);
PCollection<KV<String, GPS>> validGps = rawLoad.apply("Extract Gps from Kafka", ParDo.of(
new DoFn<KafkaRecord<String, GPS>, KV<String, GPS>>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.print("KafkaRecord: KV.of("+ c.element().getKV().getKey());
System.out.println("," + c.element().getKV().getValue().getSpeed() + ")");
c.output(c.element().getKV());
}
}))
.apply("Windowing Raw Events",Window.into(FixedWindows.of(Duration.standardSeconds(1)))
)
.apply("Filter",ParDo.of(
new DoFn<KV<String,GPS>,KV<String,GPS>>(){
@ProcessElement
public void processElement(ProcessContext c){
Map<String,String> bandedDevices = c.sideInput(banedDeviceMapView);
String deviceId = c.element().getKey();
System.out.print("Checking device: "+ deviceId);
System.out.println(" - in bandedDevices? " + bandedDevices.containsKey(deviceId));
if(!bandedDevices.containsKey(deviceId)){
c.output(c.element());
}else{
System.out.println("Device " + deviceId + " is removed from results");
}
}
}).withSideInputs(banedDeviceMapView)
);