1 vote

I use Spark 2.1.1 and Scala 2.10 with a two-node Spark Standalone cluster.

I'd like to execute a Spark code that reads a dataset from a MySQL table and writes it to a Cassandra table.

Code:

import java.sql.{DriverManager, ResultSet}

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object RdmsToCassandra extends Serializable {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.cassandra.connection.host", "192.168.22.231")
    val sc = new SparkContext("spark://192.168.22.231:7077", "MigrateMySQLToCassandra", conf)
    val mysqlJdbcString = "jdbc:mysql://192.168.22.238/customer_events?user=root&password=qweqwe"
    Class.forName("com.mysql.jdbc.Driver").newInstance

    CassandraConnector(conf).withSessionDo { session =>
      session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}")
      session.execute("CREATE TABLE IF NOT EXISTS test.store(store_name text PRIMARY KEY, location text, store_type text)")
    }

    val highestId: Long = 2
    val startingId: Long = 0
    val numberOfPartitions = 1

    val customerEvents = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(mysqlJdbcString),
      "select * from store limit ?, ?",
      startingId, highestId, numberOfPartitions,
      (r: ResultSet) => (
        r.getString("store_name"),
        r.getString("location"),
        r.getString("store_type")
      )
    )

    // Save all three tuple fields into the table created above.
    customerEvents.saveToCassandra("test", "store",
      SomeColumns("store_name", "location", "store_type"))
  }
}

I submit the application using the command:

spark-submit --master spark://192.168.22.231:6066 \
  --class "RdmsToCassandra" \
  rdbmstocassandra_2.10-1.0.jar

During execution I am getting the following error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/05/26 15:45:51 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 10619@totalprices-db-server-02
17/05/26 15:45:51 INFO SignalUtils: Registered signal handler for TERM
17/05/26 15:45:51 INFO SignalUtils: Registered signal handler for HUP
17/05/26 15:45:51 INFO SignalUtils: Registered signal handler for INT
17/05/26 15:45:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 15:45:51 INFO SecurityManager: Changing view acls to: root
17/05/26 15:45:51 INFO SecurityManager: Changing modify acls to: root
17/05/26 15:45:51 INFO SecurityManager: Changing view acls groups to:
17/05/26 15:45:51 INFO SecurityManager: Changing modify acls groups to:
17/05/26 15:45:51 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/05/26 15:45:52 INFO TransportClientFactory: Successfully created connection to /192.168.22.231:36249 after 60 ms (0 ms spent in bootstraps)
17/05/26 15:45:52 INFO SecurityManager: Changing view acls to: root
17/05/26 15:45:52 INFO SecurityManager: Changing modify acls to: root
17/05/26 15:45:52 INFO SecurityManager: Changing view acls groups to:
17/05/26 15:45:52 INFO SecurityManager: Changing modify acls groups to:
17/05/26 15:45:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/05/26 15:45:52 INFO TransportClientFactory: Successfully created connection to /192.168.22.231:36249 after 1 ms (0 ms spent in bootstraps)
17/05/26 15:45:52 WARN Utils: Your hostname, totalprices-db-server-02 resolves to a loopback address: 127.0.0.1; using 221.243.36.126 instead (on interface em1)
17/05/26 15:45:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/05/26 15:45:52 INFO DiskBlockManager: Created local directory at /tmp/spark-73513e64-f52c-48c5-bf9e-bbc45caec12d/executor-fd21cdc3-673e-4b9e-9bd2-6cef1e3da140/blockmgr-157ab1c4-ca1e-481b-9357-b5855ee6beef
17/05/26 15:45:52 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/05/26 15:45:52 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:36249
17/05/26 15:45:52 INFO WorkerWatcher: Connecting to worker spark://[email protected]:40561
17/05/26 15:45:52 INFO TransportClientFactory: Successfully created connection to /221.243.36.126:40561 after 1 ms (0 ms spent in bootstraps)
17/05/26 15:45:52 INFO WorkerWatcher: Successfully connected to spark://[email protected]:40561
17/05/26 15:45:52 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
17/05/26 15:45:52 INFO Executor: Starting executor ID 0 on host 221.243.36.126
17/05/26 15:45:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43114.
17/05/26 15:45:52 INFO NettyBlockTransferService: Server created on 221.243.36.126:43114
17/05/26 15:45:52 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/05/26 15:45:52 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 221.243.36.126, 43114, None)
17/05/26 15:45:52 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 221.243.36.126, 43114, None)
17/05/26 15:45:52 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 221.243.36.126, 43114, None)
17/05/26 15:46:00 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
17/05/26 15:46:00 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

Can anybody suggest what the problem could be here?

Comments:

Can you use pastebin.com to paste the entire logs from the spark-submit? – Jacek Laskowski
Do you perhaps use Scala 2.10 to compile your Spark application with Spark 2.1.1 (built with Scala 2.11 by default)? Can you make sure you're on the same Scala version in the app and the environment? – Jacek Laskowski
I am using Scala 2.12.2 – DEV
@JacekLaskowski sorry for the previous comment; btw I am using Scala 2.10. I will check with Scala 2.11. – DEV
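One way to check for the version mismatch raised in the comments is the build definition. A minimal build.sbt sketch, assuming sbt and the Scala 2.11 build of Spark 2.1.1 (all version numbers below are assumptions, not taken from the question):

```scala
// Hypothetical build.sbt: align scalaVersion with the Scala version
// your Spark 2.1.1 distribution was built against (2.11 by default).
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // "provided": the cluster supplies Spark itself at runtime
  "org.apache.spark" %% "spark-core" % "2.1.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2",
  "mysql" % "mysql-connector-java" % "5.1.42"
)
```

The `%%` operator appends the Scala binary version to the artifact name, so the packaged jar and its dependencies stay on one Scala version.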

1 Answer

0 votes

I know this is kind of old, but it may be helpful for somebody else stopping by:

I faced a similar issue in which investigating the NodeManager logs led me to the root cause. You can access them via the web interface at

nodeManagerAddress:PORT/logs

The PORT is specified in yarn-site.xml under yarn.nodemanager.webapp.address (default: 8042).
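For reference, the property looks like this in yarn-site.xml (the value below is a placeholder, not taken from an actual cluster):

```xml
<!-- yarn-site.xml: bind address of the NodeManager web UI; 8042 is the default -->
<property>
  <name>yarn.nodemanager.webapp.address</name>
  <value>0.0.0.0:8042</value>
</property>
```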

My investigation workflow:

  1. Collect the logs (with the yarn logs ... command)
  2. Identify the node and container (in these logs) emitting the error
  3. Search the NodeManager logs by the timestamp of the error for the root cause
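The first step can be sketched as follows (the application ID is a placeholder; take the real one from the ResourceManager UI or the spark-submit output):

```
# Aggregate logs for a finished application (placeholder application ID)
yarn logs -applicationId application_1495000000000_0001 > app.log

# Steps 2 and 3: locate the failing container and the error timestamp
grep -n "ERROR" app.log
```

Log aggregation must be enabled (yarn.log-aggregation-enable) for the yarn logs command to return anything.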

Btw: you can access the aggregated collection (XML) of all configurations affecting a node at the same port with:

 nodeManagerAddress:PORT/conf