Suppose we have multiple data streams that share some common features.
For example, we have a stream of Teacher and a stream of Student, and both have an age field. If I want to find the oldest student or teacher in the real-time stream, I can implement an operator as below.
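import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class MaxiumAgeFunc extends RichFlatMapFunction<Student, Integer> {
    // Running maximum seen so far by this operator instance.
    int maxAge;

    @Override
    public void flatMap(Student s, Collector<Integer> collector) throws Exception {
        if (s.age > maxAge) {
            maxAge = s.age;
        }
        collector.collect(maxAge);
    }
}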
To find the oldest Teacher, we need to implement a similar operator:
public class MaxiumAgeFunc extends RichFlatMapFunction<Teacher, Integer> {
    // Running maximum seen so far by this operator instance.
    int maxAge;

    @Override
    public void flatMap(Teacher t, Collector<Integer> collector) throws Exception {
        if (t.age > maxAge) {
            maxAge = t.age;
        }
        collector.collect(maxAge);
    }
}
But these two operators share the same processing logic, so my idea is to define a parent class, such as People.
public class People {
    public Integer age;
}
Then Student and Teacher can be defined as its child classes, each keeping its own fields.
public class Student extends People {
    public Integer grade; // student grade
    ...
}
public class Teacher extends People {
    public Integer subject; // the subject that the teacher teaches
    ...
}
In this case, I can define an operator as below.
public class MaxiumAgeFunc extends RichFlatMapFunction<People, Integer> {
    // Running maximum seen so far by this operator instance.
    int maxAge;

    @Override
    public void flatMap(People p, Collector<Integer> collector) throws Exception {
        if (p.age > maxAge) {
            maxAge = p.age;
        }
        collector.collect(maxAge);
    }
}
But when I try to use this operator to build a Flink execution topology, it fails because the data types don't match: the streams carry Student and Teacher, while the operator is declared for People.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);
// Both calls are rejected: the stream element type is Student/Teacher, but the operator takes People.
studentStream.flatMap(new MaxiumAgeFunc()).print();
teacherStream.flatMap(new MaxiumAgeFunc()).print();
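To show the mismatch outside Flink, here is a minimal plain-Java sketch. `FlatMapper`, `PeopleMaxAge`, `BoundedMaxAge`, `VarianceDemo` and `apply` are hypothetical names made up for this illustration and are not Flink APIs, and `age` is simplified to `int`. It only demonstrates that Java generics are invariant, and that a bounded type parameter such as `<T extends People>` is one way to express the shared logic in plain Java.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Simplified copies of the classes from the question (int age for brevity).
class People { int age; People(int age) { this.age = age; } }
class Student extends People { Student(int age) { super(age); } }
class Teacher extends People { Teacher(int age) { super(age); } }

// Hypothetical stand-in for a flat-map style operator; NOT a Flink interface.
interface FlatMapper<T, R> {
    void flatMap(T value, Consumer<R> out);
}

// Declared for People only, like the last operator in the question.
class PeopleMaxAge implements FlatMapper<People, Integer> {
    private int maxAge;

    @Override
    public void flatMap(People p, Consumer<Integer> out) {
        maxAge = Math.max(maxAge, p.age);
        out.accept(maxAge);
    }
}

// Same logic, but generic over any subtype of People.
class BoundedMaxAge<T extends People> implements FlatMapper<T, Integer> {
    private int maxAge;

    @Override
    public void flatMap(T p, Consumer<Integer> out) {
        maxAge = Math.max(maxAge, p.age);
        out.accept(maxAge);
    }
}

class VarianceDemo {
    // Hypothetical stand-in for a stream method that needs FlatMapper<T, R> for the exact element type T.
    static <T, R> List<R> apply(List<T> input, FlatMapper<T, R> f) {
        List<R> out = new ArrayList<>();
        for (T t : input) {
            f.flatMap(t, out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(new Student(12), new Student(15), new Student(14));

        // Does not compile: FlatMapper<People, Integer> is not a FlatMapper<Student, Integer>.
        // apply(students, new PeopleMaxAge());

        // Compiles: T is bound to Student for this instance.
        System.out.println(apply(students, new BoundedMaxAge<Student>())); // prints [12, 15, 15]
    }
}
```

Whether Flink's flatMap and its type handling accept such a generic operator is part of what I am unsure about.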
So my question is: is it possible to write an abstract operator for input streams that share common fields?