
I am launching Flink jobs on EMR from the AWS Console (using Add Step) and would like to pass JVM arguments. As per the Flink documentation, we should pass the args as below.

flink run -m yarn-cluster -p 2 /home/hadoop/flink-0.0.1-SNAPSHOT.jar -yD env.java.opts="-Dspring.profiles.active=dev -Djob.name=Flink_Job_1 -Dkafka.brokers:<>:9092" -yid application_1591030260237_0047

I am having two issues:

  1. JVM arguments are not available to flink-0.0.1-SNAPSHOT.jar

  2. Even though we are providing -yid, these jobs are not submitted to the YARN application that is passed.
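
For reference, here is a minimal sketch (my own illustration, not the actual job) of how the jar would read those properties if env.java.opts were actually applied to its JVM. The property names match the command above; the "<not set>" fallback is just a visible marker:

```java
// Sketch: reading the -D system properties that env.java.opts is
// expected to set. If they were passed through, System.getProperty
// would return the values instead of the fallback marker.
public class JvmArgsCheck {

    // Read a system property, falling back to a visible marker when absent.
    static String readProperty(String key) {
        return System.getProperty(key, "<not set>");
    }

    public static void main(String[] args) {
        System.out.println("spring.profiles.active=" + readProperty("spring.profiles.active"));
        System.out.println("job.name=" + readProperty("job.name"));
    }
}
```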

Here is what I am getting when I list the YARN applications:

application_1591030260237_0047  Flink session cluster           Apache Flink          root         default                 RUNNING               UNDEFINED                 100% http://<>:50100
application_1591030260237_0061  Flink per-job cluster           Apache Flink        hadoop         default                 RUNNING               UNDEFINED                 100% http://<>:50100
application_1591030260237_0060  Flink per-job cluster           Apache Flink        hadoop         default                 RUNNING               UNDEFINED                 100% http://<>:50100
application_1591030260237_0062  Flink per-job cluster           Apache Flink        hadoop         default                 RUNNING               UNDEFINED                 100% http://<>:50101

Thanks Sateesh


1 Answer


I wrote a simple test app using Spring and Flink to see whether the correct profile is getting loaded. It looks like spring.profiles.active is not getting passed.

import java.util.Arrays;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;

@SpringBootApplication
public class ConsoleApplication implements ApplicationRunner {

    @Autowired
    Environment env;

    public static void main(String[] args) {
        System.out.println("===================== STARTING THE APPLICATION ====================== ");

        // Print the command-line arguments handed to main().
        System.out.println(".run() : No.of arguments =" + args.length);
        Arrays.stream(args).forEach(arg -> System.out.println(".main() : Argument={}" + arg));

        // Dump all environment variables visible to this JVM.
        Map<String, String> envPropsMap = System.getenv();
        System.out.println(".main() : No.of Environment Entries =[" + envPropsMap.size() + "]");
        for (Map.Entry<String, String> entry : envPropsMap.entrySet()) {
            System.out.println(".main() : Environment Key =[" + entry.getKey() + "] Value =[" + entry.getValue() + "]");
        }

        // Dump all JVM system properties; this is where the -D options
        // from env.java.opts should show up if they were applied.
        Properties props = System.getProperties();
        System.out.println(".main() : No.of Props Entries =[" + props.size() + "]");
        Enumeration<?> enumProps = props.propertyNames();
        while (enumProps.hasMoreElements()) {
            String prop = (String) enumProps.nextElement();
            System.out.println(".main() : Properties Key =[" + prop + "] Value =[" + System.getProperty(prop) + "]");
        }

        try {
            ConfigurableApplicationContext context = SpringApplication.run(ConsoleApplication.class, args);
            context.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("APPLICATION FINISHED");
        System.out.println(".main() : Before Calling System.exit(0)");
        System.exit(0);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Print any --option=value style arguments Spring parsed.
        System.out.println("No.of Options =[" + args.getOptionNames().size() + "]");
        args.getOptionNames().forEach(optionName ->
                System.out.println("Option Name =[" + optionName + "] Value=[" + args.getOptionValues(optionName) + "]"));

        try {
            // Report the active Spring profile, if any was picked up.
            if (env.getActiveProfiles().length > 0) {
                System.out.println(".run() : Environment ={}" + env.getActiveProfiles()[0]);
            }
        } catch (Exception e) {
            System.err.println(".run() : Exception in run() :{}" + e);
        }
    }
}

And I am running the Flink job on EMR using the following command:

flink run -m yarn-cluster -p 1 -yid application_1591030260237_0077 -yD env.java.opts= "-Dspring.profiles.active=dev -Djob.name=Flink_JVM_Args_Test" -yD env.java.opts.client="-Dattr1=val1 -Dattr2 =val2" -yD env.java.opts.jobmanager="-Dattr_jm1=val1 -Dattr_jm2 =val2" -yD env.java.opts.taskmanager="-Dattr_tm1=val1 -Dattr_tm2 =val2" /home/hadoop/flink-test-0.0.1-SNAPSHOT.jar

And here is what I am getting in the logs:

2020-06-25 01:40:52,549 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Dynamic Property set: env.java.opts.taskmanager=-Dattr_tm1=val1 -Dattr_tm2 =val2
2020-06-25 01:40:52,550 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Dynamic Property set: env.java.opts.client=-Dattr1=val1 -Dattr2 =val2
2020-06-25 01:40:52,550 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Dynamic Property set: env.java.opts=
2020-06-25 01:40:52,550 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Dynamic Property set: env.java.opts.jobmanager=-Dattr_jm1=val1 -Dattr_jm2 =val2
===================== STARTING THE APPLICATION ====================== 
.run() : No.of arguments =0
.main() : No.of Environment Entries =[46]
.main() : Environment Key =[PATH] Value =[/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin]
.main() : Environment Key =[HADOOP_CONF_DIR] Value =[/etc/hadoop/conf]
.main() : Environment Key =[HISTCONTROL] Value =[ignoredups]
.main() : Environment Key =[JAVA_LIBRARY_PATH] Value =[:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native]
.main() : Environment Key =[HADOOP_ROOT_LOGGER] Value =[INFO,DRFA]
.main() : Environment Key =[HISTSIZE] Value =[1000]
.main() : Environment Key =[JAVA_HOME] Value =[/etc/alternatives/jre]
.main() : Environment Key =[AWS_DEFAULT_REGION] Value =[us-gov-west-1]
.main() : Environment Key =[TEZ_CONF_DIR] Value =[/etc/tez/conf]
.main() : Environment Key =[FLINK_PLUGINS_DIR] Value =[/usr/lib/flink/plugins]
.main() : Environment Key =[TEZ_JARS] Value =[/usr/lib/tez]
.main() : Environment Key =[LANG] Value =[en_US.UTF-8]
.main() : Environment Key =[FLINK_LOG_DIR] Value =[/var/log/flink-cli]
.main() : Environment Key =[HADOOP_LIBEXEC_DIR] Value =[///usr/lib/hadoop/libexec]
.main() : Environment Key =[HADOOP_DATANODE_HEAPSIZE] Value =[1761]
.main() : Environment Key =[HADOOP_CLASSPATH] Value =[:/etc/tez/conf:/usr/lib/tez/*:/usr/lib/tez/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*]
.main() : Environment Key =[MAIL] Value =[/var/spool/mail/hadoop]
.main() : Environment Key =[LD_LIBRARY_PATH] Value =[::/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native]
.main() : Environment Key =[FLINK_CONF_DIR] Value =[/etc/flink/conf]
.main() : Environment Key =[LOGNAME] Value =[hadoop]
.main() : Environment Key =[PWD] Value =[/mnt/var/lib/hadoop/steps/s-1EJF1Q5ZR7TEU]
.main() : Environment Key =[HADOOP_CLIENT_OPTS] Value =[-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-1EJF1Q5ZR7TEU/tmp]
.main() : Environment Key =[HADOOP_PREFIX] Value =[/usr/lib/hadoop]
.main() : Environment Key =[FLINK_OPT_DIR] Value =[/usr/lib/flink/opt]
.main() : Environment Key =[LESSOPEN] Value =[||/usr/bin/lesspipe.sh %s]
.main() : Environment Key =[YARN_CONF_DIR] Value =[/etc/hadoop/conf]
.main() : Environment Key =[FLINK_HOME] Value =[/usr/lib/flink]
.main() : Environment Key =[QTINC] Value =[/usr/lib64/qt-3.3/include]
.main() : Environment Key =[HADOOP_OPTS] Value =[ -server -XX:OnOutOfMemoryError='kill -9 %p' -Dhadoop.log.dir=/mnt/var/log/hadoop/steps/s-1EJF1Q5ZR7TEU -Dhadoop.log.file=syslog -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str=hadoop -Dhadoop.root.logger=INFO,DRFA -Djava.library.path=:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-1EJF1Q5ZR7TEU/tmp -Dhadoop.security.logger=INFO,NullAppender -Dsun.net.inetaddr.ttl=30]
.main() : Environment Key =[HADOOP_NAMENODE_HEAPSIZE] Value =[6758]
.main() : Environment Key =[FLINK_LIB_DIR] Value =[/usr/lib/flink/lib]
.main() : Environment Key =[HADOOP_JOB_HISTORYSERVER_HEAPSIZE] Value =[3399]
.main() : Environment Key =[USER] Value =[hadoop]
.main() : Environment Key =[CLASSPATH] Value =[/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/etc/tez/conf:/usr/lib/tez/*:/usr/lib/tez/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*]
.main() : Environment Key =[HADOOP_LOGFILE] Value =[syslog]
.main() : Environment Key =[HOSTNAME] Value =[ip-10-22-27-251]
.main() : Environment Key =[QTDIR] Value =[/usr/lib64/qt-3.3]
.main() : Environment Key =[HADOOP_HOME] Value =[/usr/lib/hadoop]
.main() : Environment Key =[HADOOP_LOG_DIR] Value =[/mnt/var/log/hadoop/steps/s-1EJF1Q5ZR7TEU]
.main() : Environment Key =[EMR_STEP_ID] Value =[s-1EJF1Q5ZR7TEU]
.main() : Environment Key =[QTLIB] Value =[/usr/lib64/qt-3.3/lib]
.main() : Environment Key =[HOME] Value =[/home/hadoop]
.main() : Environment Key =[SHLVL] Value =[2]
.main() : Environment Key =[FLINK_BIN_DIR] Value =[/usr/lib/flink/bin]
.main() : Environment Key =[HADOOP_IDENT_STRING] Value =[hadoop]
.main() : Environment Key =[MALLOC_ARENA_MAX] Value =[4]
.main() : No.of Props Entries =[56]
.main() : Properties Key =[zookeeper.sasl.client] Value =[true]
.main() : Properties Key =[java.runtime.name] Value =[OpenJDK Runtime Environment]
.main() : Properties Key =[sun.boot.library.path] Value =[/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/amd64]
.main() : Properties Key =[java.vm.version] Value =[25.252-b09]
.main() : Properties Key =[java.vm.vendor] Value =[Amazon.com Inc.]
.main() : Properties Key =[java.vendor.url] Value =[https://aws.amazon.com/corretto/]
.main() : Properties Key =[path.separator] Value =[:]
.main() : Properties Key =[java.vm.name] Value =[OpenJDK 64-Bit Server VM]
.main() : Properties Key =[file.encoding.pkg] Value =[sun.io]
.main() : Properties Key =[user.country] Value =[US]
.main() : Properties Key =[sun.java.launcher] Value =[SUN_STANDARD]
.main() : Properties Key =[sun.os.patch.level] Value =[unknown]
.main() : Properties Key =[java.vm.specification.name] Value =[Java Virtual Machine Specification]
.main() : Properties Key =[user.dir] Value =[/mnt/var/lib/hadoop/steps/s-1EJF1Q5ZR7TEU]
.main() : Properties Key =[java.runtime.version] Value =[1.8.0_252-b09]
.main() : Properties Key =[java.awt.graphicsenv] Value =[sun.awt.X11GraphicsEnvironment]
.main() : Properties Key =[java.endorsed.dirs] Value =[/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/endorsed]
.main() : Properties Key =[os.arch] Value =[amd64]
.main() : Properties Key =[java.io.tmpdir] Value =[/tmp]
.main() : Properties Key =[line.separator] Value =[]
.main() : Properties Key =[java.vm.specification.vendor] Value =[Oracle Corporation]
.main() : Properties Key =[os.name] Value =[Linux]
.main() : Properties Key =[log4j.configuration] Value =[file:/etc/flink/conf/log4j-cli.properties]
.main() : Properties Key =[sun.jnu.encoding] Value =[UTF-8]
.main() : Properties Key =[java.library.path] Value =[::/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib]
.main() : Properties Key =[java.specification.name] Value =[Java Platform API Specification]
.main() : Properties Key =[java.class.version] Value =[52.0]
.main() : Properties Key =[sun.management.compiler] Value =[HotSpot 64-Bit Tiered Compilers]
.main() : Properties Key =[os.version] Value =[4.14.173-137.229.amzn2.x86_64]
.main() : Properties Key =[user.home] Value =[/home/hadoop]
.main() : Properties Key =[user.timezone] Value =[UTC]
.main() : Properties Key =[java.awt.printerjob] Value =[sun.print.PSPrinterJob]
.main() : Properties Key =[file.encoding] Value =[UTF-8]
.main() : Properties Key =[java.specification.version] Value =[1.8]
.main() : Properties Key =[user.name] Value =[hadoop]
.main() : Properties Key =[log.file] Value =[/var/log/flink-cli/flink-hadoop-client-ip-10-22-27-251.log]
.main() : Properties Key =[java.vm.specification.version] Value =[1.8]
.main() : Properties Key =[sun.arch.data.model] Value =[64]
.main() : Properties Key =[java.home] Value =[/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre]
.main() : Properties Key =[sun.java.command] Value =[org.apache.flink.client.cli.CliFrontend run -m yarn-cluster -p 1 -yid application_1591030260237_0077 -yD env.java.opts= -Dspring.profiles.active=dev -Djob.name=Flink_JVM_Args_Test -yD env.java.opts.client=-Dattr1=val1 -Dattr2 =val2 -yD env.java.opts.jobmanager=-Dattr_jm1=val1 -Dattr_jm2 =val2 -yD env.java.opts.taskmanager=-Dattr_tm1=val1 -Dattr_tm2 =val2 /home/hadoop/swim-flink-test-0.0.1-SNAPSHOT.jar]
.main() : Properties Key =[java.specification.vendor] Value =[Oracle Corporation]
.main() : Properties Key =[user.language] Value =[en]
.main() : Properties Key =[awt.toolkit] Value =[sun.awt.X11.XToolkit]
.main() : Properties Key =[java.vm.info] Value =[mixed mode]
.main() : Properties Key =[java.version] Value =[1.8.0_252]
.main() : Properties Key =[java.ext.dirs] Value =[/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/ext:/usr/java/packages/lib/ext]
.main() : Properties Key =[sun.boot.class.path] Value =[/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/rt.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/sunrsasign.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/jfr.jar:/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/classes]
.main() : Properties Key =[java.vendor] Value =[Amazon.com Inc.]
.main() : Properties Key =[logback.configurationFile] Value =[file:/etc/flink/conf/logback.xml]
.main() : Properties Key =[java.security.auth.login.config] Value =[/tmp/jaas-4831190358920038586.conf]
.main() : Properties Key =[file.separator] Value =[/]
.main() : Properties Key =[java.vendor.url.bug] Value =[https://github.com/corretto/corretto-8/issues/]
.main() : Properties Key =[sun.cpu.endian] Value =[little]
.main() : Properties Key =[sun.io.unicode.encoding] Value =[UnicodeLittle]
.main() : Properties Key =[sun.cpu.isalist] Value =[]

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v0.0.1-SNAPSHOT)

env.java.opts =[null]
.run() : Environment ={}local
APPLICATION FINISHED
.main() : Before Calling System.exit(0)

As you can see, the only way I can get this info is to parse the system property "sun.java.command" and extract the profile myself!
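
That workaround can be sketched roughly like this. The helper name and the plain whitespace split are my own illustration (values containing spaces would need smarter tokenizing), but on this setup sun.java.command does contain the full CLI line shown in the property dump above:

```java
// Sketch: recover a -D option by parsing the sun.java.command system
// property, which holds the full launch command of the client JVM.
public class SunJavaCommandParser {

    // Scan the launch command for a "-D<key>=<value>" token and return the value.
    static String extract(String command, String key) {
        String marker = "-D" + key + "=";
        for (String token : command.split("\\s+")) {
            if (token.startsWith(marker)) {
                return token.substring(marker.length());
            }
        }
        return null; // key not present in the command line
    }

    public static void main(String[] args) {
        String command = System.getProperty("sun.java.command", "");
        System.out.println("profile=" + extract(command, "spring.profiles.active"));
    }
}
```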

Note: I have changed all my logger statements to System.out because I cannot find the logger output in the log files. I haven't spent much time yet figuring out why it is not writing to the log files.

Thanks Sateesh