I've been trying to build a Dataflow pipeline that reads data from Pub/Sub and publishes it to Bigtable or BigQuery. I can write the raw data for one sensor, but it breaks down for thousands of sensors once I try to calculate the mean over a 60-second window of data.
To illustrate the scenario:
My data payload
data = {
    "timestamp": "2021-01-27 13:56:01.634717+08:00",
    "location": "location1",
    "name": "name1",
    "datapoint1": "Some Integer",
    "datapoint2": "Some Integer",
    "datapoint3": "Some String",
    .....
    "datapointN": "Some Integer",
}
In my example there will be thousands of sensors, each identified by the full name "{location}_{name}". For each sensor, I would like to window its data into 60-second windows and calculate the average over each window.
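To make the keying concrete, the per-sensor full name for one parsed payload could be derived like this (an illustrative helper of my own, not part of my pipeline yet):

# hypothetical helper: builds the "{location}_{name}" full name described above
def sensor_fullname(payload):
    return "{}_{}".format(payload["location"], payload["name"])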
The final form I am expecting
I will take this final form, which exists as a single element, and insert it into Bigtable and BigQuery:
finalform = {
    "timestamp": "2021-01-27 13:56:01.634717+08:00",
    "location": "location1",
    "name": "name1",
    "datapoint1": "Integer That Has Been Averaged",
    "datapoint2": "Integer That Has Been Averaged",
    "datapoint3": "String that has been left alone",
    .....
    "datapointN": "Integer That Has Been Averaged",
}
My solution so far, which needs help
import json

import apache_beam as beam
from apache_beam import window

p = beam.Pipeline()
rawdata = p | "Read" >> beam.io.ReadFromPubSub(topic=topic)  # topic is defined elsewhere
jsonData = rawdata | "Parse Json" >> beam.Map(json.loads)
windoweddata = jsonData | beam.WindowInto(window.FixedWindows(60))
groupedData = windoweddata | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"])
After the last line I am stuck. I want to apply CombineValues in order to compute the mean. However, after applying GroupBy I get tuples of the form (namedkey, values). When I then run a ParDo on that output to split the JSON up into (key, value) tuples to prepare it for CombineValues, all the data gets mixed up again, and sensor data from various locations ends up mixed together in the PCollection.
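Roughly, the ParDo I described looks like this (reconstructed for illustration; this is the step where things go wrong):

# After GroupBy, each element is (namedkey, values), where values is an
# iterable of the original dicts for that sensor.
def split_for_combine(element):
    namedkey, values = element
    for record in values:
        for field, value in record.items():
            # the sensor key is dropped here, so fields from different
            # sensors collide once everything is re-keyed by field name
            yield (field, value)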
My challenges
So, in its clearest form, I have two main challenges:
- How do I apply CombineValues to my pipeline?
- How do I apply the mean to the pipeline while ignoring the "string" type entries? (A small sketch of what I mean follows this list.)
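For the second point, the behaviour I am after per datapoint field is something like this (my own illustration, not a Beam API):

# For each datapoint field: average numeric values, keep strings untouched.
def combine_field(values):
    values = list(values)
    if all(isinstance(v, (int, float)) for v in values):
        return sum(values) / len(values)
    return values[0]  # e.g. "datapoint3" stays a string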
Any help will be greatly welcomed.
My partial solution so far with help from chamikara
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import window
class AverageFn(beam.CombineFn):
    # Debugging stub: the prints are only there to see what Beam hands to
    # each CombineFn method; it does not compute a real average yet.
    def create_accumulator(self):
        print(dir(self))
        return (1, 2, 3, 4, 5, 6)  # dummy accumulator

    def add_input(self, sum_count, input):
        print("add input", sum_count, input)
        return sum_count  # input is currently ignored

    def merge_accumulators(self, accumulators):
        print(accumulators)
        data = list(zip(*accumulators))  # list() so this is not a lazy iterator
        return data

    def extract_output(self, sum_count):
        print("extract_output", sum_count)
        data = sum_count
        return data
with beam.Pipeline() as pipeline:
    total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            {
                "timestamp": "2021-01-27 13:55:41.634717+08:00",
                "location": "L1",
                "name": "S1",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:41.634717+08:00",
                "location": "L1",
                "name": "S2",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:41.634717+08:00",
                "location": "L2",
                "name": "S3",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:51.634717+08:00",
                "location": "L1",
                "name": "S1",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:51.634717+08:00",
                "location": "L1",
                "name": "S2",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:55:51.634717+08:00",
                "location": "L2",
                "name": "S3",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:56:01.634717+08:00",
                "location": "L1",
                "name": "S1",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:56:01.634717+08:00",
                "location": "L1",
                "name": "S2",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
            {
                "timestamp": "2021-01-27 13:56:01.634717+08:00",
                "location": "L2",
                "name": "S3",
                "data1": 1,
                "data2": "STRING",
                "data3": 3,
            },
        ])
        | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"])
        | beam.CombinePerKey(AverageFn())
        | beam.Map(print))
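To show where I currently think this is heading, here is an untested sketch: instead of GroupBy, key each record by (location, name) explicitly and let CombinePerKey do the grouping, with a CombineFn that averages only the numeric fields. MeanOfNumericFieldsFn is my own name, and the assumption that every non-numeric field should simply pass through unchanged is mine as well:

import apache_beam as beam
from apache_beam import window

class MeanOfNumericFieldsFn(beam.CombineFn):
    # Accumulator is (sums, count): sums holds running totals for numeric
    # fields and the last seen value for non-numeric fields.
    def create_accumulator(self):
        return ({}, 0)

    def add_input(self, accumulator, element):
        sums, count = accumulator
        for field, value in element.items():
            if isinstance(value, (int, float)):
                sums[field] = sums.get(field, 0) + value
            else:
                sums[field] = value  # strings (timestamp, data2, ...) pass through
        return (sums, count + 1)

    def merge_accumulators(self, accumulators):
        merged, total = {}, 0
        for sums, count in accumulators:
            total += count
            for field, value in sums.items():
                if isinstance(value, (int, float)):
                    merged[field] = merged.get(field, 0) + value
                else:
                    merged[field] = value
        return (merged, total)

    def extract_output(self, accumulator):
        sums, count = accumulator
        return {
            field: (value / count if isinstance(value, (int, float)) else value)
            for field, value in sums.items()
        }

# "parsed" stands for the PCollection of parsed dicts (jsonData above).
averaged = (
    parsed
    | beam.WindowInto(window.FixedWindows(60))
    | beam.Map(lambda s: ((s["location"], s["name"]), s))  # explicit key, no GroupBy
    | beam.CombinePerKey(MeanOfNumericFieldsFn()))

If I understand the model correctly, keying with beam.Map and letting CombinePerKey do both the grouping and the combining should keep each sensor's values separate, which would avoid the mixing I ran into with the ParDo above.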