
I have already uploaded a fat jar containing my application code to the /lib folder of all nodes in my Flink cluster. I am trying to start the Flink job from a separate Java application, but can't find a good way to do so.

The closest thing to a solution that I have found so far is the Monitoring REST API, which has an endpoint for running jars. However, that endpoint only runs jars that were previously uploaded through its jar-upload endpoint.
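For reference, here is a minimal sketch of that upload-then-run flow using only the JDK's `java.net.http` client (Java 11+), with no Flink dependency. It assumes the REST endpoint is reachable at a base URL such as `http://localhost:8081`; the `FlinkRestSubmit` class and its method names are hypothetical. The endpoint paths (`/jars/upload`, `/jars/:jarid/run`), the `jarfile` form field and the `programArgsList`/`parallelism` body fields follow the Flink REST API documentation as I understand it, so check them against your Flink version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: upload a jar to Flink's REST API, then run it (JDK 11+ only).
class FlinkRestSubmit {

    // Upload responses look like {"filename":"/tmp/.../flink-web-upload/<uuid>_job.jar","status":"success"};
    // the jar id expected by /jars/:jarid/run is the last path segment of "filename".
    static String jarIdFromUploadResponse(String json) {
        Matcher m = Pattern.compile("\"filename\"\\s*:\\s*\"([^\"]+)\"").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No filename in upload response: " + json);
        }
        String path = m.group(1);
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // JSON body for POST /jars/:jarid/run. Naive: assumes arguments contain no quotes or backslashes.
    static String runRequestBody(List<String> programArgs, int parallelism) {
        StringBuilder sb = new StringBuilder("{\"programArgsList\":[");
        for (int i = 0; i < programArgs.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(programArgs.get(i)).append('"');
        }
        return sb.append("],\"parallelism\":").append(parallelism).append('}').toString();
    }

    // Sends the request and treats any status other than 200 as a failure.
    private static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
        }
        return response.body();
    }

    // Uploads the jar as multipart/form-data under the form field "jarfile" and returns its jar id.
    static String uploadJar(String baseUrl, Path jar) throws IOException, InterruptedException {
        String boundary = "flink-upload-" + UUID.randomUUID();
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        String partHeader = "--" + boundary + "\r\n"
                + "Content-Disposition: form-data; name=\"jarfile\"; filename=\"" + jar.getFileName() + "\"\r\n"
                + "Content-Type: application/x-java-archive\r\n\r\n";
        body.write(partHeader.getBytes(StandardCharsets.UTF_8));
        body.write(Files.readAllBytes(jar));
        body.write(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));

        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/jars/upload"))
                .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                .POST(HttpRequest.BodyPublishers.ofByteArray(body.toByteArray()))
                .build();
        return jarIdFromUploadResponse(send(request));
    }

    // Runs a previously uploaded jar; the response body should contain the new job id ({"jobid":"..."}).
    static String runJar(String baseUrl, String jarId, List<String> programArgs, int parallelism)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(runRequestBody(programArgs, parallelism)))
                .build();
        return send(request);
    }
}
```

Usage would be along the lines of `String jarId = FlinkRestSubmit.uploadJar("http://localhost:8081", Path.of("/path/to/job.jar"));` followed by `FlinkRestSubmit.runJar("http://localhost:8081", jarId, List.of("--port", "9000"), 1);`. Note that this still sends the jar bytes from the client machine; it does not make use of the copy already sitting in /lib on the cluster nodes.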

I have looked at ClusterClient.java in the flink-clients module, but could not find any examples of how to use it.

Any examples of how someone has successfully submitted jobs from Java code would be greatly appreciated!


2 Answers


You can use RestClusterClient to run a PackagedProgram that points to your Flink job's jar. If your job accepts arguments, you can pass them to the PackagedProgram.

Here is an example for a standalone cluster whose REST endpoint is at localhost:8081 (this uses the client API of Flink versions before 1.10):

// import java.io.File;
// import org.apache.flink.api.common.JobSubmissionResult;
// import org.apache.flink.client.deployment.StandaloneClusterId;
// import org.apache.flink.client.program.PackagedProgram;
// import org.apache.flink.client.program.rest.RestClusterClient;
// import org.apache.flink.configuration.Configuration;
// import org.apache.flink.configuration.JobManagerOptions;
// import org.apache.flink.configuration.RestOptions;

// The constructors and run() below throw checked exceptions,
// so call this code from a method declared "throws Exception".
String clusterHost = "localhost";
int clusterPort = 8081;

// Point the client at the cluster's JobManager / REST endpoint
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, clusterHost);
config.setInteger(RestOptions.PORT, clusterPort);

// The jar to run and the arguments passed to its main() method
String jarFilePath = "/opt/flink/examples/streaming/SocketWindowWordCount.jar";
String[] args = new String[]{ "--port", "9000" };
PackagedProgram packagedProgram = new PackagedProgram(new File(jarFilePath), args);

RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, StandaloneClusterId.getInstance());

// Submit the program with the given default parallelism
int parallelism = 1;
JobSubmissionResult result = client.run(packagedProgram, parallelism);

This seems to work for Flink 1.10:

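// import java.io.File;
// import org.apache.flink.api.common.JobID;
// import org.apache.flink.client.deployment.StandaloneClusterId;
// import org.apache.flink.client.program.PackagedProgram;
// import org.apache.flink.client.program.PackagedProgramUtils;
// import org.apache.flink.client.program.rest.RestClusterClient;
// import org.apache.flink.configuration.Configuration;
// import org.apache.flink.configuration.JobManagerOptions;
// import org.apache.flink.configuration.RestOptions;
// import org.apache.flink.runtime.jobgraph.JobGraph;

private static final int PARALLELISM = 8;
private static final Configuration FLINK_CONFIG = new Configuration();

void foo() throws Exception {
    // Point the client at the cluster's REST endpoint and retry failed requests
    FLINK_CONFIG.setString(JobManagerOptions.ADDRESS, "localhost");
    FLINK_CONFIG.setInteger(RestOptions.PORT, 8081);
    FLINK_CONFIG.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 3);

    // RestClusterClient is AutoCloseable; close it once you are done with it
    RestClusterClient<StandaloneClusterId> flinkClient = new RestClusterClient<>(FLINK_CONFIG, StandaloneClusterId.getInstance());

    String jar = "/path/to/jar";
    String[] args = new String[]{"..."};
    PackagedProgram program = PackagedProgram.newBuilder()
            .setJarFile(new File(jar))
            .setArguments(args)
            .build();

    // Build the JobGraph on the client side by running the jar's main() method
    JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, FLINK_CONFIG, PARALLELISM, false);

    // submitJob returns a CompletableFuture<JobID>; get() blocks until the submission completes
    JobID jobId = flinkClient.submitJob(jobGraph).get();
    ...
}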