I use Spark 2.1.1 and Scala 2.10 with a two-node Spark Standalone cluster.
I'd like to run a Spark job that reads a dataset from a MySQL table and writes it to a Cassandra table.
Code:
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

object RdmsToCassandra extends Serializable {

  def main(args: Array[String]) {
    val conf = new SparkConf().set("spark.cassandra.connection.host", "192.168.22.231")
    val sc = new SparkContext("spark://192.168.22.231:7077", "MigrateMySQLToCassandra", conf)
    val mysqlJdbcString: String = s"jdbc:mysql://192.168.22.238/customer_events?user=root&password=qweqwe"
    Class.forName("com.mysql.jdbc.Driver").newInstance

    CassandraConnector(conf).withSessionDo { session =>
      session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}")
      session.execute("CREATE TABLE IF NOT EXISTS test.store (store_name text PRIMARY KEY, location text, store_type text)")
    }

    val highestId: Long = 2
    val startingId: Long = 0
    val numberOfPartitions = 1

    val customerEvents = new JdbcRDD(sc,
      () => DriverManager.getConnection(mysqlJdbcString),
      "select * from store limit ?, ?",
      startingId, highestId, numberOfPartitions,
      (r: ResultSet) => (
        r.getString("store_name"),
        r.getString("location"),
        r.getString("store_type")
      )
    )

    // Write to the table created above; the column list must cover every
    // field of the tuple produced by the row mapper.
    customerEvents.saveToCassandra("test", "store",
      SomeColumns("store_name", "location", "store_type"))
  }
}
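For context on the two "?" placeholders: JdbcRDD splits the [startingId, highestId] range into numberOfPartitions sub-ranges and binds each sub-range's start and end to the query, once per partition. A standalone sketch of that partitioning logic (my reading of the behavior, not the Spark source itself):

```scala
// Compute the (start, end) bound pair each JdbcRDD partition binds to
// the two "?" placeholders in the query.
def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
  // BigInt guards against overflow when the bounds are near Long.MaxValue.
  val length = BigInt(1) + upperBound - lowerBound
  (0 until numPartitions).map { i =>
    val start = lowerBound + ((i * length) / numPartitions).toLong
    val end   = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
    (start, end)
  }
}
```

With startingId = 0, highestId = 2, and numberOfPartitions = 1 as above, this yields the single pair (0, 2), so the query executes once; note that with MySQL's "limit ?, ?" the first bound is interpreted as an offset and the second as a row count, which may not be what a bounds-based split intends.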
I submit the application using the command:
spark-submit --master spark://192.168.22.231:6066 \
--class "RdmsToCassandra" \
rdbmstocassandra_2.10-1.0.jar
During execution I get the following error:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/05/26 15:45:51 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 10619@totalprices-db-server-02
17/05/26 15:45:51 INFO SignalUtils: Registered signal handler for TERM
17/05/26 15:45:51 INFO SignalUtils: Registered signal handler for HUP
17/05/26 15:45:51 INFO SignalUtils: Registered signal handler for INT
17/05/26 15:45:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 15:45:51 INFO SecurityManager: Changing view acls to: root
17/05/26 15:45:51 INFO SecurityManager: Changing modify acls to: root
17/05/26 15:45:51 INFO SecurityManager: Changing view acls groups to:
17/05/26 15:45:51 INFO SecurityManager: Changing modify acls groups to:
17/05/26 15:45:51 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/05/26 15:45:52 INFO TransportClientFactory: Successfully created connection to /192.168.22.231:36249 after 60 ms (0 ms spent in bootstraps)
17/05/26 15:45:52 INFO SecurityManager: Changing view acls to: root
17/05/26 15:45:52 INFO SecurityManager: Changing modify acls to: root
17/05/26 15:45:52 INFO SecurityManager: Changing view acls groups to:
17/05/26 15:45:52 INFO SecurityManager: Changing modify acls groups to:
17/05/26 15:45:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/05/26 15:45:52 INFO TransportClientFactory: Successfully created connection to /192.168.22.231:36249 after 1 ms (0 ms spent in bootstraps)
17/05/26 15:45:52 WARN Utils: Your hostname, totalprices-db-server-02 resolves to a loopback address: 127.0.0.1; using 221.243.36.126 instead (on interface em1)
17/05/26 15:45:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/05/26 15:45:52 INFO DiskBlockManager: Created local directory at /tmp/spark-73513e64-f52c-48c5-bf9e-bbc45caec12d/executor-fd21cdc3-673e-4b9e-9bd2-6cef1e3da140/blockmgr-157ab1c4-ca1e-481b-9357-b5855ee6beef
17/05/26 15:45:52 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/05/26 15:45:52 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:36249
17/05/26 15:45:52 INFO WorkerWatcher: Connecting to worker spark://[email protected]:40561
17/05/26 15:45:52 INFO TransportClientFactory: Successfully created connection to /221.243.36.126:40561 after 1 ms (0 ms spent in bootstraps)
17/05/26 15:45:52 INFO WorkerWatcher: Successfully connected to spark://[email protected]:40561
17/05/26 15:45:52 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
17/05/26 15:45:52 INFO Executor: Starting executor ID 0 on host 221.243.36.126
17/05/26 15:45:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43114.
17/05/26 15:45:52 INFO NettyBlockTransferService: Server created on 221.243.36.126:43114
17/05/26 15:45:52 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/05/26 15:45:52 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 221.243.36.126, 43114, None)
17/05/26 15:45:52 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 221.243.36.126, 43114, None)
17/05/26 15:45:52 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 221.243.36.126, 43114, None)
17/05/26 15:46:00 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
17/05/26 15:46:00 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
Can anybody suggest what the problem could be here?