
I've been trying to build a Dataflow pipeline that takes in data from Pub/Sub and publishes it to Bigtable or BigQuery. I can write the raw data for one sensor, but I can't do it for thousands of sensors once I try to calculate the mean of a 60-second window of data.

To illustrate the scenario:

My data payload

data = {
  "timestamp": "2021-01-27 13:56:01.634717+08:00",
  "location": "location1",
  "name" : "name1",
  "datapoint1" : "Some Integer",
  "datapoint2" : "Some Integer",
  "datapoint3" : "Some String",
   .....
  "datapointN" : "Some Integer",
}

In my example there will be thousands of sensors, each with the full name "{location}_{name}". For each sensor, I would like to window the data into 60-second windows and calculate the average of that data.

The final form I am expecting

I will take this final form, which exists as a single element, and insert it into Bigtable and BigQuery:

finalform = {
  "timestamp": "2021-01-27 13:56:01.634717+08:00",
  "location": "location1",
  "name" : "name1",
  "datapoint1" : "Integer That Has Been Averaged",
  "datapoint2" : "Integer That Has Been Averaged",
  "datapoint3" : "String that has been left alone",
   .....
  "datapointN" : "Integer That Has Been Averaged",
}

My solution so far, which needs help.

import json

import apache_beam as beam
from apache_beam import window

p = beam.Pipeline()
rawdata = p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
jsonData = rawdata | "Parse Json" >> beam.Map(json.loads)
windoweddata = jsonData | "Window" >> beam.WindowInto(window.FixedWindows(60))
groupedData = windoweddata | "Group" >> beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"])

Now after the last line I am stuck. I want to apply CombineValues in order to take the mean. However, after applying GroupBy I get a tuple of (named key, grouped values). When I then run a ParDo to split that JSON into (key, value) tuples to prepare it for CombineValues, all the data gets mixed up again, and sensor data from different locations ends up interleaved in the PCollection.
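
For reference, each element after the GroupBy looks roughly like this (an illustration based on the payload above, not actual runner output):

# One grouped element, roughly: a named key plus an iterable of raw dicts.
# (key(location='location1', name='name1'),
#  [{'timestamp': ..., 'location': 'location1', 'name': 'name1', ...},
#   {'timestamp': ..., 'location': 'location1', 'name': 'name1', ...}])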

My challenges

So, in its clearest form, I have two main challenges:

  1. How do I apply CombineValues to my pipeline?
  2. How do I apply a mean to the pipeline while ignoring the string-type entries? (A single-field sketch follows this list.)
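
For a single numeric field, Beam's built-in mean combiner would look something like this sketch (reusing windoweddata from my pipeline above; it doesn't cover a dict of mixed fields, which is my actual problem):

# Sketch: per-sensor mean of one numeric field with Beam's built-in combiner.
means = (
    windoweddata
    | "Key by sensor" >> beam.Map(
        lambda d: ((d["location"], d["name"]), d["datapoint1"]))
    | "Mean per sensor" >> beam.combiners.Mean.PerKey())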

Any help would be greatly appreciated.

My partial solution so far with help from chamikara

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import window

# Debugging skeleton: the accumulator values are placeholders, and the
# prints only show what each CombineFn method receives.
class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    print(dir(self))
    return (1, 2, 3, 4, 5, 6)  # placeholder accumulator

  def add_input(self, sum_count, input):
    print("add input", sum_count, input)
    return sum_count

  def merge_accumulators(self, accumulators):
    print(accumulators)
    data = zip(*accumulators)  # placeholder merge
    return data

  def extract_output(self, sum_count):
    print("extract_output", sum_count)
    data = sum_count
    return data

with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create sample data' >> beam.Create([
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":1,
              "data2":"STRING",
              "data3":3,
          },
      ])
      | beam.GroupBy(location=lambda s: s["location"], name=lambda s: s["name"]) 
      | beam.CombinePerKey(AverageFn())
      | beam.Map(print))
2 Answers

  1. Please see the Combine section (particularly CombinePerKey) of the Beam programming guide. You should first arrange your data into a PCollection of KVs with an appropriate key (for example, a combination of location and name). This PCollection can be followed by a CombinePerKey with a CombineFn implementation that combines the given data objects (by averaging the respective fields).
  2. This should be done within your CombineFn implementation, where you should combine the relevant fields and ignore string fields. A minimal sketch of the keying step follows this list.
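
A sketch of that keying step, reusing the question's windoweddata and AverageFn names (the CombineFn itself still has to average the numeric fields and pass the strings through):

# Key each record by (location, name), then combine per key.
keyed_and_averaged = (
    windoweddata
    | "Key by sensor" >> beam.Map(
        lambda d: ((d["location"], d["name"]), d))
    | "Average per sensor" >> beam.CombinePerKey(AverageFn()))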

The final answer is below. The breakthrough for me was realising that I should not use GroupBy, but beam.Map instead, because beam.Map is a one-to-one transformation: GroupBy had already collected each key's elements into an iterable, which is not the per-element input CombinePerKey expects. With beam.Map I transform each row of my data into a (key, data) tuple, where the key, built with beam.Row(), is whatever I specify as the unique identifier for that row, and later I collect and act on those tuples using CombinePerKey.

import apache_beam as beam

DATA = [
        {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":1,
              "data2":"STRING",
              "data3":5,
              "data4":5,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":9,
              "data2":"STRING",
              "data3":2,
              "data4":2,
          },
          {
              "timestamp": "2021-01-27 13:55:41.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":10,
              "data2":"STRING",
              "data3":4,
              "data4":1,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":11,
              "data2":"STRING",
              "data3":2,
              "data4":7,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":1,
              "data2":"STRING",
              "data3":4,
              "data4":8,
          },
          {
              "timestamp": "2021-01-27 13:55:51.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":9,
              "data2":"STRING",
              "data3":7,
              "data4":8,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S1",
              "data1":2,
              "data2":"STRING",
              "data3":3,
              "data4":5,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L1",
              "name":"S2",
              "data1":6,
              "data2":"STRING",
              "data3":7,
              "data4":6,
          },
          {
              "timestamp": "2021-01-27 13:56:01.634717+08:00",
              "location":"L2",
              "name":"S3",
              "data1":8,
              "data2":"STRING",
              "data3":1,
              "data4":2,
          },
]

class AverageFn2(beam.CombineFn):
  def create_accumulator(self):
    # The accumulator is (payload of running sums, element count).
    return {}, 0

  def add_input(self, accumulator, input):
    rowdata, count = accumulator

    # Add numeric fields to the running sums; non-numeric fields
    # (timestamp, location, name, strings) keep the first value seen.
    for key, value in input.items():
        if key in rowdata:
            try:
                rowdata[key] += float(value)
            except (TypeError, ValueError):
                pass  # not a number: leave the stored value alone
        else:
            rowdata[key] = value

    return rowdata, count + 1

  def merge_accumulators(self, accumulators):
    rowdata, counts = zip(*accumulators)

    payload = {}

    # Combine all the accumulators: sum the numeric fields, keep the
    # first value seen for non-numeric fields.
    for dictionary in rowdata:
        for key, value in dictionary.items():
            if key in payload:
                try:
                    payload[key] += float(value)
                except (TypeError, ValueError):
                    pass
            else:
                payload[key] = value

    return payload, sum(counts)

  def extract_output(self, accumulator):
    rowdata, count = accumulator

    # Divide the numeric sums by the element count; leave everything
    # else (strings, the timestamp) untouched.
    for key, value in rowdata.items():
        if isinstance(value, (int, float)):
            rowdata[key] = value / count

    return rowdata

with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create sample data' >> beam.Create(DATA)
      | 'Key by sensor' >> beam.Map(
          lambda item: (beam.Row(location=item["location"], name=item["name"]), item))
      | 'Average per sensor' >> beam.CombinePerKey(AverageFn2())
      | beam.Map(print))
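
With the sample DATA above, each printed element should look roughly like this (for L1/S1, data1 is (1 + 11 + 2) / 3; values rounded here):

# Rough shape of one printed element (key repr may differ slightly):
# (Row(location='L1', name='S1'),
#  {'timestamp': '2021-01-27 13:55:41.634717+08:00', 'location': 'L1',
#   'name': 'S1', 'data1': 4.67, 'data2': 'STRING', 'data3': 3.33,
#   'data4': 5.67})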

Hope this helps another Dataflow newbie like myself.