
I haven't used Spark for long, so this may be a newbie mistake, but I know my way around Java well enough to fix a NullPointerException. However, the following error has me puzzled.

Setup: I have data on HDFS, which I read using Spark. The data are stored as HIVE tables, so I will be using a HiveContext. I'm using Spark 1.6.0 and MLLib_2.10. Java is 1.7.

Objective: I'm reading text from a table which I want to "classify". I have created offline a set of (n-gram, weight) pairs which I am planning to use during the classification.

The file is loaded into a HashMap, n-gram => weight. Let's call that map ngramWeights. I plan to do lookups on the ngramWeights map as the n-grams get generated by Spark. The n-grams stored in the map are unigrams and bigrams, so my Spark code needs to generate both.

High-level Approach:

  • Read the contents w/ a SQL query and "store" in a dataframe.
  • Use MLLib to create a Tokenizer and an NGram transformation, N={1,2}
  • For each Row
    • Get the ngrams as a list
    • Iterate over the list and run it through the ngramWeights map, aggregating the score
    • Return a Tuple2 instance, with row_id as the key and the summed score as the value.

The results of the two runs (N=1, N=2) are each stored in their own JavaPairRDD.

  • Join unigram and bigram scores, by joining the respective JavaPairRDD. The join is on the row_id.
    • The overall score is the sum of the two weights.
  • Return the result in a JavaRDD

Code:

This is what I'm using to score each line for a specific N.

private static final JavaPairRDD<String, Double> scoreByNGrams( DataFrame dframe, int nGramSize, final Map<String, Double> ngramWeights ){

    Tokenizer tokenizer = new Tokenizer().setInputCol( "content" ).setOutputCol( "tokenized" );
    dframe = tokenizer.transform( dframe );

    NGram ngramTransform = new NGram().setN( nGramSize ).setInputCol( "tokenized" ).setOutputCol( "ngrams" );
    dframe = ngramTransform.transform( dframe );

    JavaPairRDD<String, Double> textScores = dframe.javaRDD().mapToPair(

        new PairFunction<Row, String, Double>(){

            public Tuple2<String, Double> call( Row row ) throws Exception{

                double score = 0.0;

                WrappedArray<String> wrappedArray = (WrappedArray<String>)row.getAs( "ngrams" );
                Iterator<String> iter = wrappedArray.iterator();
                while ( iter.hasNext() ){
                    String ngram = iter.next();
                    Double value = ngramWeights.get( ngram );
                    if ( value != null )
                        score += value;
                }

                String rowId = row.getAs( "row_id" ).toString();
                return new Tuple2<String, Double>( rowId, score );
            }
        });

    return textScores;
}

I then combine partial results in the following way:

public static void main( String[] args ){

    Map<String, Double> ngramWeights = new HashMap<>();
    loadCoefficients( ngramWeights ); // simply reading from a file

    SparkConf sparkConf = new SparkConf().setAppName( "TestCase" );
    JavaSparkContext sc = new JavaSparkContext( sparkConf );
    HiveContext hiveContext = new HiveContext( sc.sc() );

    // Hive table is stored in PARQUET format
    hiveContext.setConf( "spark.sql.hive.convertMetastoreParquet", "false" );
    hiveContext.sql( "USE my_db" );

    DataFrame contentInfo = hiveContext.sql( "SELECT row_id, get_json_object( text, \"$.clean_text\" ) as content FROM mytable" );

    JavaPairRDD<String, Double> unigrams = scoreByNGrams( contentInfo, 1, ngramWeights );
    JavaPairRDD<String, Double> bigrams = scoreByNGrams( contentInfo, 2, ngramWeights );

    JavaPairRDD<String, Double> messageScores = unigrams.join( bigrams ).mapToPair( 

        new PairFunction<Tuple2<String, Tuple2<Double, Double>>, String, Double>(){

            @Override
            public Tuple2<String, Double> call( Tuple2<String, Tuple2<Double, Double>> t ){
                double sum = t._2()._1() + t._2()._2();
                return new Tuple2<String, Double>( t._1(), sum );
            }
        }
    );

    // I then proceed to store the result
    // ...
}

I compile my code, create my fat jar that includes all dependencies internally, put that on HDFS and run it using the command:

$ spark-submit --class my.package.MyClass --master yarn-cluster hdfs://my/hdfs/myfatjar.jar 

The problem:

The application starts fine and seems to be executing, but when it reaches the FINISHED state it immediately throws an exception, saying:

Exception in thread "main" org.apache.spark.SparkException: Application application_<id> finished with failed status
    at org.apache.spark.deploy.yarn.client.Client.run(Client.scala:1035)
    at org.apache.spark.deploy.yarn.client.Client$.main(Client.scala:1082)
    at org.apache.spark.deploy.yarn.client.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

A careful examination of the logs for this application reveals that the failed task has the following problem:

java.lang.NullPointerException
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr10$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)

Has anyone encountered this issue before? The only place that I am using a tokenizer is within the function that I provided so I don't really see an issue there.

In case anyone is wondering, the "text" field is a json string that always has a "clean_text" key. Running the following query against the DB:

SELECT * FROM mytable WHERE get_json_object( text, "$.clean_text" ) IS NULL

returns 0 results. Therefore, it is not the case that the read content is NULL.

Let me know if anything's unclear or if you need additional information on the query that I'm executing.


UPDATE: I've removed the ML transformations (tokenizer and ngram) and converted the code so that I would use a custom NGram tokenizer (directly getting the content). For some rows, Spark returns NULL (as a String), although there are NO null values in mytable.

Anyone have any thoughts on that? Thanks again!
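
For reference, a null-safe version of such a custom n-gram scorer could look roughly like the sketch below. It is not the code from the job: the class and method names (NGramScorer, ngrams, score) are made up for illustration, and it assumes lowercasing plus whitespace splitting (roughly what the ML Tokenizer does) and space-joined n-grams as produced by the NGram transformer. It uses only plain Java, so it can be tried without a cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spark or of the original job.
final class NGramScorer {

    private NGramScorer() {}

    // Builds space-joined n-grams from the text. A null or blank text yields
    // an empty list instead of a NullPointerException.
    static List<String> ngrams(String text, int n) {
        if (text == null || text.trim().isEmpty() || n < 1) {
            return Collections.emptyList();
        }
        String[] tokens = text.trim().toLowerCase().split("\\s+");
        List<String> result = new ArrayList<String>();
        for (int i = 0; i + n <= tokens.length; i++) {
            StringBuilder sb = new StringBuilder(tokens[i]);
            for (int j = 1; j < n; j++) {
                sb.append(' ').append(tokens[i + j]);
            }
            result.add(sb.toString());
        }
        return result;
    }

    // Sums the weights of all unigrams and bigrams of the text that appear in the map.
    static double score(String text, Map<String, Double> ngramWeights) {
        double score = 0.0;
        for (int n = 1; n <= 2; n++) {
            for (String ngram : ngrams(text, n)) {
                Double value = ngramWeights.get(ngram);
                if (value != null) {
                    score += value;
                }
            }
        }
        return score;
    }
}
```

Called from inside the mapToPair function with the value of the content column, a null value would then contribute a score of 0.0 rather than failing the task. Whether silently scoring such rows as 0.0 is acceptable is a separate question, and it does not explain where the nulls come from.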

I get nullpointer exception as an equivalent of 'file not found' - Alexis Benichoux
Hi! All of the files that I'm using are there (e.g., the file with the coefficients is definitely there, or the HIVE table). Using dframe.show() and logging, I have verified that things are being loaded properly. - lebiathan

1 Answer


I'm not sure if this is still causing anyone issues, but I was getting the same error on Databricks and found that it was caused by the VectorAssembler in the pyspark.ml.feature module. In my case, the list I was passing as the inputCols argument was not in the same order as the columns in the dataset that the VectorAssembler was transforming. Once I made the order of my inputCols list match the dataframe's column order, the java.lang.NullPointerException stopped occurring.