I am new to Flink. I am trying to use Flink 1.3.2 to read from our Kinesis stream and write the output to a Cassandra table. The program is already able to stream data in from Kinesis.

The problem is that when I call 'CassandraSink.addSink(countsStreaming)', I get 'Type mismatch. expected: DataStream[NotInferredIN], actual: DataStream[(String,Long)]'. I have gone through the documentation and source code and noticed that addSink takes a DataStream[IN].

Can someone please help me understand what the 'IN' type is and what to do to get this resolved?

Thanks in advance!

val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapper = new ObjectMapper
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
  "kinesis-stream", new SimpleStringSchema, ConsumerConfig))

//DataStream[(String, Long)]
val countsStreaming: DataStream[(String, Long)] = kinesis.map(x => mapper.readValue(x,classOf[java.util.Map[String,String]]))
  .map(x => x.get("game_name"))
  .map({x => (x,1L) })
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

countsStreaming.print()

CassandraSink.addSink(countsStreaming)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    override def buildCluster(builder: Cluster.Builder): Cluster = {
      builder.addContactPoint("0.0.0.0").build()
    }
  })
  .build()

env.execute("StreamingExample")
How did you resolve this issue? Any pointers? I am experiencing something similar. - neoeahit

1 Answer

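The problem is that CassandraSink.addSink only accepts a Java DataStream (org.apache.flink.streaming.api.datastream.DataStream), not the Scala org.apache.flink.streaming.api.scala.DataStream. IN is simply the element type of that Java stream. Because countsStreaming is a Scala DataStream, the compiler has no Java DataStream to infer IN from, which is where the type mismatch comes from.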
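To show what IN is without needing Flink on the classpath, here is a small self-contained model. JavaStream, ScalaStream and SinkLike are hypothetical stand-ins, not Flink classes: they only mimic the shape of a sink entry point declared against the Java stream type, and a Scala wrapper that exposes its underlying Java stream through a javaStream accessor.

```scala
// Hypothetical stand-ins (NOT Flink classes) that mimic the shape of the problem.
object InTypeDemo {

  // Plays the role of the Java API DataStream[T].
  final class JavaStream[T](val elements: Seq[T])

  // Plays the role of the Scala API DataStream[T], which wraps a Java stream
  // and exposes it via `javaStream`.
  final class ScalaStream[T](val javaStream: JavaStream[T])

  // Plays the role of a sink builder whose entry point only accepts the Java
  // stream type. `IN` is just the element type of that stream.
  object SinkLike {
    def addSink[IN](input: JavaStream[IN]): List[IN] = input.elements.toList
  }

  def main(args: Array[String]): Unit = {
    val counts = new ScalaStream(new JavaStream(Seq(("game-a", 3L), ("game-b", 1L))))

    // SinkLike.addSink(counts)
    // ^ does not compile: a ScalaStream is not a JavaStream, so there is no
    //   JavaStream[IN] to infer IN from.

    // Unwrapping to the Java stream lets the compiler infer IN = (String, Long).
    val written: List[(String, Long)] = SinkLike.addSink(counts.javaStream)
    written.foreach(println)
  }
}
```

The same reasoning applies to the real API: once the argument is a Java DataStream of some element type, that element type becomes IN.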

You need to call .javaStream on the Scala DataStream and pass the result to the sink, e.g. CassandraSink.addSink(countsStreaming.javaStream); then the type mismatch should go away.
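A minimal sketch of the answer applied to the question's sink, assuming the Flink 1.3.x Cassandra connector and that countsStreaming is the Scala DataStream[(String, Long)] from the question. One step goes beyond the answer and is an assumption: the stream is first mapped to Java Tuple2 values. As far as I can tell, the 1.3.x builder only recognizes Java Tuple and POJO element types, so a Scala tuple may still be rejected at runtime even after .javaStream (later Flink releases reportedly added direct Scala support). SingleHostClusterBuilder, CassandraFix, toJavaTuple and writeCounts are hypothetical names, and host is a placeholder contact point.

```scala
// Sketch for flink-connector-cassandra 1.3.x; names below are hypothetical.
import com.datastax.driver.core.Cluster
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.{CassandraSink, ClusterBuilder}

// Named (and therefore easily serializable) cluster factory for one contact point.
class SingleHostClusterBuilder(host: String) extends ClusterBuilder {
  override def buildCluster(builder: Cluster.Builder): Cluster =
    builder.addContactPoint(host).build()
}

object CassandraFix {
  type JCount = JTuple2[String, java.lang.Long]

  // Explicit type information so the mapped stream is typed as a Java Tuple2
  // (TupleTypeInfo) rather than as a Scala case-class tuple.
  val jCountType: TupleTypeInfo[JCount] =
    new TupleTypeInfo[JCount](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)

  // Converts one (game_name, count) pair into a Java Tuple2.
  def toJavaTuple(t: (String, Long)): JCount =
    new JTuple2[String, java.lang.Long](t._1, java.lang.Long.valueOf(t._2))

  // Maps to Java tuples, unwraps with .javaStream, then builds the sink.
  def writeCounts(counts: DataStream[(String, Long)], host: String): CassandraSink[JCount] =
    CassandraSink
      .addSink(counts.map(t => toJavaTuple(t))(jCountType).javaStream)
      .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
      .setClusterBuilder(new SingleHostClusterBuilder(host))
      .build()
}
```

In the question's program this would replace the CassandraSink block with something like CassandraFix.writeCounts(countsStreaming, "127.0.0.1") before env.execute(...). The host is again only an example value.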