
I have some code reading messages from Kafka like below:

// import paths may differ slightly depending on the Flink version
import java.util.Properties
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.JsonNodeDeserializationSchema

def main(args: Array[String]): Unit = {
    // this only sets the property in the JVM that runs main(), not on the TaskManagers
    System.setProperty("java.security.auth.login.config", "someValue")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val consumerProperties = new Properties()
    consumerProperties.setProperty("security.protocol", "SASL_PLAINTEXT")
    consumerProperties.setProperty("sasl.mechanism", "PLAIN")
    val kafkaConsumer = new FlinkKafkaConsumer011[ObjectNode](consumerProperties.getProperty("topic"), new JsonNodeDeserializationSchema, consumerProperties)

    val stream = env.addSource(kafkaConsumer)
    // ... rest of the pipeline and env.execute() omitted
}

When the source tries to read messages from Apache Kafka, the org.apache.kafka.common.security.JaasContext.defaultContext function loads the "java.security.auth.login.config" property.

But the property is only set on the JobManager, and once my job is running, the property cannot be loaded on the TaskManagers, so the source fails.

I tried setting extra JVM options like "-Dxxx=yyy", but the Flink cluster is deployed in standalone mode and its environment variables cannot be changed very often.

Is there any way to set the property on the TaskManagers?


1 Answer


The file bin/config.sh of a Flink standalone cluster defines a variable named DEFAULT_ENV_JAVA_OPTS.
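If I remember correctly, DEFAULT_ENV_JAVA_OPTS is filled from the env.java.opts key in conf/flink-conf.yaml, so a minimal sketch of that route would be (the JAAS file path /path/to/kafka-jaas.conf is just a placeholder):

# conf/flink-conf.yaml -- read by both the JobManager and the TaskManagers
env.java.opts: -Djava.security.auth.login.config=/path/to/kafka-jaas.conf

After a restart, both the JobManager and TaskManager JVMs should be started with that -D option, so JaasContext.defaultContext can find the property on the task side as well.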

Also, if you export JVM_ARGS="your parameters", bin/config.sh will pick it up through these lines:

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi
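
So, as a sketch (again with a placeholder JAAS file path), you could export JVM_ARGS before (re)starting the standalone cluster:

export JVM_ARGS="-Djava.security.auth.login.config=/path/to/kafka-jaas.conf"
./bin/stop-cluster.sh
./bin/start-cluster.sh

Since config.sh is sourced by the scripts that launch both the JobManager and the TaskManagers, the -D option should end up on every TaskManager JVM, which is what the Kafka source needs.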