39 votes

I have a DataFrame in Spark (Scala) with a column that I need to split.

scala> test.show
+-------------+
|columnToSplit|
+-------------+
|        a.b.c|
|        d.e.f|
+-------------+

I need this column split out to look like this:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

I'm using Spark 2.0.0.

Thanks

7 Answers

91 votes

Try:

import sparkObject.implicits._
import org.apache.spark.sql.functions.split

df.withColumn("_tmp", split($"columnToSplit", "\\.")).select(
  $"_tmp".getItem(0).as("col1"),
  $"_tmp".getItem(1).as("col2"),
  $"_tmp".getItem(2).as("col3")
)

The important point to note here is that sparkObject is the SparkSession object you might have already initialized. Because of that, the first import statement must be placed inline within the code, after the session exists, not at the top of the file before the class definition.
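
For example, here is a minimal end-to-end sketch showing where the implicits import has to sit relative to the session. The object name SplitExample and the local master setting are my own assumptions, not from the answer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

object SplitExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical session setup; in your code the session may already exist.
    val spark = SparkSession.builder().appName("split-example").master("local[*]").getOrCreate()
    import spark.implicits._ // must come after the session is created, not at the top of the file

    val df = Seq("a.b.c", "d.e.f").toDF("columnToSplit")
    df.withColumn("_tmp", split($"columnToSplit", "\\."))
      .select(
        $"_tmp".getItem(0).as("col1"),
        $"_tmp".getItem(1).as("col2"),
        $"_tmp".getItem(2).as("col3")
      )
      .show()

    spark.stop()
  }
}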

41 votes

To do this programmatically, you can create a sequence of expressions with (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")) (assuming you need 3 columns in the result) and then apply it to select with the : _* syntax:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

To keep all columns:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+-------------+---------+----+----+----+
|columnToSplit|     temp|col0|col1|col2|
+-------------+---------+----+----+----+
|        a.b.c|[a, b, c]|   a|   b|   c|
|        d.e.f|[d, e, f]|   d|   e|   f|
+-------------+---------+----+----+----+
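
If you don't want to keep the intermediate array column around, a small addition of my own is to drop it after the select:

df.withColumn("temp", split(col("columnToSplit"), "\\."))
  .select(col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*)
  .drop("temp")
  .show()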

If you are using PySpark, use a list comprehension to replace the map in Scala:

from pyspark.sql.functions import col, split

df = spark.createDataFrame([['a.b.c'], ['d.e.f']], ['columnToSplit'])

(df.withColumn('temp', split('columnToSplit', '\\.'))
   .select(*[col('temp').getItem(i).alias(f'col{i}') for i in range(3)])
).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+
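
For completeness, the keep-all-columns variant translates the same way (my own direct translation of the Scala version above):

(df.withColumn('temp', split('columnToSplit', '\\.'))
   .select('*', *[col('temp').getItem(i).alias(f'col{i}') for i in range(3)])
).show()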

16 votes

A solution that avoids the select part. This is helpful when you just want to append the new columns:

import org.apache.spark.sql.functions.{col, split}

case class Message(others: String, text: String)

val r1 = Message("foo1", "a.b.c")
val r2 = Message("foo2", "d.e.f")

val records = Seq(r1, r2)
val df = spark.createDataFrame(records)

df.withColumn("col1", split(col("text"), "\\.").getItem(0))
  .withColumn("col2", split(col("text"), "\\.").getItem(1))
  .withColumn("col3", split(col("text"), "\\.").getItem(2))
  .show(false)

+------+-----+----+----+----+
|others|text |col1|col2|col3|
+------+-----+----+----+----+
|foo1  |a.b.c|a   |b   |c   |
|foo2  |d.e.f|d   |e   |f   |
+------+-----+----+----+----+

Update: I highly recommend using Psidom's implementation to avoid splitting three times.

7 votes

This appends the columns to the original DataFrame, doesn't use select, and splits only once by way of a temporary column:

import spark.implicits._
import org.apache.spark.sql.functions.split

df.withColumn("_tmp", split($"columnToSplit", "\\."))
  .withColumn("col1", $"_tmp".getItem(0))
  .withColumn("col2", $"_tmp".getItem(1))
  .withColumn("col3", $"_tmp".getItem(2))
  .drop("_tmp")

4 votes

This expands on Psidom's answer and shows how to do the split dynamically, without hardcoding the number of columns: it first runs a query to calculate the required number of columns.

import org.apache.spark.sql.functions.{col, max, size, split}
import spark.implicits._

val df = Seq(
  "a.b.c",
  "d.e.f"
).toDF("my_str")
  .withColumn("letters", split(col("my_str"), "\\."))

val numCols = df
  .withColumn("letters_size", size($"letters"))
  .agg(max($"letters_size"))
  .head()
  .getInt(0)

df
  .select(
    (0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
  )
  .show()
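
A brief illustration of my own (not part of the answer; assumes the same spark session) of how this behaves on ragged input: getItem past the end of an array returns null, so shorter rows are padded with nulls:

import org.apache.spark.sql.functions.{col, max, size, split}
import spark.implicits._

val ragged = Seq("a.b.c", "d.e").toDF("my_str")
  .withColumn("letters", split(col("my_str"), "\\."))

val numCols = ragged.agg(max(size($"letters"))).head().getInt(0) // 3 here

ragged.select(
  (0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
).show()
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   a|   b|   c|
// |   d|   e|null|
// +----+----+----+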

1 vote

We can write this using a for/yield comprehension in Scala:

If you need more columns, just add them to desiredColumn and adjust accordingly.

import org.apache.spark.sql.functions.{col, split}
import spark.implicits._

val aDF = Seq("Deepak.Singh.Delhi").toDF("name")
val desiredColumn = Seq("name", "Lname", "City")
val colSize = desiredColumn.size

val columnList = for (i <- 0 until colSize)
  yield split(col("name"), "\\.").getItem(i).alias(desiredColumn(i))

aDF.select(columnList: _*).show(false)

Output:

+------+-----+-----+
|name  |Lname|City |
+------+-----+-----+
|Deepak|Singh|Delhi|
+------+-----+-----+

If you don't need the name column, drop it and just use withColumn.

0 votes

An example without using the select statement.

Let's assume we have a DataFrame with a set of columns and we want to split the column named name:

import spark.implicits._

val columns = Seq("name","age","address")

val data = Seq(
  ("Amit.Mehta", 25, "1 Main st, Newark, NJ, 92537"),
  ("Rituraj.Mehta", 28, "3456 Walnut st, Newark, NJ, 94732")
)

val dfFromData = spark.createDataFrame(data).toDF(columns: _*)
dfFromData.printSchema()

val newDF = dfFromData.map(f => {
  val nameSplit = f.getAs[String](0).split("\\.").map(_.trim)
  (nameSplit(0), nameSplit(1), f.getAs[Int](1), f.getAs[String](2))
})

val finalDF = newDF.toDF("First Name","Last Name", "Age","Address")

finalDF.printSchema()

finalDF.show(false)

Output:

+----------+---------+---+---------------------------------+
|First Name|Last Name|Age|Address                          |
+----------+---------+---+---------------------------------+
|Amit      |Mehta    |25 |1 Main st, Newark, NJ, 92537     |
|Rituraj   |Mehta    |28 |3456 Walnut st, Newark, NJ, 94732|
+----------+---------+---+---------------------------------+