1
votes

I am trying to use the Table and SQL API of Flink for a simple example in which I read strings from a file, convert each one to a Tuple2, and try to insert them into a table. Here is my code.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class table_streaming_test
{
    public static void main (String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
        env.setParallelism(1);
        DataStream<String> datastream_in = env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
        DataStream<Tuple2<String, Integer>> ds = datastream_in
            .flatMap(new Splitter()); // transformation flatmap
        Table msg = tEnv.fromDataStream(ds).as("symbol,price");
        Table result = msg.select("symbol ='A'");
        DataStream<String> ds2 = tEnv.toDataStream(result, String.class);
        ds2.print();
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] token= sentence.split(",");
            out.collect(new Tuple2<String, Integer>(token[0],Integer.parseInt(token[1])));
        }
    }
}

The errors are as follows (they occurred at the line DataStream<String> ds2 = tEnv.toDataStream(result, String.class); ):

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.table.Table.<init>(Lorg/apache/flink/api/table/TableEnvironment;Lorg/apache/flink/api/table/plan/logical/LogicalNode;)V
    at org.apache.flink.api.table.StreamTableEnvironment.ingest(StreamTableEnvironment.scala:97)
    at org.apache.flink.api.java.table.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:61)
    at table_streaming_test.main(table_streaming_test.java:87)

Jar files included are:

  1. flink-dist_2.10-1.1.3.jar
  2. flink-python_2.10-1.1.3.jar
  3. flink-table_2.10-1.1.3.jar
  4. log4j-1.2.17.jar
  5. slf4j-log4j12-1.7.7.jar
  6. JavaSE-1.7

2
Can you convert the code and the exception from the screenshots into text? There are some parts which are cut off and it would be easier to search and copy. Thanks. - Fabian Hueske
Sure, Fabian. I have edited the post. - kadsank
Could you please give an example of stocks.txt? - Yaroslav
What build tool do you use? - Yaroslav

2 Answers

4
votes

One possible cause of a "java.lang.NoSuchMethodError" is that your project uses a different version of Flink than the one installed on your system.

In my case, I had Flink 1.4.2 installed, but my project was using 1.3.2. I updated my pom file to use the same version and it worked fine.
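
If you are not sure whether the versions on your classpath match, you can print which jar each class is actually loaded from. The following is only a rough sketch: the class name ClassOriginCheck and the helper locationOf are names I made up for illustration, and the two Flink class names are simply the ones that appear in the stack trace in the question. It uses only standard JDK calls (Class.forName, getProtectionDomain, getCodeSource).

```java
import java.security.CodeSource;

public class ClassOriginCheck {

    /** Returns the location a class was loaded from, or a short note if it cannot be determined. */
    public static String locationOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, ClassOriginCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // JDK core classes come from the bootstrap loader and report no code source
            return source == null
                    ? "(no code source, e.g. a JDK class)"
                    : source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question (assumption: Flink 1.1.x package layout)
        String[] names = {
            "org.apache.flink.api.table.Table",
            "org.apache.flink.api.table.StreamTableEnvironment"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

Run it with the same classpath as the failing job. If the two classes come from jars with different version numbers, or from a jar you did not expect, that mismatch is a likely cause of the NoSuchMethodError.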