0
votes

I've already trawled the web for answers here, and I cannot get anything to work, so maybe somebody has a fresh perspective.

  • I'm trying to write logs to a Kafka topic from inside an Apache Spark 2.2 application.
  • Because Spark still uses Log4j v1, I have to try and get the v1 Kafka appender to work, instead of being able to use the default Kafka appender provided with Log4j v2.
  • I can do this in a little demo app running via IntelliJ, using the following library (from build.sbt):

    // Old version of Kafka needed for v1 Log4j appender libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.2"

  • But I cannot find a way to get this to run via e.g. spark-shell or spark-submit.

  • I can configure the appender in Spark's log4j.properties using the same settings as in my dummy app.

  • But when the Spark shell starts up, it seems it fires up the logger before it loads any extra JARs, then throws an error immediately because it can't find the Kafka appender:

    log4j:ERROR Could not instantiate class [kafka.producer.KafkaLog4jAppender]. java.lang.ClassNotFoundException: kafka.producer.KafkaLog4jAppender

  • I have tried all kinds of options, in the Spark config files or on the CLI, to get the JARs to load up first e.g. --jars, --files, --driver-class-path, setting spark.driver.extraClassPath and spark.executor.extraClassPath in spark-default.conf, etc etc.

Nothing seems to work, so has anybody ever got this to work i.e. Spark 2.2. logging to Kafka via Log4j, and if so, can they suggest the right config options to allow me to do this?

By the way, there are several similar questions here on SO, but none of them has solved the problem for me, so please don't mark this as a duplicate.

Thanks for any tips you can offer!

1
Have you tried to assembly your application with sbt into an uber-jar and then submit ?eliasah
I need my application logging to be compatible with Apache Spark's logging, rather than bundling my own logging infrastructure with the app. Spark is supposed to be able to do this, I just can't figure out how.Chris Webster
In case it helps,I can load the log appender JARs like this: spark-shell --jars ./kafka_2.11-0.8.2.2.jar,./kafka-clients-0.8.2.2.jar . I can then import the kafka.producer classes in the shell later on, but I still get the same NoClassDefFound error when the shell first tries to start the logger, because at that point it has not yet loaded the JARs.Chris Webster
Did you manage to make it work?Ross Brigoli

1 Answers

1
votes

kafka-log4j-appender with Spark

I managed to use spark-submit 2.1.1 in cluster mode with kafka-log4j-appender 2.3.0, but I believe other versions will behave similarly.


Preparation

First of all, I think it is really helpful to read the logs so you need to be able to read both application yarn logs and spark-submit informations. Sometimes when the application hanged in ACCEPT phase (because of kafka producer missconfiguration) it was necessary to read the logs from the Hadoop Yarn application overview.

So whenever I was starting my app, it was very important to grab

19/08/01 10:52:46 INFO yarn.Client: Application report for application_1564028288963_2380 (state: RUNNING)

line and download all the logs from YARN when it was completed

yarn logs -applicationId application_1564028288963_2380

Ok, lets try!


Provide kafka-log4j-appender for Spark

Basically, spark is missing kafka-log4j-appender.

Generally, you should be able to provide kafka-log4j-appender in your fat jar. I had some previous experience with similar problem where it does not work. Simply because in a cluster environment your classpath is overridden by Spark. So if it does not work for you either, move on.

Option A. Manually download jars:

kafka-log4j-appender-2.3.0.jar
kafka-clients-2.3.0.jar

You actually need both, because appender won't work without clients.
Place them on the same machine you fire spark-submit from.
The benefit is, that you can name them as you like.

Now for client mode

JARS='/absolute/path/kafka-log4j-appender-2.3.0.jar,/absolute/path/kafka-clients-2.3.0.jar'
JARS_CLP='/absolute/path/kafka-log4j-appender-2.3.0.jar:/absolute/path/kafka-clients-2.3.0.jar'
JARS_NAMES='kafka-log4j-appender-2.3.0.jar:kafka-clients-2.3.0.jar'

spark-submit \
    --deploy-mode client \
    --jars "$JARS"
    --conf "spark.driver.extraClassPath=$JARS_CLP" \
    --conf "spark.executor.extraClassPath=$JARS_NAMES" \

Or for cluster mode

spark-submit \
    --deploy-mode cluster \
    --jars "$JARS"
    --conf "spark.driver.extraClassPath=$JARS_NAMES" \
    --conf "spark.executor.extraClassPath=$JARS_NAMES" \

Option B. Use --packages to download jars from maven:

I think this is more convenient, but you have to get the name precisely.

You need to look for those kinds of lines during run:

19/11/15 19:44:08 INFO yarn.Client: Uploading resource file:/srv/cortb/home/atais/.ivy2/jars/org.apache.kafka_kafka-log4j-appender-2.3.0.jar -> hdfs:///user/atais/.sparkStaging/application_1569430771458_10776/org.apache.kafka_kafka-log4j-appender-2.3.0.jar
19/11/15 19:44:08 INFO yarn.Client: Uploading resource file:/srv/cortb/home/atais/.ivy2/jars/org.apache.kafka_kafka-clients-2.3.0.jar -> hdfs:///user/atais/.sparkStaging/application_1569430771458_10776/org.apache.kafka_kafka-clients-2.3.0.jar

and note down how the jars are called inside application_1569430771458_10776 folder on hdfs.

Now for client mode

JARS_CLP='/srv/cortb/home/atais/.ivy2/jars/org.apache.kafka_kafka-log4j-appender-2.3.0.jar:/srv/cortb/home/atais/.ivy2/jars/org.apache.kafka_kafka-clients-2.3.0.jar'
KAFKA_JARS='org.apache.kafka_kafka-log4j-appender-2.3.0.jar:org.apache.kafka_kafka-clients-2.3.0.jar'

spark-submit \
    --deploy-mode client \
    --packages "org.apache.kafka:kafka-log4j-appender:2.3.0"
    --conf "spark.driver.extraClassPath=$JARS_CLP" \
    --conf "spark.executor.extraClassPath=$KAFKA_JARS" \

Or for cluster mode

spark-submit \
    --deploy-mode cluster \
    --packages "org.apache.kafka:kafka-log4j-appender:2.3.0"
    --conf "spark.driver.extraClassPath=$KAFKA_JARS" \
    --conf "spark.executor.extraClassPath=$KAFKA_JARS" \

The above should work already

Extra steps

If you want to provide your logging.proprietes, follow my tutorial on that here: https://stackoverflow.com/a/55596389/1549135