0 votes

I am very new to Scala (typically I do this in R).

I have imported a large dataframe (2000+ columns, 100000+ rows) that is zero-inflated.

Task: convert the data to libsvm format.

Steps: As I understand it, the steps are as follows:

  1. Ensure the feature columns are DoubleType and the target is an Int
  2. Iterate through each row, retaining each value > 0 in one array and the index of its column in another array
  3. Convert to RDD[LabeledPoint]
  4. Save the RDD in libsvm format (a sample line of the target format is shown after this list)
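
For reference, libsvm format stores one example per line: the label, followed by index:value pairs for the non-zero features only, with ascending (conventionally 1-based) indices. A row with label 2 and non-zero values in the 3rd and 17th features would be written as:

  2 3:1.5 17:0.25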

I am stuck on step 3, though possibly because I am doing step 2 wrong.

Here is my code:

Main Function:

@Test
def testSpark(): Unit =
{
try
{

  val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")


  val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

  val indexer = new StringIndexer()
    .setInputCol("Majors_Final")
    .setOutputCol("Majors_Final_Indexed")
  val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
  val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)



  //only doubles go into the sparse vector, so filter the schema for DoubleType fields
  val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)

  val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)


  val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()


  assertTrue(true)
}
catch
{
  case ex: Exception =>
  {
    println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
    fail()
  }
}
}
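
The helpers castAllTypedColumnsTo and castColumnTo are referenced above but not shown in the post. A minimal sketch of what they presumably do (bulk-casting every column of a given type, and casting one named column) might look like the following; treat it as an assumption, not the original implementation:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.types.DataType

  // Sketch only: cast every column whose current type is `from` to type `to`
  def castAllTypedColumnsTo(df: DataFrame, from: DataType, to: DataType): DataFrame =
    df.schema.fields
      .filter(_.dataType == from)
      .foldLeft(df)((acc, f) => acc.withColumn(f.name, acc.col(f.name).cast(to)))

  // Sketch only: cast a single named column to the given type
  def castColumnTo(df: DataFrame, colName: String, to: DataType): DataFrame =
    df.withColumn(colName, df.col(colName).cast(to))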

Convert each row to LabeledPoint:

 @throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
  try
  {
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)

    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values

    val positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    val valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0
    rowValuesItr.foreach
    {
      kv =>
        if (kv > 0)
        {
          valuesArray += kv
          positionsArray += currentPosition
        }
        currentPosition = currentPosition + 1
    }

    // Note: the first argument to Vectors.sparse is the total number of features, not the number of non-zero entries
    val lp: LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))

    return lp

  }
  catch
  {
    case ex: Exception =>
    {
      throw new Exception(ex)
    }
  }
}

Problem: I then try to create a DataFrame of LabeledPoints, which can easily be converted to an RDD.

val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

But I get the following error:

SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

Did you try import spark.implicits._ as the error message suggests? Also, return is usually not used in Scala; it can create problems. - Shaido
I get org.apache.spark.SparkException: Task not serializable after adding the implicits - Jake
I think I need to look at matrices - Jake
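
For context (this is not from the original post): the encoder error means Spark does not know how to store mllib LabeledPoint values in a Dataset, and the Task not serializable that followed usually means the mapping closure drags in the enclosing, non-serializable test class. One commonly used workaround, sketched here under those assumptions, is to supply an explicit Kryo encoder and keep the conversion logic in a standalone serializable object defined outside the test class:

  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row}

  // Sketch only: an explicit Kryo encoder lets a Dataset hold LabeledPoint values
  implicit val labeledPointEncoder: Encoder[LabeledPoint] = Encoders.kryo[LabeledPoint]

  // Defined at top level so the map closure does not capture the test class
  object RowConversions extends Serializable {
    def toLabeledPoint(row: Row, fieldNames: Seq[String]): LabeledPoint = {
      val values = fieldNames.map(n => row.getAs[Double](n))
      val nonZero = values.zipWithIndex.filter(_._1 > 0)
      LabeledPoint(
        row.getAs[Int]("Majors_Final_Indexed").toDouble,
        org.apache.spark.mllib.linalg.Vectors.sparse(
          fieldNames.size, nonZero.map(_._2).toArray, nonZero.map(_._1).toArray))
    }
  }

  val labeled: Dataset[LabeledPoint] =
    mDFFinal.map(row => RowConversions.toLabeledPoint(row, fieldNameSeq))

From there, labeled.rdd is already an RDD[LabeledPoint] that MLUtils.saveAsLibSVMFile can consume.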

1 Answer

0 votes

OK, so I skipped the DataFrame and created an Array of LabeledPoints, which is easily converted to an RDD. The rest is easy.

I stress that, while this works, I am new to Scala and there may be more efficient ways to do this.

Main Function is now as follows:

  val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
  val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

  val indexer = new StringIndexer()
    .setInputCol("Majors_Final")
    .setOutputCol("Majors_Final_Indexed")
  val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
  val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)

  mDFFinal.show()
  //only doubles go into the sparse vector, so filter the schema for DoubleType fields
  val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
  val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

  // Collect the rows to the driver and convert each one to a LabeledPoint
  val labeledPoints: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()

  mDFFinal.collect().foreach
  {
    row => labeledPoints += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs("Majors_Final_Indexed"))
  }

  val mRdd: RDD[LabeledPoint] = spark.sparkContext.parallelize(labeledPoints.toSeq)

  MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")
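
As a possible refinement (not part of the original answer): collecting the whole DataFrame to the driver and re-parallelizing only works while the data fits in driver memory. A sketch of a fully distributed variant, assuming convertRowToLabeledPoint is reachable from a serializable scope, maps straight from the DataFrame's underlying RDD; the saved file can then be read back with MLUtils.loadLibSVMFile to verify the round trip. The output path used here is purely illustrative.

  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.rdd.RDD

  // Sketch only: convert each row on the executors instead of on the driver
  val mRddDistributed: RDD[LabeledPoint] =
    mDFFinal.rdd.map(row =>
      convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed")))

  MLUtils.saveAsLibSVMFile(mRddDistributed, "./output/libsvm_distributed")

  // Optional check: load the file back and inspect a few examples
  val reloaded = MLUtils.loadLibSVMFile(spark.sparkContext, "./output/libsvm_distributed")
  reloaded.take(3).foreach(println)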