We are working on an application that consumes Kafka messages using Spark Streaming, running on a Hadoop/YARN Spark cluster. I have log4j properties configured on both the driver and the workers, but I still don't see the log messages from inside foreachRDD. I do see "start for each rdd" and "end of for each rdd".

val broadcastLme = sc.broadcast(lme)
logInfo("start for each rdd: ")
val lines: DStream[MetricTypes.InputStreamType] = myConsumer.createDefaultStream()
lines.foreachRDD(rdd => {
  if ((rdd != null) && (rdd.count() > 0) && (!rdd.isEmpty())) {
    logInfo("filteredLines: " + rdd.count())
    logInfo("start loop")
    rdd.foreach { x =>
      val lme = broadcastLme.value
      lme.aParser(x).get
    }
    logInfo("end loop")
  }
})

logInfo("end of for each rdd ")

lines.print(10)

I'm running the application on the cluster with this command:

spark-submit --verbose --class DevMain --master yarn-cluster --deploy-mode cluster --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --files "hdfs://hdfs-name-node:8020/user/hadoopuser/log4j.properties" hdfs://hdfs-name-node:8020/user/hadoopuser/streaming_2.10-1.0.0-SNAPSHOT.jar hdfs://hdfs-name-node:8020/user/hadoopuser/enriched.properties

I'm new to Spark. Could someone please explain why I'm not seeing the log messages inside foreachRDD? This is the log4j.properties:

log4j.rootLogger=WARN, rolling

log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.conversionPattern=[%p] %d %c %M - %m%n
log4j.appender.rolling.maxFileSize=100MB
log4j.appender.rolling.maxBackupIndex=10
log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/titanium-spark-enriched.log
#log4j.appender.rolling.encoding=UTF-8

log4j.logger.org.apache.spark=WARN
log4j.logger.org.eclipse.jetty=WARN

log4j.logger.com.x1.projectname=INFO

#log4j.appender.console=org.apache.log4j.ConsoleAppender
#log4j.appender.console.target=System.err
#log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
#log4j.logger.org.spark-project.jetty=WARN
#log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
#log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
#log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

#log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.RollingAppender.File=./logs/spark/enriched.log
#log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
#log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
#log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n


#log4j.rootLogger=INFO, RollingAppender, console
The deploy mode is cluster, so the driver's logs will be on one of the cluster nodes. Have you checked that node's logs? - T. Gawęda
I'm using YARN to aggregate the logs, and I don't see the log messages from inside foreachRDD. - user2359997
How did you create myConsumer? What about lines DStream? Show the code that you foreach later. - Jacek Laskowski

1 Answer


The problem seems to be that the Spark Streaming application never starts the streaming context after the streaming computation has been built, i.e. this call is missing:

ssc.start()

Quoting the scaladoc of StreamingContext:

After creating and transforming DStreams, the streaming computation can be started and stopped using context.start() and context.stop(), respectively.

context.awaitTermination() allows the current thread to wait for the termination of the context by stop() or by an exception.
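
To make the lifecycle in the quoted scaladoc concrete, here is a minimal sketch rather than the asker's actual application. It assumes a local master and uses queueStream as a stand-in for the Kafka stream; the object name StreamingStartSketch, the sample data, and the 5-second timeout are all made up for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

// Hypothetical object name; not taken from the question.
object StreamingStartSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: local master so the sketch runs standalone.
    // On YARN, drop setMaster and let spark-submit supply it.
    val conf = new SparkConf()
      .setAppName("streaming-start-sketch")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumption: a one-RDD queue stands in for the Kafka DStream.
    val queue = mutable.Queue(ssc.sparkContext.parallelize(Seq("a", "b", "c")))
    val lines = ssc.queueStream(queue)

    // This only registers the output operation; nothing runs yet.
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // The body of foreachRDD runs on the driver, once per batch.
        println(s"records in batch: ${rdd.count()}")
        // The closure passed to rdd.foreach runs on the executors,
        // so its output ends up in the executor logs.
        rdd.foreach(x => println(s"record: $x"))
      }
    }

    // Without start(), the foreachRDD body above is never invoked.
    ssc.start()
    // Wait a bounded time here; a real job would call awaitTermination().
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

In this sketch, statements outside foreachRDD run once while the job is being set up, which would be consistent with the question seeing "start for each rdd" and "end of for each rdd" but nothing from inside the block.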