
I am trying to get a CQL string for a given DataFrame. I came across TableDef.fromDataFrame in the spark-cassandra-connector, which lets me do something like this:

TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()

It looks to me like the library uses the first column as the partition key and does not handle clustering keys at all, so how do I specify one particular set of DataFrame columns as the partition key and another set as the clustering key?

It looks like I could create a new TableDef myself, but then I have to do the entire mapping by hand, and in some cases the necessary types, such as ColumnType, are not accessible in Java. For example, I tried to create a new ColumnDef like below:

new ColumnDef("col5", new PartitionKeyColumn(), /* ColumnType is not accessible in Java */)

Objective: to get a CQL CREATE TABLE statement from a Spark DataFrame.

Input: my DataFrame can have any number of columns, each with its respective Spark type. Say I have a Spark DataFrame with 100 columns, where col8 and col9 correspond to the Cassandra partition key columns and col10 corresponds to the Cassandra clustering key column:

col1 | col2 | ... | col100

Now I want to use the spark-cassandra-connector library to give me a CQL CREATE TABLE statement from the info above.

Desired Output

create table if not exists test.hello (
   col1 bigint, -- whatever col1's type is in my DataFrame; I just picked bigint randomly
   col2 varchar,
   col3 double,
   ...
   ...
   col100 bigint,
   primary key ((col8, col9), col10)
) WITH CLUSTERING ORDER BY (col10 DESC);

1 Answer


Because the required components (PartitionKeyColumn and the instances of ColumnType) are Scala objects, you need to use the following syntax to access their instances from Java:

// imports
import com.datastax.spark.connector.cql.ColumnDef;
import com.datastax.spark.connector.cql.PartitionKeyColumn$;
import com.datastax.spark.connector.types.TextType$;

// actual code
ColumnDef a = new ColumnDef("col5",  
      PartitionKeyColumn$.MODULE$, TextType$.MODULE$);

See the source of ColumnRole and PrimitiveTypes for the full list of object/class names.
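
For orientation, here is a small, non-exhaustive sample of how those Scala singletons look when referenced from Java. The class names below come from the connector's cql and types packages, so verify them against the connector version you are using; the column names are made up for illustration:

import com.datastax.spark.connector.cql.ClusteringColumn;
import com.datastax.spark.connector.cql.ColumnDef;
import com.datastax.spark.connector.cql.PartitionKeyColumn$;
import com.datastax.spark.connector.cql.RegularColumn$;
import com.datastax.spark.connector.cql.StaticColumn$;
import com.datastax.spark.connector.types.BigIntType$;
import com.datastax.spark.connector.types.DoubleType$;
import com.datastax.spark.connector.types.IntType$;
import com.datastax.spark.connector.types.TimestampType$;

// Scala objects are reached through their generated MODULE$ field:
ColumnDef pk  = new ColumnDef("id",    PartitionKeyColumn$.MODULE$, BigIntType$.MODULE$);
ColumnDef st  = new ColumnDef("meta",  StaticColumn$.MODULE$,       IntType$.MODULE$);
ColumnDef reg = new ColumnDef("price", RegularColumn$.MODULE$,      DoubleType$.MODULE$);

// ClusteringColumn is a case class carrying the column's position in the
// clustering key, so it is instantiated with `new`:
ColumnDef ck = new ColumnDef("ts", new ClusteringColumn(0), TimestampType$.MODULE$);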

Update after additional requirements: the code is lengthy, but should work...

// imports
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.datastax.driver.core.ProtocolVersion;
import com.datastax.spark.connector.cql.ClusteringColumn;
import com.datastax.spark.connector.cql.ColumnDef;
import com.datastax.spark.connector.cql.PartitionKeyColumn$;
import com.datastax.spark.connector.cql.RegularColumn$;
import com.datastax.spark.connector.cql.TableDef;

import scala.collection.JavaConverters;

// actual code
SparkSession spark = SparkSession.builder()
        .appName("Java Spark SQL example").getOrCreate();

// columns that should become the partition key
Set<String> partitionKeys = new TreeSet<String>() {{
        add("col1");
        add("col2");
}};
// clustering key columns, mapped to their position in the clustering key
Map<String, Integer> clusteringKeys = new TreeMap<String, Integer>() {{
        put("col8", 0);
        put("col9", 1);
}};

Dataset<Row> df = spark.read().json("my-test-file.json");
// let the connector infer the column types from the DataFrame schema
TableDef td = TableDef.fromDataFrame(df, "test", "hello",
        ProtocolVersion.NEWEST_SUPPORTED);

List<ColumnDef> partKeyList = new ArrayList<>();
List<ColumnDef> clusterColumnList = new ArrayList<>();
List<ColumnDef> regColumnList = new ArrayList<>();

// re-classify every inferred column into the desired role,
// keeping the column type the connector inferred
scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
while (iter.hasNext()) {
        ColumnDef col = iter.next();
        String colName = col.columnName();
        if (partitionKeys.contains(colName)) {
                partKeyList.add(new ColumnDef(colName,
                        PartitionKeyColumn$.MODULE$, col.columnType()));
        } else if (clusteringKeys.containsKey(colName)) {
                int idx = clusteringKeys.get(colName);
                clusterColumnList.add(new ColumnDef(colName,
                        new ClusteringColumn(idx), col.columnType()));
        } else {
                regColumnList.add(new ColumnDef(colName,
                        RegularColumn$.MODULE$, col.columnType()));
        }
}

// convert the Java lists to Scala Seqs; a plain cast to
// scala.collection.Seq would fail at runtime
TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(),
        JavaConverters.asScalaBufferConverter(partKeyList).asScala(),
        JavaConverters.asScalaBufferConverter(clusterColumnList).asScala(),
        JavaConverters.asScalaBufferConverter(regColumnList).asScala(),
        td.indexes(), td.isView());
String cql = newTd.cql();
System.out.println(cql);
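
With the keys chosen above, the printed statement should look roughly like the following. This is a sketch only: the actual column types depend on the schema inferred from the DataFrame, and the connector quotes identifiers in its output.

CREATE TABLE "test"."hello" (
  "col1" text,
  "col2" text,
  ...
  "col8" text,
  "col9" text,
  PRIMARY KEY (("col1", "col2"), "col8", "col9")
)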