
I have a Cassandra table like the one below and want to get records from Cassandra using some conditions and put them into a Hive table.

Cassandra table (Employee) entries:

 Id   Name  Amount  Time
 1    abc   1000    2017041801
 2    def   1000    2017041802
 3    ghi   1000    2017041803
 4    jkl   1000    2017041804
 5    mno   1000    2017041805
 6    pqr   1000    2017041806
 7    stu   1000    2017041807

Assume that all of this table's columns are of the string datatype. We have the same schema in Hive as well.

Now I want to import the Cassandra records with Time between 2017041801 and 2017041804 into Hive or HDFS. In the second run I will pull the incremental records based on the previous run.

I am able to load the Cassandra data into an RDD using the syntax below.

import com.datastax.spark.connector._

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("mydb", "Employee")

Now my problem is how to filter these records according to the between condition and persist the filtered records to Hive or to a Hive external table path.

Unfortunately my Time column is not a clustering key in the Cassandra table, so I am not able to use the .where() clause.
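
If it helps, this is roughly the RDD-level filter I was thinking of. I am not sure it is the right approach, and I am only guessing that the connector exposes the columns in lower case:

// Rough sketch: filter the CassandraRow RDD by the string Time column.
// Assumes the column is exposed as "time"; since the values are fixed-length
// yyyyMMddHH strings, plain string comparison gives the right ordering.
val filtered = rdd.filter { row =>
  val t = row.getString("time")
  t >= "2017041801" && t <= "2017041804"
}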

I am new to Scala and Spark, so please help me with this filter logic, or let me know if there is a better way to implement it using DataFrames.

Thanks in advance.

Filtering you could do in Spark itself, something along the lines of: stackoverflow.com/a/39283574/7413631. Saving to Hive is covered here: stackoverflow.com/questions/37050828/… – Marko Švaljek

1 Answer

1. I recommend using the Connector DataFrame API for loading from C*: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
2. Use a df.filter() call for your predicates.
3. Use saveAsTable() to store the data in Hive.

Here is a Spark 2.0 example for your case:

val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "Employee", "keyspace" -> "mydb" ))
  .load()
df.filter("time between '2017041801' and '2017041804'")
  .write.mode("overwrite").saveAsTable("hivedb.employee")
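
For the incremental second run you mention, one possible sketch is to filter on everything newer than the last imported value and append to the same Hive table. Here lastTime is a hypothetical variable holding whatever Time value you persisted from the previous run:

// Hypothetical incremental run: lastTime is the highest Time value loaded
// in the previous run (e.g. "2017041804"), tracked by your own bookkeeping.
val lastTime = "2017041804"

spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "Employee", "keyspace" -> "mydb"))
  .load()
  .filter(s"time > '$lastTime'")
  .write.mode("append").saveAsTable("hivedb.employee")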