0
votes

I recently updated to flink 1.10.0 from 1.9.0 and started getting this error when trying to execute a job locally. Surprisingly it works fine from the IDE. Only when i try to run the executable jar from the command line (java -jar) I get this error.

Here It says to add a dependency but I already have that. Any thoughts?

for reference I have:

"org.apache.flink" %% "flink-scala" "1.10.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
"org.apache.flink" %% "flink-connector-kafka" % "1.10.0",

-

Exception in thread "main" java.lang.NullPointerException: Cannot find compatible factory for specified execution.target (=local)
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1726)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:667)
at workflow.task.engineTask.DERFTask.execute(DERFTask.scala:146)

EDITED: I did some debugging and it seems that the "Iterator factories" is empty when running from the command line (as a jar) whereas it's not from the IDE. Hence it never enters the while loop. Weird..

public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
    Preconditions.checkNotNull(configuration);
    List<PipelineExecutorFactory> compatibleFactories = new ArrayList();
    Iterator factories = defaultLoader.iterator();

    while(factories.hasNext()) {
        try {
            PipelineExecutorFactory factory = (PipelineExecutorFactory)factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable var5) {
            if (!(var5.getCause() instanceof NoClassDefFoundError)) {
                throw var5;
            }

            LOG.info("Could not load factory due to missing dependencies.");
        }
    }

    if (compatibleFactories.size() > 1) {
        String configStr = (String)configuration.toMap().entrySet().stream().map((e) -> {
            return (String)e.getKey() + "=" + (String)e.getValue();
        }).collect(Collectors.joining("\n"));
        throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
    } else {
        return compatibleFactories.isEmpty() ? null : (PipelineExecutorFactory)compatibleFactories.get(0);
    }
2

2 Answers

0
votes

looks like you are missing this dependency :

"org.apache.flink" %% "flink-clients" % "1.10.0"
0
votes

I did some debugging and it seems that the "Iterator factories" is empty when running from the command line (as a jar) whereas it's not from the IDE. Hence it never enters the while loop. Weird..

public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
    Preconditions.checkNotNull(configuration);
    List<PipelineExecutorFactory> compatibleFactories = new ArrayList();
    Iterator factories = defaultLoader.iterator();

    while(factories.hasNext()) {
        try {
            PipelineExecutorFactory factory = (PipelineExecutorFactory)factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable var5) {
            if (!(var5.getCause() instanceof NoClassDefFoundError)) {
                throw var5;
            }

            LOG.info("Could not load factory due to missing dependencies.");
        }
    }

    if (compatibleFactories.size() > 1) {
        String configStr = (String)configuration.toMap().entrySet().stream().map((e) -> {
            return (String)e.getKey() + "=" + (String)e.getValue();
        }).collect(Collectors.joining("\n"));
        throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
    } else {
        return compatibleFactories.isEmpty() ? null : (PipelineExecutorFactory)compatibleFactories.get(0);
    }