
I'm encountering a class incompatibility error when distributing an operation using Spark and Docker. The tutorial I followed seemed clear enough. Here is my attempt at the Scala code:

package test

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
import readhub.sharedkernel.config.Settings

object Application extends App {
  import com.datastax.spark.connector._

  val conf = new SparkConf(true)
    .setAppName("Coordinator")
    .setMaster("spark://localhost:7077")
    .set("spark.cassandra.connection.host", "valid host")

  val sc = new SparkContext(conf)

  CassandraConnector(conf).withSessionDo { session =>
    session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
    session.execute("CREATE TABLE test2.words (word text PRIMARY KEY, count int)")
    session.execute("INSERT INTO test2.words(word, count) VALUES('hey', 32)")

    sc.cassandraTable("test2", "words")
      .map(r => r.getString("word"))
      .foreach(process)
  }

  def process(word: String): Unit = {
    // Dummy processing
    println(word)
  }
}

The build.sbt looks like this:

import sbt.project

val sparkSql = "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"
val sparkCassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % "2.3.0" % "provided"

lazy val commonSettings = Seq(
  version := "0.1",
  scalaVersion := "2.11.12",
  organization := "ch.heig-vd"
)

lazy val root = (project in file("."))
  .settings(
    commonSettings,
    name := "Root"
  )
  .aggregate(
    coordinator
  )

lazy val coordinator = project
  .settings(
    commonSettings,
    name := "Coordinator",
    libraryDependencies ++= Seq(
      sparkSql,
      sparkCassandraConnector
    )
  )
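
Since the fat jar is built later with sbt coordinator/assembly, the sbt-assembly plugin also has to be enabled. The plugins file is not shown in the original post; a minimal project/plugins.sbt might look like the sketch below (the plugin version is an assumption):

// project/plugins.sbt -- hypothetical sketch, not part of the original post;
// the sbt-assembly version shown here is an assumption
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")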

The Dockerfile was taken from this image and lightly modified to use Spark 2.3.0:

FROM phusion/baseimage:0.9.22

ENV SPARK_VERSION 2.3.0
ENV SPARK_INSTALL /usr/local
ENV SPARK_HOME $SPARK_INSTALL/spark
ENV SPARK_ROLE master
ENV HADOOP_VERSION 2.7
ENV SPARK_MASTER_PORT 7077
ENV PYSPARK_PYTHON python3
ENV DOCKERIZE_VERSION v0.2.0

RUN apt-get update && \
    apt-get install -y openjdk-8-jdk autossh python3-pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

##### INSTALL DOCKERIZE
RUN curl -L -O https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && \
    tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && \
    rm -rf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz

##### INSTALL APACHE SPARK WITH HDFS
RUN curl -s http://mirror.synyx.de/apache/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz | tar -xz -C $SPARK_INSTALL && \
    cd $SPARK_INSTALL && ln -s spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION spark

WORKDIR $SPARK_HOME

##### ADD Scripts
RUN mkdir /etc/service/spark
ADD runit/spark.sh /etc/service/spark/run
RUN chmod +x /etc/service/**/*

EXPOSE 4040 6066 7077 7078 8080 8081 8888

VOLUME ["$SPARK_HOME/logs"]

CMD ["/sbin/my_init"]

The docker-compose.yml is also pretty simple:

version: "3"

services:
  master:
    build: birgerk-apache-spark

    ports:
      - "7077:7077"
      - "8080:8080"

  slave:
    build: birgerk-apache-spark
    environment:
      - SPARK_ROLE=slave
      - SPARK_MASTER=master
    depends_on:
      - master

I cloned the git repo into the folder birgerk-apache-spark and only changed the Spark version to 2.3.0.

Finally, I glue everything together using:

sbt coordinator/assembly

to create the fat jar and

spark-submit --class test.Application --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 --master spark://localhost:7077 ReadHub\ Coordinator-assembly-0.1.jar

to submit the jar to the cluster. The error arises when I run spark-submit:

ERROR TransportRequestHandler:199 - Error while invoking RpcHandler#receive() on RPC id 7068633004064450609
java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 6155820641931972169, local class serialVersionUID = -3720498261147521051
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    [..]

From my perspective, the Dockerfile downloads the right version of Spark, the same one that is declared as a dependency in my build.sbt.

I'm sure I'm missing something fundamental. Could anyone point me in the right direction?

Many thanks!

I have only seen that error when running mismatched versions of Spark. I would double check that the version in your Docker image is the one actually running. – Travis Hegner
I'm not too familiar with docker-compose, but the docs show a leading ./ when passing a directory to the build: directive. Try build: ./birgerk-apache-spark in your compose file. – Travis Hegner
Hello @TravisHegner, thanks for your comment! I checked the version of Spark by browsing localhost:8080 and it is the expected one (2.3.0). I also changed the build directive and issued a docker-compose build, but nothing changed: Docker correctly targeted the build folder. I too have the feeling something mismatches, but I cannot find it. :( – Jämes
Perhaps some other dependency in your app is pulling in a mismatched version of Spark? What about your Java version, does it match the one in the containers? – Travis Hegner

1 Answer


The cause was a version mismatch between Spark 2.3.3 and Spark 2.3.0.

Be careful not to submit jobs with a SPARK_HOME defined on your host that points to a different Spark release than the cluster; that can cause this kind of problem.
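
The quickest check is to compare the output of spark-submit --version on the host with the version shown by the master UI at localhost:8080. From inside the application, a one-line sanity check like the sketch below (not part of the original answer) shows the same information; because the Spark dependencies are marked "provided", it reports the version coming from the local SPARK_HOME rather than from build.sbt.

// Diagnostic sketch, to be placed right after the SparkContext is created in
// Application. It prints the Spark version actually on the driver classpath;
// it should read 2.3.0, matching the master UI, and a value such as 2.3.3
// points at the host's SPARK_HOME as the source of the mismatch.
println(s"Driver is running Spark ${sc.version}")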