I have a text file that I read as a CSV file into a Spark DataFrame. After joining two such DataFrames, I use the when function to select columns, and I get the exception below.

Here is my code to load the CSV files:

val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)



import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._     // rank
import org.apache.spark.sql.types.LongType
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
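A side note on the deduplication step: rank() gives every row that ties on the highest TimeStamp rank 1, so a key with two rows sharing the same TimeStamp keeps both, and the later outer join then produces duplicate output rows for that key. row_number() numbers tied rows 1, 2, ... and keeps exactly one. A minimal sketch with made-up rows (the object name, method name and sample data are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object LatestPerKeyDemo {
  // Keeps exactly one row per key: a row with the largest TimeStamp.
  def latestCount(spark: SparkSession): Long = {
    import spark.implicits._
    // Hypothetical rows: key (1, 10) has two rows tied on TimeStamp = 100
    val df2result = Seq(
      (1L, 10, 100L, "A"),
      (1L, 10, 100L, "B"),
      (1L, 10, 50L, "C"),
      (2L, 20, 70L, "D")
    ).toDF("LineItem_organizationId", "LineItem_lineItemId", "TimeStamp", "FFAction_1")

    val windowSpec = Window
      .partitionBy("LineItem_organizationId", "LineItem_lineItemId")
      .orderBy(col("TimeStamp").desc)

    // row_number() numbers tied rows 1, 2, ... so only one survives the filter;
    // rank() would give both tied rows rank 1 and keep them both.
    df2result
      .withColumn("rn", row_number().over(windowSpec))
      .filter(col("rn") === 1)
      .drop("rn", "TimeStamp")
      .count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("latest-per-key").getOrCreate()
    println(latestCount(spark)) // 2 (rank() instead of row_number() would give 3)
    spark.stop()
  }
}
```

Which of two tied rows survives is arbitrary; adding a tie-breaking column to orderBy makes the choice deterministic.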

Here are the schemas:

latestForEachKey.printSchema()

root
 |-- DataPartiotion: string (nullable = true)
 |-- LineItem_organizationId: long (nullable = true)
 |-- LineItem_lineItemId: integer (nullable = true)
 |-- StatementTypeCode_1: string (nullable = true)
 |-- LineItemName_1: string (nullable = true)
 |-- LocalLanguageLabel_1: string (nullable = true)
 |-- FinancialConceptLocal_1: string (nullable = true)
 |-- FinancialConceptGlobal_1: string (nullable = true)
 |-- IsDimensional_1: boolean (nullable = true)
 |-- InstrumentId_1: string (nullable = true)
 |-- LineItemSequence_1: string (nullable = true)
 |-- PhysicalMeasureId_1: string (nullable = true)
 |-- FinancialConceptCodeGlobalSecondary_1: string (nullable = true)
 |-- IsRangeAllowed_1: string (nullable = true)
 |-- IsSegmentedByOrigin_1: string (nullable = true)
 |-- SegmentGroupDescription_1: string (nullable = true)
 |-- SegmentChildDescription_1: string (nullable = true)
 |-- SegmentChildLocalLanguageLabel_1: string (nullable = true)
 |-- LocalLanguageLabel_languageId_1: string (nullable = true)
 |-- LineItemName_languageId_1: integer (nullable = true)
 |-- SegmentChildDescription_languageId_1: string (nullable = true)
 |-- SegmentChildLocalLanguageLabel_languageId_1: string (nullable = true)
 |-- SegmentGroupDescription_languageId_1: string (nullable = true)
 |-- SegmentMultipleFundbDescription_1: string (nullable = true)
 |-- SegmentMultipleFundbDescription_languageId_1: string (nullable = true)
 |-- IsCredit_1: string (nullable = true)
 |-- FinancialConceptLocalId_1: string (nullable = true)
 |-- FinancialConceptGlobalId_1: string (nullable = true)
 |-- FinancialConceptCodeGlobalSecondaryId_1: string (nullable = true)
 |-- FFAction_1: string (nullable = true)

df1result.printSchema()

root
 |-- LineItem_organizationId: long (nullable = true)
 |-- LineItem_lineItemId: integer (nullable = true)
 |-- StatementTypeCode: string (nullable = true)
 |-- LineItemName: string (nullable = true)
 |-- LocalLanguageLabel: string (nullable = true)
 |-- FinancialConceptLocal: string (nullable = true)
 |-- FinancialConceptGlobal: string (nullable = true)
 |-- IsDimensional: boolean (nullable = true)
 |-- InstrumentId: string (nullable = true)
 |-- LineItemSequence: string (nullable = true)
 |-- PhysicalMeasureId: string (nullable = true)
 |-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
 |-- IsRangeAllowed: boolean (nullable = true)
 |-- IsSegmentedByOrigin: boolean (nullable = true)
 |-- SegmentGroupDescription: string (nullable = true)
 |-- SegmentChildDescription: string (nullable = true)
 |-- SegmentChildLocalLanguageLabel: string (nullable = true)
 |-- LocalLanguageLabel_languageId: integer (nullable = true)
 |-- LineItemName_languageId: integer (nullable = true)
 |-- SegmentChildDescription_languageId: integer (nullable = true)
 |-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
 |-- SegmentGroupDescription_languageId: integer (nullable = true)
 |-- SegmentMultipleFundbDescription: string (nullable = true)
 |-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
 |-- IsCredit: boolean (nullable = true)
 |-- FinancialConceptLocalId: integer (nullable = true)
 |-- FinancialConceptGlobalId: integer (nullable = true)
 |-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
 |-- FFAction: string (nullable = true)

This is where I get the error:

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
  .select($"LineItem_organizationId", $"LineItem_lineItemId",
    when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
    when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"),
    when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"),
    when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"),
    when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"),
    when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"),
    when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"),
    when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
    when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"),
    when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed").as("IsRangeAllowed"),
    when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin").as("IsSegmentedByOrigin"),
    when($"SegmentGroupDescription_1".isNotNull, $"SegmentGroupDescription_1").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"),
    when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"),
    when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"),
    when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"),
    when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"),
    when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"),
    when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"),
    when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"),
    when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"),
    when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"),
    when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit").as("IsCredit"),
    when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
    when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
    when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
    when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction"))
    .filter(!$"FFAction".contains("D"))


 dfMainOutput.write
  .format("csv")
  .option("quote", "\uFEFF")
  .option("codec", "gzip")
  .save("s3://trfsdisu/SPARK/FinancialLineItem/output")

Below is the exception:

 org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`IsRangeAllowed_1` IS NOT NULL) THEN `IsRangeAllowed_1` ELSE `IsRangeAllowed` END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;

I did not specify any types while loading the CSV files.

I load both files as CSV without a schema (with inferSchema enabled), but IsRangeAllowed_1 is inferred as a string in one, whereas IsRangeAllowed is inferred as BooleanType in the other.
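With inferSchema, Spark picks each column's type separately for every load, from the values it actually sees, so the same logical column can come out differently from the MAIN and INCR files. A minimal sketch reproducing that with made-up sample lines (the real files' contents are not shown here; the object name and data are hypothetical): a column holding only true/false is inferred as boolean, while a single other value makes the whole column a string.

```scala
import org.apache.spark.sql.SparkSession

object InferSchemaDemo {
  // Loads two hypothetical pipe-delimited samples with inferSchema and
  // reports the type inferred for IsRangeAllowed in each.
  def inferredTypes(spark: SparkSession): (String, String) = {
    import spark.implicits._
    def load(lines: Seq[String]) = spark.read
      .option("header", "true")
      .option("delimiter", "|")
      .option("inferSchema", "true")
      .csv(lines.toDS()) // DataFrameReader.csv(Dataset[String]), Spark 2.2+

    // Only true/false values: inferred as boolean
    val mainDf = load(Seq("id|IsRangeAllowed", "1|true", "2|false"))
    // One non-boolean value ("N") is enough to make the whole column string
    val incrDf = load(Seq("id|IsRangeAllowed", "1|true", "2|N"))

    (mainDf.schema("IsRangeAllowed").dataType.simpleString,
     incrDf.schema("IsRangeAllowed").dataType.simpleString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("infer-schema").getOrCreate()
    println(inferredTypes(spark)) // (boolean,string)
    spark.stop()
  }
}
```

Passing an explicit schema to both reads, or leaving inferSchema off so every column is read as a string, makes the two loads agree on types.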

I also have a second question.

How can I replace the default delimiter in the DataFrame output with a custom delimiter, and write the output partitioned and with gzip compression?

dfMainOutput.rdd.saveAsTextFile("s3://trfsdisu/SPARK/FinancialLineItem/output")

Answer

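For the first issue: when(...).otherwise(...) becomes a CASE WHEN expression, and its THEN and ELSE expressions must all be the same type or coercible to a common type.

Here IsRangeAllowed is Boolean and IsRangeAllowed_1 is String, and Spark does not coerce a boolean and a string to a common type. So cast one of the two columns so that both are String (or both are Boolean). IsSegmentedByOrigin and IsCredit have the same string/boolean mismatch in the two schemas and need the same treatment. For IsRangeAllowed, the code change could be: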

import org.apache.spark.sql.types.DataTypes

when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1")
    .otherwise($"IsRangeAllowed".cast(DataTypes.StringType))
  .as("IsRangeAllowed")
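Rather than writing one when/otherwise per column, the same "take the _1 value if present, else the old value" logic can be generated from the schemas. A sketch under these assumptions: every incremental column is named like its main column plus a "_1" suffix (as in the schemas above), and the main-side fallback is cast to the incremental column's type. coalesce(a, b) is equivalent to when(a.isNotNull, a).otherwise(b). The object, method names and sample rows are hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col}

object MergeLatestDemo {
  // For each non-key column `name` of `main`, prefers incr's `name_1` and falls
  // back to main's `name`, cast to the type of `name_1` so both sides agree.
  def merge(main: DataFrame, incr: DataFrame, keys: Seq[String]): DataFrame = {
    val incrTypes = incr.schema.fields.map(f => f.name -> f.dataType).toMap
    val merged: Seq[Column] = main.columns.toSeq.filterNot(c => keys.contains(c)).map { name =>
      incrTypes.get(name + "_1") match {
        case Some(t) => coalesce(col(name + "_1"), col(name).cast(t)).as(name)
        case None    => col(name) // no incremental counterpart: keep main's value
      }
    }
    main.join(incr, keys, "outer").select(keys.map(col) ++ merged: _*)
  }

  def demo(spark: SparkSession): Seq[(Long, Int, String, String)] = {
    import spark.implicits._
    // Hypothetical rows: IsRangeAllowed is boolean in main, IsRangeAllowed_1 is string in incr
    val mainDf = Seq((1L, 10, true, "A"), (2L, 20, false, "B"))
      .toDF("LineItem_organizationId", "LineItem_lineItemId", "IsRangeAllowed", "FFAction")
    val incrDf = Seq((1L, 10, "N", "U"))
      .toDF("LineItem_organizationId", "LineItem_lineItemId", "IsRangeAllowed_1", "FFAction_1")
    merge(mainDf, incrDf, Seq("LineItem_organizationId", "LineItem_lineItemId"))
      .as[(Long, Int, String, String)]
      .collect()
      .toList
      .sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("merge-latest").getOrCreate()
    println(demo(spark)) // List((1,10,N,U), (2,20,false,B))
    spark.stop()
  }
}
```

Casting the boolean side to string turns it into "true"/"false"; if the incremental files spell booleans differently, you may prefer to normalize the string side and cast it to boolean instead. The FFAction filter can be applied to the merged result afterwards.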

How can we replace the default delimiter in the DataFrame output with a custom delimiter, with partitioning and gzip compression?

A DataFrame can be saved directly with a custom delimiter and a compression codec, without going through the underlying RDD (dfMainOutput.rdd):

dfMainOutput.write
  .format("csv")
  .option("delimiter", "!")
  .option("codec", "gzip")
  .save("s3://trfsdisu/SPARK/FinancialLineItem/output")
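The write above does not partition the output. DataFrameWriter.partitionBy adds that: each distinct value of the chosen column becomes a sub-directory such as LineItem_organizationId=1/, and that column is left out of the file contents. A sketch assuming LineItem_organizationId is a sensible partition column (a hypothetical choice, as are the object name, sample rows and the local temp path used in place of the S3 path):

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedWriteDemo {
  // Writes pipe-delimited, gzip-compressed CSV with one sub-directory per
  // value of the (hypothetical) partition column.
  def write(df: DataFrame, path: String): Unit =
    df.write
      .mode("overwrite")
      .partitionBy("LineItem_organizationId")
      .option("delimiter", "|")
      .option("compression", "gzip") // "codec" is accepted as an alias
      .option("header", "true")
      .csv(path)

  def demo(spark: SparkSession): (Seq[String], Long) = {
    import spark.implicits._
    val df = Seq((1L, 10, "A"), (1L, 11, "B"), (2L, 20, "C"))
      .toDF("LineItem_organizationId", "LineItem_lineItemId", "FFAction")
    val out = Files.createTempDirectory("fli").resolve("output").toString
    write(df, out)
    // Partition directories, e.g. LineItem_organizationId=1
    val dirs = new File(out).listFiles().map(_.getName)
      .filter(_.startsWith("LineItem_organizationId=")).sorted.toList
    // Reading back: gzip is detected from the .gz extension and the partition
    // column is recovered from the directory names
    val rows = spark.read.option("delimiter", "|").option("header", "true").csv(out).count()
    (dirs, rows)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("partitioned-write").getOrCreate()
    println(demo(spark))
    spark.stop()
  }
}
```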

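Edit: as requested in the comments, here is a concat_ws example. concat_ws joins several columns into a single string column using any separator, including a multi-character one such as |!|: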

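import org.apache.spark.sql.functions._

// selected columns only (the remaining columns are elided, as in the original)
df.withColumn("colmn", concat_ws("|!|", $"IsRangeAllowed_1", $"IsRangeAllowed" /* , ... */))
  .selectExpr("colmn")
  .show()

// to add all columns in df
df.withColumn("colmn", concat_ws("|!|", df.columns.map(col): _*))
  .selectExpr("colmn")
  .show()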
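Two practical points when writing this out. First, concat_ws skips null inputs entirely, so a row with a null in the middle ends up with fewer fields and everything after it shifts left. Second, a DataFrame with a single string column can be written with the text writer, which also accepts a compression option; as far as I know, the Spark 2.x CSV writer only accepts a single-character delimiter, which is why this route is needed for |!|. A sketch that replaces nulls with empty strings before joining (the object name, method names, sample rows and temp path are hypothetical):

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, concat_ws, lit}

object MultiCharDelimiterDemo {
  // Builds one string column "value" per row, joining every column with `sep`.
  // Each column is cast to string and nulls become "" so the field count stays constant.
  def toLines(df: DataFrame, sep: String): DataFrame =
    df.select(concat_ws(sep, df.columns.map(c => coalesce(col(c).cast("string"), lit(""))): _*).as("value"))

  // Hypothetical rows; the second has a null IsRangeAllowed
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1L, Option(true), "A"), (2L, Option.empty[Boolean], "B"))
      .toDF("id", "IsRangeAllowed", "FFAction")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("multi-char-delimiter").getOrCreate()
    val lines = toLines(sample(spark), "|!|")
    lines.show(false)
    // A single string column can be written as plain text, gzip-compressed
    val out = Files.createTempDirectory("fli-text").resolve("output").toString
    lines.write.mode("overwrite").option("compression", "gzip").text(out)
    spark.stop()
  }
}
```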