I have a Flink job in Java with a Kafka connector. I'm getting the data from Kafka with no issues. As a first step I do a .map to get the timestamp from the messages: to use event-time windows, I extract the timestamp in milliseconds from the data and return it to Flink through assignTimestampsAndWatermarks:
DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer<>("CorID_0", new SimpleStringSchema(), p));

kafkaData.map(new MapFunction<String, Tuple19<String, String, String, String, String,
                                              String, Double, Long, Double, Long,
                                              Long, Integer, Long, Double, Long,
                                              Double, Double, Integer, Double>>()
{
    public Tuple19<String, String, String, String, String,
                   String, Double, Long, Double, Long,
                   Long, Integer, Long, Double, Long,
                   Double, Double, Integer, Double> map(String value)
    {
        // Split the CSV message and parse each field into its target type
        String[] words = value.split(",");
        return new Tuple19<>(words[0], words[1], words[2], words[3], words[4], words[5],
                Double.parseDouble(words[6]), Long.parseLong(words[7]),
                Double.parseDouble(words[8]), Long.parseLong(words[9]),
                Long.parseLong(words[10]), Integer.parseInt(words[11]),
                Long.parseLong(words[12]), Double.parseDouble(words[13]),
                Long.parseLong(words[14]), Double.parseDouble(words[15]),
                Double.parseDouble(words[16]), Integer.parseInt(words[17]),
                Double.parseDouble(words[18]));
    }
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String, String, String, String, String,
                                                                       String, Double, Long, Double, Long,
                                                                       Long, Integer, Long, Double, Long,
                                                                       Double, Double, Integer, Double>>()
{
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    public long extractAscendingTimestamp(Tuple19<String, String, String, String, String,
                                                  String, Double, Long, Double, Long,
                                                  Long, Integer, Long, Double, Long,
                                                  Double, Double, Integer, Double> value)
    {
        try
        {
            // Field f3 carries the event time as an ISO-style string; convert it to epoch millis
            Timestamp ts = new Timestamp(sdf.parse(value.f3).getTime());
            return ts.getTime();
        }
        catch (Exception e)
        {
            throw new RuntimeException("Parsing Error");
        }
    }
});
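For reference, here is a minimal standalone sketch of the same timestamp parsing, assuming the messages carry an ISO-style value like 2020-01-01T10:15:30.000Z in field f3 (the sample value and the class name TimestampCheck are just for illustration):

import java.text.SimpleDateFormat;

public class TimestampCheck
{
    public static void main(String[] args) throws Exception
    {
        // Same pattern as in the extractor; note that 'Z' is a quoted literal here,
        // so parsing uses the JVM default time zone rather than UTC
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        long millis = sdf.parse("2020-01-01T10:15:30.000Z").getTime();
        System.out.println(millis); // epoch milliseconds, which is what the extractor returns as the event timestamp
    }
}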
The second step is where the calculation starts. I'm trying to do some manipulation of the data, and for that I need to work with the fields from the Kafka message; this is basically where I got stuck:
DataStream<String> largeDelta = kafkaData
        .keyBy(new KeySelector<Tuple19<String, String, String, String, String,
                                       String, Double, Long, Double, Long,
                                       Long, Integer, Long, Double, Long,
                                       Double, Double, Integer, Double>, String>()
        {
            public String getKey(Tuple19<String, String, String, String, String,
                                         String, Double, Long, Double, Long,
                                         Long, Integer, Long, Double, Long,
                                         Double, Double, Integer, Double> value)
            {
                // Key the stream by the third field of the tuple
                return value.f2;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new TrackChanges(5));

largeDelta.writeAsText("/Alert.txt");

env.execute("ABCD");
The problem is that I get an error telling me "cannot resolve method 'keyBy(anonymous org.apache.flink.api.java.functions....'".
Any help would be really welcome, as I'm struggling to understand what I'm missing.
Thanks
