
Has anybody been able to start Structured Streaming on Hadoop 2.6.0-cdh5.14.2 while also using external libraries (mainly the spark-sql-* ones)?

Update

Before anything else, some info missing from my previous post: Spark is at version 2.3.0.

On a remote friend's suggestion I did this:

  1. I moved from Python to Scala (which is better supported, and is Spark's native language).
  2. I ran Structured Streaming using sources other than Kafka.

I used a simple CSV file as the source:

$ export SPARK_KAFKA_VERSION=0.10
$ spark2-shell 

scala> import org.apache.spark.sql.Encoders
scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)
scala> val schema = Encoders.product[Amazon].schema
scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]

scala> data.isStreaming 
res0: Boolean = true

scala> val ss = data.writeStream.outputMode("append").format("console")
scala> ss.start()

Magically, this code worked.

Cloudera claims that they don't support Structured Streaming; consistent with that, the following code, where I only changed the source, fails:

val data =spark.readStream.format("kafka")... 
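For reference, the elided call looks roughly like this (the broker address and topic name are placeholders, not my real ones):

```scala
// Placeholder broker and topic, for illustration only.
val data = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "some-topic")
  .load()
```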

Raising this exception:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more

And I'm using only the libraries provided by Cloudera (no external jars). Please note the

     $ export SPARK_KAFKA_VERSION=0.10

which forces the 0.10 version (spark-streaming-kafka-..) to be used, since the 0.8 version also exists on the cluster. But there is no spark-sql-kafka jar.

At this point, AFAIK, the problem is that I'm missing the correct library (jar). Spark 2.3.0 looks healthy despite all the warnings on the Cloudera site.


So... is there a way to get an "unofficially official" Cloudera jar that addresses this issue? Has anyone found a good jar to deploy with the code that solves it? A jar from Cloudera would be the better option: internal policy forbids bundling third-party jars with the code.

The other option is to re-implement all the Structured Streaming logic using direct streaming (DStreams). That is a job I would like to avoid.

If you have admin access to the cluster, nothing is stopping you from installing a recent version of Spark to run Structured Streaming (in fact, you don't even need to "install" Spark, just set it up to submit YARN jobs) - OneCricketeer
I updated the question with more useful information. Spark is already at the correct version; at the end of the road, the issue is about jars and Cloudera policy. - ozw1z5rd

1 Answer


I think that this is the answer to my question:

  1. The library from Cloudera does exist: spark-sql-kafka-0-10_2.11-2.3.0.cloudera2.jar.
  2. If Kafka is under Sentry, it will not work; disable Sentry for it.
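For completeness, this is roughly how to load that jar. The path below is illustrative (it depends on where your distribution puts the jar), so locate the actual file on your cluster first:

```shell
# The path is only an example of where a parcel might place the jar;
# find the real location before running this.
export SPARK_KAFKA_VERSION=0.10
spark2-shell --jars /opt/cloudera/parcels/SPARK2/lib/spark2/jars/spark-sql-kafka-0-10_2.11-2.3.0.cloudera2.jar
```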

Sadly, the code needs to create a new group.id for each query:

 18/11/15 10:51:25 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
 18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
 18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2
 18/11/15 10:51:29 ERROR streaming.MicroBatchExecution: Query [id = 099e897f-2a44-4a50-bc57-46f898e05174, runId = b010d8d8-7b73-4f71-8ca5-f3eda47149c6] terminated

And Sentry will not allow these groups to access the data. There is no way to avoid this, since the group id generation is hard-coded in KafkaSourceProvider.scala:

(screenshot of the relevant KafkaSourceProvider.scala code)
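From memory, the screenshot amounts to something like this paraphrase (not a verbatim copy of the Spark 2.3 source): a fresh random UUID per query, so there is no stable group.id to authorize in Sentry ahead of time. The metadata path here is illustrative:

```scala
import java.util.UUID

// Paraphrase of Spark 2.3's group-id generation: a new random UUID for every
// query, so the resulting group.id can never be whitelisted in advance.
val metadataPath = "/user/ale/checkpoints/sources/0"  // illustrative path
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
val driverGroupId = s"$uniqueGroupId-driver-0"  // matches the ids in the log above
println(driverGroupId)
```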

Hope this saves someone else some time.