I haven't used Spark for long, so this may be a newbie mistake, but I know my way around Java well enough to know how to fix a NullPointerException. However, the following error has me puzzled.
Setup: I have data on HDFS, which I read using Spark. The data are stored as Hive tables, so I will be using a HiveContext. I'm using Spark 1.6.0 and MLlib_2.10. Java is 1.7.
Objective: I'm reading text from a table which I want to "classify". Offline, I have created a set of (n-gram, weight) pairs that I plan to use during the classification.
The file is loaded into a HashMap mapping n-gram => weight. Let's call that map ngramWeights. I plan to do lookups on the ngramWeights map as the n-grams get generated by Spark. The n-grams stored in the map are unigrams and bigrams, so my Spark code needs to generate both of those cases.
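For reference, the loading is roughly along the lines of the sketch below. This is a simplified version of loadCoefficients (referenced in the code further down); the tab-separated format and the file name are just assumptions for illustration:

// Simplified sketch of loadCoefficients: assumes one tab-separated
// "ngram<TAB>weight" pair per line; file name is a placeholder
// (uses java.io.BufferedReader / FileReader / IOException).
private static void loadCoefficients( Map<String, Double> ngramWeights ) throws IOException{
    try ( BufferedReader reader = new BufferedReader( new FileReader( "ngram_weights.tsv" ) ) ){
        String line;
        while ( (line = reader.readLine()) != null ){
            String[] parts = line.split( "\t" );
            ngramWeights.put( parts[0], Double.parseDouble( parts[1] ) );
        }
    }
}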
High-level Approach:
- Read the contents with a SQL query and "store" them in a DataFrame.
- Use MLlib to create a Tokenizer and an NGram transformation, with N = {1, 2}.
- For each Row:
  - Get the n-grams as a list.
  - Iterate over the list, look each n-gram up in the ngramWeights map, and aggregate the score (see the toy example after this list).
  - Return a Tuple2 instance, with row_id as the key and the summed score as the value.
  The result of each run (N=1, N=2) is stored in its own JavaPairRDD.
- Join the unigram and bigram scores by joining the respective JavaPairRDDs on row_id. The overall score is the sum of the two weights.
- Return the result in a JavaRDD.
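To make the scoring and the join concrete, here is a toy illustration (the weights and text are made up purely for this example):

// Hypothetical entries in ngramWeights (illustrative values only):
//   "apache"       -> 0.5
//   "spark"        -> 0.7
//   "apache spark" -> 1.5
// For a row with row_id "r1" and content "apache spark":
//   the N=1 pass emits ("r1", 0.5 + 0.7) = ("r1", 1.2),
//   the N=2 pass emits ("r1", 1.5),
//   and after the join the overall score is ("r1", 2.7).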
Code:
This is what I'm using to score each line for a specific N:
private static final JavaPairRDD<String, Double> scoreByNGrams( DataFrame dframe, int nGramSize, final Map<String, Double> ngramWeights ){
    // Tokenize the "content" column and produce n-grams of the requested size
    Tokenizer tokenizer = new Tokenizer().setInputCol( "content" ).setOutputCol( "tokenized" );
    dframe = tokenizer.transform( dframe );
    NGram ngramTransform = new NGram().setN( nGramSize ).setInputCol( "tokenized" ).setOutputCol( "ngrams" );
    dframe = ngramTransform.transform( dframe );

    // For each row, sum the weights of those n-grams that appear in ngramWeights
    JavaPairRDD<String, Double> textScores = dframe.javaRDD().mapToPair(
        new PairFunction<Row, String, Double>(){
            @Override
            public Tuple2<String, Double> call( Row row ) throws Exception{
                double score = 0.0;
                WrappedArray<String> wrappedArray = (WrappedArray<String>)row.getAs( "ngrams" );
                Iterator<String> iter = wrappedArray.iterator();
                while ( iter.hasNext() ){
                    String ngram = iter.next();
                    Double value = ngramWeights.get( ngram );
                    if ( value != null )
                        score += value;
                }
                String rowId = row.getAs( "row_id" ).toString();
                return new Tuple2<String, Double>( rowId, score );
            }
        });
    return textScores;
}
I then combine partial results in the following way:
public static void main( String[] args ){
    Map<String, Double> ngramWeights = new HashMap<>();
    loadCoefficients( ngramWeights ); // simply reading from a file

    SparkConf sparkConf = new SparkConf().setAppName( "TestCase" );
    JavaSparkContext sc = new JavaSparkContext( sparkConf );
    HiveContext hiveContext = new HiveContext( sc.sc() );

    // Hive table is stored in PARQUET format
    hiveContext.setConf( "spark.sql.hive.convertMetastoreParquet", "false" );
    hiveContext.sql( "USE my_db" );

    DataFrame contentInfo = hiveContext.sql( "SELECT row_id, get_json_object( text, \"$.clean_text\" ) as content FROM mytable" );

    JavaPairRDD<String, Double> unigrams = scoreByNGrams( contentInfo, 1, ngramWeights );
    JavaPairRDD<String, Double> bigrams = scoreByNGrams( contentInfo, 2, ngramWeights );

    // Join the per-row unigram and bigram scores on row_id and add them up
    JavaPairRDD<String, Double> messageScores = unigrams.join( bigrams ).mapToPair(
        new PairFunction<Tuple2<String, Tuple2<Double, Double>>, String, Double>(){
            @Override
            public Tuple2<String, Double> call( Tuple2<String, Tuple2<Double, Double>> t ){
                double sum = t._2()._1() + t._2()._2();
                return new Tuple2<String, Double>( t._1(), sum );
            }
        });

    // I then proceed to store the result
    // ...
}
I compile my code, build a fat jar that includes all dependencies, put it on HDFS, and run it using the command:
$ spark-submit --class my.package.MyClass --master yarn-cluster hdfs://my/hdfs/myfatjar.jar
The problem:
The application starts fine and seems to be executing, but when it reaches the FINISHED state it immediately throws an exception, saying:
Exception in thread "main" org.apache.spark.SparkException: Application application_<id> finished with failed status
at org.apache.spark.deploy.yarn.client.Client.run(Client.scala:1035)
at org.apache.spark.deploy.yarn.client.Client$.main(Client.scala:1082)
at org.apache.spark.deploy.yarn.client.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
A careful examination of the logs for this application reveals that the task that failed hit the following problem:
java.lang.NullPointerException
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr10$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
Has anyone encountered this issue before? The only place where I use a tokenizer is within the function I provided, so I don't really see an issue there.
In case anyone is wondering, the "text" field is a JSON string that always has a "clean_text" key. Running the following query against the DB:
SELECT * FROM mytable WHERE get_json_object( text, "$.clean_text" ) IS NULL
returns 0 results. Therefore, it is not the case that the read content is NULL.
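For what it's worth, a check directly on the DataFrame, along the lines of the snippet below, could confirm the same thing on the Spark side (sketch only, not part of the submitted job):

// Count rows whose "content" column is null after the SQL projection
long nullContent = contentInfo.filter( contentInfo.col( "content" ).isNull() ).count();
System.out.println( "Rows with null content: " + nullContent );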
Let me know if anything's unclear or if you need additional information on the query that I'm executing.
UPDATE: I've removed the ML transformations (Tokenizer and NGram) and converted the code to use a custom n-gram tokenizer that gets the content directly. With this version, for some rows Spark returns NULL (as a String), although there are NO null values in mytable. The custom version looks roughly like the sketch below.
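// Simplified sketch of the hand-rolled tokenization inside the mapToPair call;
// bigram construction and the ngramWeights lookups are omitted here.
Object rawContent = row.getAs( "content" );
String content = rawContent == null ? "" : rawContent.toString();
String[] tokens = content.split( "\\s+" ); // unigrams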
Anyone have any thoughts on that? Thanks again!