I have the following situation in an Apache Flink project.

There are three streams with different objects, such as:

Person -> String id, String firstName, String lastName (e.g. 101, John, Doe)

PersonDetail -> String id, String address, String city, String phoneNumber, long personId (e.g. 99, Stefansplatz 1, +43066012345678, 101)

PersonAddDetail -> String id, String AddDetailType, Object AddDetailValue, long personId (e.g. 77, 1, Hansi or 78, 2, 1234 or 80, 3, true)

I'd like to aggregate (not sure if this is the right word here) objects from these streams into a new object that I put on a new stream. The aggregation should be based on the Person id, and as an additional catch I need to keep only PersonAddDetail objects with a specific AddDetailType (let's say I'm only interested in types 1 and 2).

The aggregated object should look something like

PersonReport -> long id, String firstName, String lastName, String address, String city, String phoneNumber, ArrayList<PersonAddDetail> details
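
To make the target concrete, the filtering and grouping described above could be sketched in plain Java like this. It is only an illustration on an in-memory list, not Flink code. The `AddDetail` class is a simplified stand-in for PersonAddDetail, and the personId value 101 on the sample details is an assumption, since the examples above do not show it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration (no Flink): collect the wanted details per person id. */
final class DesiredAggregation {

  /** Simplified stand-in for PersonAddDetail; field names follow the question. */
  static final class AddDetail {
    final String id;
    final String type;
    final Object value;
    final long personId;

    AddDetail(String id, String type, Object value, long personId) {
      this.id = id;
      this.type = type;
      this.value = value;
      this.personId = personId;
    }
  }

  /** Keeps only details of type "1" or "2" and groups them by personId. */
  static Map<Long, List<AddDetail>> wantedDetailsByPerson(List<AddDetail> details) {
    Map<Long, List<AddDetail>> grouped = new LinkedHashMap<>();
    for (AddDetail d : details) {
      if (d.type.equals("1") || d.type.equals("2")) {
        grouped.computeIfAbsent(d.personId, k -> new ArrayList<>()).add(d);
      }
    }
    return grouped;
  }

  public static void main(String[] args) {
    // personId 101 is assumed for all three sample details.
    List<AddDetail> details = Arrays.asList(
        new AddDetail("77", "1", "Hansi", 101L),
        new AddDetail("78", "2", 1234, 101L),
        new AddDetail("80", "3", true, 101L));
    // The type-3 detail is dropped; the other two end up under person 101.
    System.out.println(wantedDetailsByPerson(details).get(101L).size());
  }
}
```

The streaming version has to produce the same grouping, but without ever seeing the complete input at once.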

The question now is whether this is possible at all and, if so, how I can accomplish it. Any input is welcome.

Please show your code. – 4castle
Err, without posting an actual answer with code: I guess you can map these streams to 3 stream<PersonReport>, concat them, group by id, and then reduce/collect them into a complete PersonReport. – Jeremy Grand
I'm just starting with this and only have some code snippets from a colleague who made some first attempts and who is basically new to Java as well. So I suppose the code I have right now won't help you at all. The question was also more about how to approach this in general, so I'll give Jeremy's proposal a shot. – hecko84

2 Answers

Your problem sounds like a join operation. You could do something like:

personDataStream.join(personDetailDataStream).where(new KeySelector<Person, Long>() {
    ...
}).equalTo(new KeySelector<PersonDetail, Long>() {
    ...
}).window(TumblingEventTimeWindows.of(Time.seconds(2))).apply(new JoinFunction<Person, PersonDetail, PersonWithDetail>() {
   ...
});

Note that in general a join is impossible on unbounded (infinite) collections, so you need to bound it with windows.
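
To illustrate what bounding the join with windows means, here is a minimal plain-Java sketch (deliberately not Flink code) that pairs events only when they share a key and fall into the same tumbling window. The `Event` class, the payload strings, and the timestamps are made up for the illustration; timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration (no Flink) of joining two inputs per tumbling window. */
final class TumblingWindowJoinSketch {

  /** A keyed event with an event-time timestamp in milliseconds. */
  static final class Event {
    final long key;
    final long timestampMillis;
    final String payload;

    Event(long key, long timestampMillis, String payload) {
      this.key = key;
      this.timestampMillis = timestampMillis;
      this.payload = payload;
    }
  }

  /** Index of the tumbling window a (non-negative) timestamp falls into. */
  static long windowIndex(long timestampMillis, long windowSizeMillis) {
    return timestampMillis / windowSizeMillis;
  }

  /** Pairs up events that share a key and fall into the same window. */
  static List<String> join(List<Event> left, List<Event> right, long windowSizeMillis) {
    List<String> joined = new ArrayList<>();
    for (Event l : left) {
      for (Event r : right) {
        boolean sameKey = l.key == r.key;
        boolean sameWindow = windowIndex(l.timestampMillis, windowSizeMillis)
            == windowIndex(r.timestampMillis, windowSizeMillis);
        if (sameKey && sameWindow) {
          joined.add(l.payload + " + " + r.payload);
        }
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List<Event> persons = Arrays.asList(
        new Event(101L, 500L, "person-101"),
        new Event(102L, 1000L, "person-102"));
    List<Event> details = Arrays.asList(
        new Event(101L, 1500L, "detail-101"),   // same 2-second window as person-101
        new Event(102L, 2500L, "detail-102"));  // next window, so it finds no partner
    System.out.println(join(persons, details, 2000L));
  }
}
```

The second pair shows the trade-off: a record that arrives in a later window than its counterpart is never joined, so the window size has to cover the expected delay between the streams.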

Thanks to @Jeremy Grand's comment I came up with a solution myself, and I'd like to share my thoughts and code. I introduced a new class called PersonContainer:

import java.util.ArrayList;
import java.util.List;

public class PersonContainer {

  // Shared key: the person id, stored as a String
  private String id;

  private Person person;
  private PersonDetail personDetail;
  private List<PersonAddDetail> personAddDetailList = new ArrayList<>();

  public PersonContainer(Person person) {
    this.id = person.getID();
    this.person = person;
  }

  public PersonContainer(PersonDetail personDetail) {
    // personId is a long in the question; getPersonId() is an assumed accessor name
    this.id = String.valueOf(personDetail.getPersonId());
    this.personDetail = personDetail;
  }

  public PersonContainer(PersonAddDetail personAddDetail) {
    // getPersonId() is an assumed accessor name here as well
    this.id = String.valueOf(personAddDetail.getPersonId());
    this.personAddDetailList.add(personAddDetail);
  }

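  public PersonContainer merge(PersonContainer other) {
    // Each branch copies one part from the other container and stops there.
    if (other.person != null) {
      this.person = other.person;
      return this;
    }
    if (other.personDetail != null) {
      this.personDetail = other.personDetail;
      return this;
    }
    if (!other.personAddDetailList.isEmpty()) {
      this.personAddDetailList.addAll(other.personAddDetailList);
      return this;
    }
    // Nothing to merge: return this container unchanged rather than null,
    // so the reduce below never produces a null record.
    return this;
  }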

  public String getId() {
    return id;
  }

  public Person getPerson() {
    return person;
  }

  public PersonDetail getPersonDetail() {
    return personDetail;
  }

  public List<PersonAddDetail> getPersonAddDetailList() {
    return personAddDetailList;
  }

  // Complete once the person, the detail, and at least two additional details are present
  public boolean isComplete() {
    return person != null && personDetail != null && personAddDetailList.size() > 1;
  }
}

This is the important part, because I first map the objects of the three input streams to this common type so that I can union the streams afterwards.
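
As a quick illustration of how the merge is meant to behave, here is a cut-down plain-Java sketch that runs without Flink. The `Container` class is a simplified stand-in for PersonContainer in which every part is a plain String, and the factory methods `ofPerson`, `ofDetail` and `ofAddDetail` are hypothetical helpers that take the place of the three constructors. Each input container carries exactly one part, as it does right after the mapping step.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink) of merging single-part containers by id. */
final class ContainerMergeSketch {

  /** Cut-down stand-in for PersonContainer; every part is a plain String here. */
  static final class Container {
    final String id;
    String person;
    String personDetail;
    final List<String> addDetails = new ArrayList<>();

    Container(String id) {
      this.id = id;
    }

    static Container ofPerson(String id, String person) {
      Container c = new Container(id);
      c.person = person;
      return c;
    }

    static Container ofDetail(String id, String detail) {
      Container c = new Container(id);
      c.personDetail = detail;
      return c;
    }

    static Container ofAddDetail(String id, String addDetail) {
      Container c = new Container(id);
      c.addDetails.add(addDetail);
      return c;
    }

    /** Folds the other container's parts into this one and returns this one. */
    Container merge(Container other) {
      if (other.person != null) {
        this.person = other.person;
      }
      if (other.personDetail != null) {
        this.personDetail = other.personDetail;
      }
      this.addDetails.addAll(other.addDetails);
      return this;
    }

    /** Same rule as in PersonContainer: both parts plus at least two details. */
    boolean isComplete() {
      return person != null && personDetail != null && addDetails.size() > 1;
    }
  }

  public static void main(String[] args) {
    Container acc = Container.ofPerson("101", "John Doe");
    acc.merge(Container.ofDetail("101", "Stefansplatz 1"));
    acc.merge(Container.ofAddDetail("101", "Hansi"));
    System.out.println(acc.isComplete()); // still waiting for a second detail
    acc.merge(Container.ofAddDetail("101", "1234"));
    System.out.println(acc.isComplete()); // all parts have arrived
  }
}
```

The order in which the parts arrive does not matter for this rule; the container only becomes complete once every part has been merged in.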

Here is what I do; the individual steps are described in the comments. In short, I map the three input streams to new streams of the newly introduced container. Then I union the three streams and use the iterate pattern to key these objects and merge them with my custom merge method. Finally, a custom isComplete method distinguishes fully merged containers, which are mapped to the output, from containers that are not done yet, which are fed back into the merge process.

// filter PersonAddDetail to keep just the types needed
DataStream<PersonAddDetail> filteredPersonAddDetail = unfilteredPersonAddDetail.filter(new FilterFunction<PersonAddDetail>() {
  @Override
  public boolean filter(PersonAddDetail personAddDetail) throws Exception {
    return personAddDetail.getAddDetailType().matches("1|2");
  }
});

// map Person stream to common object
DataStream<PersonContainer> mappedPersonStream = personInputStream.map(new MapFunction<Person, PersonContainer>() {
  @Override
  public PersonContainer map(Person person) throws Exception {
    return new PersonContainer(person);
  }
});

// map PersonDetail stream to common object
DataStream<PersonContainer> mappedPersonDetailStream = personDetailInputStream.map(new MapFunction<PersonDetail, PersonContainer>() {
  @Override
  public PersonContainer map(PersonDetail personDetail) throws Exception {
    return new PersonContainer(personDetail);
  }
});

// map PersonAddDetail stream to common object
DataStream<PersonContainer> mappedPersonAddDetailStream = filteredPersonAddDetail.map(new MapFunction<PersonAddDetail, PersonContainer>() {
  @Override
  public PersonContainer map(PersonAddDetail personAddDetail) throws Exception {
    return new PersonContainer(personAddDetail);
  }
});

//union the three input streams to one single stream
DataStream<PersonContainer> combinedInput = mappedPersonStream.union(mappedPersonDetailStream, mappedPersonAddDetailStream);

// the iteration pattern is used here to repeatedly try to merge corresponding objects together
IterativeStream<PersonContainer> iteration = combinedInput.iterate();

// group objects by their shared ID and then use reduce to merge them
DataStream<PersonContainer> iterationBody = iteration.keyBy(new KeySelector<PersonContainer, String>() {
  @Override
  public String getKey(PersonContainer personContainer) throws Exception {
    return personContainer.getId();
  }
})
    .reduce(new ReduceFunction<PersonContainer>() {
      @Override
      public PersonContainer reduce(PersonContainer personContainer, PersonContainer other) throws Exception {
        return personContainer.merge(other);
      }
    });

// use the container's isComplete method to check whether the merge is finished or we need to wait for further objects in the stream
DataStream<PersonContainer> containersNotCompleteYet = iterationBody.filter(new FilterFunction<PersonContainer>() {
  @Override
  public boolean filter(PersonContainer personContainer) throws Exception {
    return !personContainer.isComplete();
  }
});

// partially merged or not merged at all containers are put back on the stream
iteration.closeWith(containersNotCompleteYet);

// fully merged containers are processed further
DataStream<PersonContainer> completeContainers = iterationBody.filter(new FilterFunction<PersonContainer>() {
  @Override
  public boolean filter(PersonContainer personContainer) throws Exception {
    return personContainer.isComplete();
  }
});

// finally the container is mapped to the correct output object
DataStream<PersonReport> personReport = completeContainers.map(new MapFunction<PersonContainer, PersonReport>() {
  @Override
  public PersonReport map(PersonContainer personContainer) throws Exception {
    // Map personContainer to the final PersonReport here. The original post leaves
    // this step out; toPersonReport is a hypothetical helper standing in for it.
    return toPersonReport(personContainer);
  }
});
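
The final mapping step is left open in the code above. Purely as a sketch, and assuming the PersonReport layout from the question, it could look roughly like the following plain-Java example. All classes here are simplified stand-ins, `toReport` is a hypothetical helper name, the details are reduced to a list of plain values, and the city value is a placeholder because the question's example omits it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration (no Flink) of flattening the merged parts into a report. */
final class PersonReportMappingSketch {

  /** Stand-in for Person, with the fields listed in the question. */
  static final class Person {
    final String id, firstName, lastName;

    Person(String id, String firstName, String lastName) {
      this.id = id;
      this.firstName = firstName;
      this.lastName = lastName;
    }
  }

  /** Stand-in for PersonDetail, reduced to the fields the report needs. */
  static final class PersonDetail {
    final String address, city, phoneNumber;

    PersonDetail(String address, String city, String phoneNumber) {
      this.address = address;
      this.city = city;
      this.phoneNumber = phoneNumber;
    }
  }

  /** Stand-in for PersonReport; details are simplified to plain values. */
  static final class PersonReport {
    final long id;
    final String firstName, lastName, address, city, phoneNumber;
    final List<Object> details;

    PersonReport(long id, String firstName, String lastName, String address,
                 String city, String phoneNumber, List<Object> details) {
      this.id = id;
      this.firstName = firstName;
      this.lastName = lastName;
      this.address = address;
      this.city = city;
      this.phoneNumber = phoneNumber;
      this.details = details;
    }
  }

  /** Hypothetical helper: copies the merged parts into one flat report object. */
  static PersonReport toReport(Person p, PersonDetail d, List<Object> addDetails) {
    // Person.id is a String in the question while PersonReport.id is a long.
    return new PersonReport(Long.parseLong(p.id), p.firstName, p.lastName,
        d.address, d.city, d.phoneNumber, new ArrayList<>(addDetails));
  }

  public static void main(String[] args) {
    PersonReport report = toReport(
        new Person("101", "John", "Doe"),
        new PersonDetail("Stefansplatz 1", "ExampleCity", "+43066012345678"), // city is a placeholder
        Arrays.<Object>asList("Hansi", 1234));
    System.out.println(report.id + " " + report.lastName + " " + report.details);
  }
}
```

In the real job this logic would live inside the last MapFunction and read its inputs from the container's getters.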

This approach works for me. The good thing is that I can handle objects that arrive late on the stream (e.g. a PersonAddDetail that comes in some minutes after the other objects), and I don't need to define any windowing. Thanks for the input anyway.