2
votes

I am testing my Spark Streaming application, and I have multiple functions in my code: some of them operate on a DStream[RDD[XXX]], and some on RDD[XXX] (after I call DStream.foreachRDD).

I use the Kafka log4j appender to log business events that occur within my functions, both those that operate on the DStream and those that operate on the RDD itself.

But data gets appended to Kafka only from the functions that operate on an RDD; it doesn't work when I want to append data to Kafka from my functions that operate on the DStream.

Does anyone know the reason for this behaviour?

I am working on a single virtual machine, where I have both Spark and Kafka. I submit applications using spark-submit.
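For context, the overall structure of the code looks roughly like this (a simplified sketch, not my actual code; class, function and type names are placeholders):

import org.apache.log4j.LogManager
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingApp {

  def main(args: Array[String]): Unit = {
    val kafkaLogger = LogManager.getLogger("kafkaLogger")
    val ssc = new StreamingContext(new SparkConf().setAppName("StreamingApp"), Seconds(5))
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    kafkaLogger.info("starting the job")        // logging from main

    val cleaned = transform(lines)              // function that operates on the DStream
    cleaned.foreachRDD(rdd => process(rdd))     // function that operates on each RDD

    ssc.start()
    ssc.awaitTermination()
  }

  // Operates on the whole DStream -- logging from here does not show up in Kafka.
  def transform(stream: DStream[String]): DStream[String] = {
    val log = LogManager.getLogger("kafkaLogger")
    log.info("transforming the DStream")
    stream.filter(_.nonEmpty)
  }

  // Operates on a single RDD, called from foreachRDD -- logging from here works.
  def process(rdd: RDD[String]): Unit = {
    val log = LogManager.getLogger("kafkaLogger")
    log.info(s"batch contains ${rdd.count()} records")
  }
}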

EDITED

Actually, I have figured out part of the problem. Data gets appended to Kafka only from the code that is in my main function. All the code that is outside of my main doesn't write data to Kafka.

In main I declared the logger like this:

val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafkaLogger")

While outside of my main, I had to declare it like this:

@transient lazy val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafkaLogger")

in order to avoid serialization issues.
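For illustration, the pattern outside of main looks roughly like this (a sketch with placeholder names; the point is that the logger is not serialized with the closure and is re-created lazily in each JVM):

import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD

class RddProcessor extends Serializable {

  // @transient: the Logger is not serialized together with this object;
  // lazy: each JVM (driver or executor) creates its own instance on first use.
  @transient lazy val kafkaLogger = LogManager.getLogger("kafkaLogger")

  def process(rdd: RDD[String]): Unit = {
    kafkaLogger.info("processing a new batch")      // runs on the driver
    rdd.foreachPartition { partition =>
      kafkaLogger.info("processing a partition")    // runs on an executor
      partition.foreach(_ => ())                    // placeholder business logic
    }
  }
}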

The reason might be related to JVM serialization, or simply that the workers don't see the log4j configuration file (though my log4j file is in my source code, in the resources folder).
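The configuration I mean is along these lines (a sketch only; the broker address, topic name and layout are placeholders, not my actual values):

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d %p %c: %m%n

# Kafka appender from the kafka-log4j-appender artifact
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.brokerList=localhost:9092
log4j.appender.KAFKA.topic=app-logs
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d %p %c: %m%n

# the named logger used in the code
log4j.logger.kafkaLogger=INFO, KAFKA
log4j.additivity.kafkaLogger=false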

Edited 2

I have tried many ways to send the log4j file to the executors, but none of them work. I tried:

  • sending the log4j file with the --files option of spark-submit

  • setting --conf "spark.executor.extraJavaOptions =-Dlog4j.configuration=file:/home/vagrant/log4j.properties" in spark-submit

  • setting the log4j.properties file in --driver-class-path of spark-submit...

None of these options worked.

Does anyone have a solution? I do not see any errors in my error log.

Thank you

1
You should definitely post some code so we can help. – Yuval Itzchakov
@YuvalItzchakov, I have just edited the question. – Srdjan Nikitovic

1 Answer

0
votes

I think you are close. First, you want to make sure all the files are exported to the WORKING DIRECTORY (not the CLASSPATH) on all nodes using the --files flag. Then you want to reference these files in the extraClassPath option of the executors and the driver. I have attached the following command; hope it helps. The key is to understand that once the files are exported, they can be accessed on each node using just the file name relative to the working directory (and not a URL path).

Note: Putting the log4j file in the resources folder will not work (at least it didn't when I tried).

sudo -u hdfs spark-submit \
  --class "SampleAppMain" \
  --master yarn \
  --deploy-mode cluster \
  --verbose \
  --files file:///path/to/custom-log4j.properties,hdfs:///path/to/jar/kafka-log4j-appender-0.9.0.0.jar \
  --conf "spark.driver.extraClassPath=kafka-log4j-appender-0.9.0.0.jar" \
  --conf "spark.executor.extraClassPath=kafka-log4j-appender-0.9.0.0.jar" \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=custom-log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=custom-log4j.properties" \
  /path/to/your/jar/SampleApp-assembly-1.0.jar