I have some code that reads messages from Kafka, like below:
def main(args: Array[String]): Unit = {
    System.setProperty("java.security.auth.login.config", "someValue")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val consumerProperties = new Properties()
    consumerProperties.setProperty("security.protocol", "SASL_PLAINTEXT")
    consumerProperties.setProperty("sasl.mechanism", "PLAIN")
    val kafkaConsumer = new FlinkKafkaConsumer011[ObjectNode](
        consumerProperties.getProperty("topic"),
        new JsonNodeDeserializationSchema,
        consumerProperties)
    val stream = env.addSource(kafkaConsumer)
}
When the source tries to read messages from Apache Kafka, the org.apache.kafka.common.security.JaasContext.defaultContext function loads the "java.security.auth.login.config" system property.
But the property is only set in the JobManager. When my job is running, the property is not available in the TaskManager, so the source fails.
I tried setting extra JVM options such as "-Dxxx=yyy", but the Flink cluster is deployed in standalone mode, and its environment variables cannot be changed very often.
Is there any way to set the property in the TaskManager?
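
One option that might avoid the system property altogether, sketched here under assumptions: Kafka clients from 0.10.2 onward accept a "sasl.jaas.config" entry in the client properties, and JaasContext should only fall back to defaultContext when that entry is absent. Because the Properties object is handed to the consumer, it should travel with the job to the TaskManager. The object name KafkaSaslProps, the build helper, and the username/password arguments are hypothetical placeholders, not part of my actual job.

```scala
import java.util.Properties

// Hypothetical helper: builds consumer properties that carry the JAAS
// configuration inline instead of pointing at a file via a system property.
object KafkaSaslProps {
  // username and password are placeholder parameters (assumptions).
  def build(username: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("security.protocol", "SASL_PLAINTEXT")
    props.setProperty("sasl.mechanism", "PLAIN")
    // Inline JAAS entry for the PLAIN mechanism; note the trailing semicolon.
    props.setProperty(
      "sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"" + username + "\" password=\"" + password + "\";"
    )
    props
  }
}
```

The resulting Properties would be passed to FlinkKafkaConsumer011 in place of consumerProperties above. I have not verified this on my cluster.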