4 votes

I am writing a simple word count Flink job, but I keep getting this error:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error]  .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}

I searched the net but could not find any comprehensible answer.

Here is my code:

object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/")

    val count = dataStream
                .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}
                .map{ (_,1) }
                .groupBy(0)
                .sum(1)


    dataStream.print()
    env.execute("Flink Scala API Skeleton")
  }
}
Try the answer to this question; it might help you too: stackoverflow.com/questions/29540121/… – richj
I have imported all the necessary libraries, including flink.api.scala._ and flink.streaming.api.scala._ – sidd607
The problem is that there is no groupBy(...) method on a DataStream[(String, Int)] in Flink (version 1.0.3). There is a keyBy(Int) method that will produce a KeyedStream[(String, Int), Tuple]. – richj
Could you try removing the import flink.api.scala._? Both the streaming and the batch Scala package objects provide createTypeInformation, so these imports might clash. – Till Rohrmann
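
For illustration, a hypothetical fragment of the change richj describes (assuming Flink 1.x; wordPairs stands in for the DataStream[(String, Int)] produced by the question's map step):

val keyed = wordPairs.keyBy(0) // KeyedStream[(String, Int), Tuple]; DataStream has no groupBy
val counts = keyed.sum(1)      // running per-word counts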

2 Answers

3 votes

You have to import

import org.apache.flink.api.scala._

to bring the implicit TypeInformation factory into scope, instead of creating an implicit value for each type that you use.
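
For reference, here is what the whole job could look like with that fix applied. This is a minimal sketch rather than the asker's exact setup: it imports the streaming package object (which, per Till Rohrmann's comment above, provides the same createTypeInformation implicit and avoids a clash with the batch import) and uses keyBy(0) in place of groupBy(0), per richj's comment:

import org.apache.flink.streaming.api.scala._ // package object provides createTypeInformation

object Job {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/")

    val count = dataStream
                .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}
                .map{ (_,1) }
                .keyBy(0) // DataStream has keyBy, not groupBy, on Flink 1.x
                .sum(1)

    count.print() // print the aggregated counts rather than the raw input
    env.execute("Flink Scala API Skeleton")
  }
}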

1 vote

Adding implicit val typeInfo = TypeInformation.of(classOf[String]) as the first line in def main(args: Array[String]) {...} fixed it for me.

import org.apache.flink.api.common.typeinfo.TypeInformation

object Job {
  def main(args: Array[String]) {
    implicit val typeInfo = TypeInformation.of(classOf[String]) // Add this here
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/")

    val count = dataStream
                .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}
                .map{ (_,1) }
                .groupBy(0)
                .sum(1)

    dataStream.print()
    env.execute("Flink Scala API Skeleton")
  }
}