
I am new to Spark and Scala. I am trying to fetch the results of a stored procedure in SQL Server to use them in Spark SQL. For that, I am importing the data via a JdbcRDD in Scala (Eclipse) and building an RDD from the procedure.

After creating the RDD, I register it as a temporary table and then run sqlContext.sql("Select query to select particular columns"). But when I enter column names in the select query, it throws an error, because the column names exist in neither the RDD nor the temporary table.

Please find below my code:

import java.sql.{DriverManager, ResultSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.SQLContext

val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val url = XXXX
val username = XXXX
val password = XXXX
val query = "select A, B, C, D from Random_Procedure where ID_1 = ? and ID_2 = ?"

// Register the JDBC driver and create a new SparkContext
Class.forName(driver)
val conf = new SparkConf().setMaster("local").setAppName("Amit")
val sparkContext = new SparkContext(conf)

// Each ResultSet row is mapped to a single comma-separated string
val rddData = new JdbcRDD(sparkContext, () =>
  DriverManager.getConnection(url, username, password),
  query, 1, 0, 1, (x: ResultSet) => x.getString("A") + ", " +
    x.getString("B") + ", " + x.getString("C") + ", " +
    x.getString("D")).cache()

val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
val dataFrame = rddData.toDF

dataFrame.registerTempTable("Data")
sqlContext.sql("select A from Data").collect.foreach(println)

When I run this code, it throws the error: cannot resolve 'A' given input columns _1.

But when I run sqlContext.sql("select * from Data").collect.foreach(println), it prints all the columns A, B, C, D.

I believe I never fetched the column names into the JdbcRDD that I created, which is why they are not accessible in the temporary table. I need help.


1 Answer


The problem is that you create a JdbcRDD object when what you need is a DataFrame. An RDD simply doesn't contain any information mapping your tuples to column names. So you should create the DataFrame from a jdbc source, as explained in the programming guide. Moreover:

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD
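For illustration, here is a minimal sketch of that approach using the Spark 1.4+ DataFrameReader API. The url, username, and password placeholders are taken from the question; pushing the procedure's query down as a derived table with the alias tmp, and the literal ID values 1 and 2, are assumptions for the sketch:

// Wrap the query in a derived table so Spark reads its schema,
// including the column names A, B, C, D
val jdbcDF = sqlContext.read
  .format("jdbc")
  .option("url", url)
  .option("user", username)
  .option("password", password)
  .option("dbtable", "(select A, B, C, D from Random_Procedure " +
    "where ID_1 = 1 and ID_2 = 2) as tmp")  // assumed literal IDs
  .load()

jdbcDF.registerTempTable("Data")
sqlContext.sql("select A from Data").collect.foreach(println)

Because the schema comes from the database, the column names are available immediately, and select A resolves without any extra work.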

Also note that DataFrames were added in Spark 1.3.0. If you use an older version, you have to work with org.apache.spark.sql.SchemaRDD instead.
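If you prefer to keep your JdbcRDD, you can also supply the column names yourself when converting to a DataFrame. A minimal sketch, assuming each ResultSet row is mapped to a tuple instead of one concatenated string:

// Map each row to a Tuple4 so toDF can be given explicit column names
val rddData = new JdbcRDD(sparkContext, () =>
  DriverManager.getConnection(url, username, password),
  query, 1, 0, 1,
  (x: ResultSet) => (x.getString("A"), x.getString("B"),
    x.getString("C"), x.getString("D")))

import sqlContext.implicits._
val dataFrame = rddData.toDF("A", "B", "C", "D")
dataFrame.registerTempTable("Data")
sqlContext.sql("select A from Data").collect.foreach(println)

This works, but the jdbc data source above is still the preferred route, since Spark then knows the schema and can push filters down to the database.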