3
votes

I saw a DataFrames tutorial at https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html, which is written in Python, and I am trying to translate it into Scala.

They have the following code:

df = context.load("/path/to/people.json")
# RDD-style methods such as map, flatMap are available on DataFrames
# Split the bio text into multiple words.
words = df.select("bio").flatMap(lambda row: row.bio.split(" "))
# Create a new DataFrame to count the number of words
words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF()
word_counts = words_df.groupBy("word").sum()

So, I first read the data from a CSV into a DataFrame df, and then I have:

val title_words = df.select("title").flatMap { row =>
  row.getAs[String]("title").split(" ")
}
val title_words_df = title_words.map( w => Row(w,1) ).toDF()
val word_counts = title_words_df.groupBy("word").sum()

but I don't know:

  1. how to assign the field names to the rows in the line beginning with val title_words_df = ...

  2. how to fix the error "The value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]"

Thanks in advance for the help.


1 Answer

2
votes

how to assign the field names to the rows

Python's Row is quite a different kind of object from its Scala counterpart. It is a tuple augmented with names, which makes it closer to a product type than to the untyped collection that o.a.s.sql.Row is.
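A minimal sketch of the difference (the names WordCount, word and cnt here are just for illustration):

import org.apache.spark.sql.Row

// A Scala Row created directly carries no field names or types;
// access is positional, and names come only from an external schema.
val r = Row("spark", 1L)
r.getString(0)      // "spark"

// The closest Scala analogue to Python's Row(word=w, cnt=1)
// is a product type, e.g. a case class:
case class WordCount(word: String, cnt: Long)
val wc = WordCount("spark", 1L)
wc.word             // access by name, statically typed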

I am having the error "The value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]"

Since o.a.s.sql.Row is essentially untyped, it cannot be used with toDF and requires createDataFrame with an explicit schema.

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("word", StringType), StructField("cnt", LongType)
))

sqlContext.createDataFrame(title_words.map(w => Row(w, 1L)), schema)
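For completeness, the aggregation from the question then works on the resulting DataFrame (a sketch; title_words is the RDD[String] built in the question and Row comes from org.apache.spark.sql):

import org.apache.spark.sql.Row

val words_df = sqlContext.createDataFrame(title_words.map(w => Row(w, 1L)), schema)
val word_counts = words_df.groupBy("word").sum("cnt")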

If you want your code to be equivalent to the Python version, you should use a product type instead of Row. That means either a tuple:

title_words.map((_, 1L)).toDF("word", "cnt")

or a case class:

case class Record(word: String, cnt: Long)

title_words.map(Record(_, 1L)).toDF
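In both variants, toDF on the RDD only resolves if the SQLContext implicits are in scope (assuming the Spark 1.x setup used in the question):

import sqlContext.implicits._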

In practice, though, there should be no need to use RDDs at all:

import org.apache.spark.sql.functions.{explode, lit, split}

df.select(explode(split($"title", " ")), lit(1L))
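Put together, a sketch of the whole word count using only the DataFrame API (the "word" alias and the final count are additions to mirror the question; $ again assumes sqlContext.implicits._ is imported):

import org.apache.spark.sql.functions.{explode, split}

val word_counts = df
  .select(explode(split($"title", " ")).alias("word"))
  .groupBy("word")
  .count()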