0
votes

When I used Spark Streaming (Spark 2.4) to consume from Kafka, I found that the log statements outside the foreachRDD method were printed, but the log statements inside foreachRDD were not. The logging API I am using is log4j 1.2.

I have tried adding
spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties
spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties

to the spark-defaults.properties configuration file. At first I wrote the wrong path, and an error about the log level and the log configuration file path was printed, so the spark.executor.extraJavaOptions and spark.driver.extraJavaOptions settings do take effect.
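
For reference, a log4j.properties file of the kind these options point at might look like the sketch below. It follows the console appender layout of the log4j.properties.template that ships with Spark and is only an assumption about the setup, not the file actually used here.

```properties
# Assumed example, modelled on Spark's bundled log4j.properties.template.
# Send everything at INFO and above to the console appender.
log4j.rootCategory=INFO, console

# Console appender writing to stderr; on YARN this ends up in each container's stderr log.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

Because -Dlog4j.configuration=log4j.properties is resolved separately on the driver and on every executor, the file usually has to be available on each of them, for example by shipping it with spark-submit --files.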


2 Answers

1
votes

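The code inside and outside the foreach block runs on different machines: the code outside runs on the driver, and the code inside runs on the executors. So if you want to see the log output from inside the foreach block, look at the executor logs, which you can view through YARN (for example with the yarn logs -applicationId command or the YARN web UI).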

-1
votes
<code>
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/vdir/mnt/disk2/hadoop/yarn/local/usercache/root/filecache/494/__spark_libs__3795396964941241866.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    19/01/10 14:17:16 ERROR KafkaSparkStreamingKafkaTests: receive+++++++++++++++++++++++++++++++
</code>

My code:
<code>
    // 1. Calling code
    if (args[3].equals("consumer1")) {
        // Logged before the streaming job is set up, so this line appears in the driver log
        logger.error("receive+++++++++++++++++++++++++++++++");
        SparkSQLService sparkSQLService = new SparkSQLService();
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
        sparkSQLService.sparkForwardedToKafka(sparkConf,
                CONSUMER_TOPIC,
                PRODUCER_TOPIC,
                new HashMap<String, Object>((Map) consumerProperties));
    ......
    // 2. SparkSQLService.sparkForwardedToKafka
    public void sparkForwardedToKafka(SparkConf sparkConf, String consumerTopic, String producerTopic, Map<String, Object> kafkaConsumerParamsMap) {
        sparkConf.registerKryoClasses(new Class[]{SparkSQLService.class, FlatMapFunction.class, JavaPairInputDStream.class, Logger.class});
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.milliseconds(DURATION_SECONDS));
        Collection<String> topics = Arrays.asList(consumerTopic);
        // Direct stream that subscribes to the consumer topic
        JavaInputDStream<ConsumerRecord<String, String>> streams =
                KafkaUtils.createDirectStream(
                        javaStreamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topics, kafkaConsumerParamsMap)
                );
        if (producerTopic != null) {
            JavaPairDStream<Long, String> messages = streams.mapToPair(record -> new Tuple2<>(record.timestamp(), record.value()));
            messages.foreachRDD(rdd -> {
                rdd.foreachPartition(partition -> {
                    // This closure runs on the executors, one call per partition
                    partition.forEachRemaining(tuple2 -> {
                        LOGGER.error("****" + tuple2._1 + "|" + tuple2._2);
                        KafkaService.getInstance().send(producerTopic, TaskContext.get().partitionId(), tuple2._1, null, tuple2._2);
                    });
                });
            });

</code>

My logger is declared as: private static final Logger LOGGER = LoggerFactory.getLogger(SparkSQLService.class);