3
votes

I've two files in cloud storage.Contains of File1 in Avro format that has data from temperature sensor.

time_stamp     |  Temperature
1000           |  T1
2000           |  T2
3000           |  T3
4000           |  T3
5000           |  T4
6000           |  T5

Contains of File2 in Avro format that has data from wind sensor.

time_stamp     |  wind_speed
500            |  w1
1200           |  w2
1500           |  w3
2200           |  w4
2500           |  w5
3000           |  w6

I'want to combine output like below

time_stamp |Temperature|wind_speed
1000       |T1         |w1 (last earliest reading from wind sensor at 500)
2000       |T2         |w3 (last earliest reading from wind sensor at 1500)
3000       |T3         |w6 (wind sensor reading at 3000)
4000       |T3         |w6 (last earliest reading from wind sensor at 3000)
5000       |T4         |w6 (last earliest reading from wind sensor at 3000)
6000       |T5         |w6(last earliest reading from wind sensor at 3000)

I am looking for the solution in apache beam to combine above file. Right now it is reading from file but in future it may come via pubsub. I want to find out custom way of combining two PCollection and create another PCollection tempDataWithWindSpeed.

     PCollection<Temperature> tempData = p.apply(AvroIO
         .read(AvroAutoGenClass.class)
         .from("gs://my_bucket/path/to/temp-sensor-data.avro")

     PCollection<WindSpeed> windData = p.apply(AvroIO
         .read(AvroAutoGenClass.class)
         .from("gs://my_bucket/path/to/wind-sensor-data.avro")

     PCollection<WindSpeed> tempDataWithWindSpeed = ?
1
There is several solutions. Can you add more detail? For example, is the temperature timestamp is as regular as displayed in the example? Is stream processing or always batch processing? Do you perform many extra transform after the merge in your pipeline? What kind of transform? - guillaume blaquiere
Here is a good example, how to join them: beam.apache.org/documentation/pipelines/design-your-pipeline/… - jszule
@guillaumeblaquiere How does transform affect the solution. Right now it is batch processing. - pinakin
@jszule I saw that example and join key used is name of user. I don't have join key directly, I need some custom solution to join. - pinakin
Still you can go with join the sources, just you have to create a KV values like KV<Long, WindData> and KV<Long, TempeData>, where the key is the timestamp bin in both cases where the timestamp belongs to (Eg: 2200 belongs to key 2000, so you have to round down to thousands). Once you have created the groups, you can select the min or max or whatever you need sensor value. Hope this helps :) - jszule

1 Answers

2
votes

The comment by @jszule is a good answer in general for Dataflow/Beam: The best supported join is when the two PCollections have a common key. For most data Beam can figure out a schema and you can use CoGroup.join transform. The design decision you have to make is how to choose the keys, such as rounding down to the nearest 1000.

Your use case has a complication: you need to carry forward values in a time series for keys that have no data. The solution is to use state and timers to generate the "missing" values. You will still need to carefully choose keys since state and timers are per-key-and-window. State and timers also work in batch mode so this is a batch/streaming unified solution.

You may want to read this blog post by Reza Rokni and myself on the subject, or this talk by Reza at the Beam Summit Berlin 2019