I'm trying to run a test Spark script to connect Spark to Hadoop. The script is the following:

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
file = sc.textFile("hdfs://hadoop_node.place:9000/errs.txt")
errors = file.filter(lambda line: "ERROR" in line)
errors.count()

When I run it with pyspark, I get:

py4j.protocol.Py4JJavaError: An error occurred while calling o21.collect.
: java.io.IOException: Can't get Master Kerberos principal for use as renewer
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:251)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:46)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:898)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:608)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:243)
    at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:744)

This happens despite the fact that:

  • I've run kinit, and klist shows I have the correct tickets
  • when I run ./bin/hadoop fs -ls hdfs://hadoop_node.place:9000/errs.txt, it lists the file
  • both the local Hadoop client and Spark have the same configuration file

The core-site.xml in both the spark/conf and hadoop/conf folders is the following (I got it from one of the Hadoop nodes):

<configuration>
    <property>

        <name>hadoop.security.auth_to_local</name>
        <value>
            RULE:[1:$1](.*@place)s/@place//
            RULE:[2:$1/$2@$0](.*/node1.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:$1/$2@$0](.*/node2.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:$1/$2@$0](.*/node3.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:$1/$2@$0](.*/node4.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:$1/$2@$0](.*/node5.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:$1/$2@$0](.*/node6.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:$1/$2@$0](.*/node7.place@place)s/^([a-zA-Z]*).*/$1/
            RULE:[2:nobody]
            DEFAULT
        </value>
    </property>
    <property>
        <name>net.topology.node.switch.mapping.impl</name>
        <value>org.apache.hadoop.net.TableMapping</value>
    </property>
    <property>
        <name>net.topology.table.file.name</name>
        <value>/etc/hadoop/conf/topology.table.file</value>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://server.place:9000/</value>
    </property>
    <property>
      <name>hadoop.security.authentication</name>
      <value>kerberos</value>
    </property>

    <property>
      <name>hadoop.security.authorization</name>
      <value>true</value>
    </property>

    <property>
      <name>hadoop.proxyuser.hive.hosts</name>
      <value>*</value>
    </property>

    <property>
      <name>hadoop.proxyuser.hive.groups</name>
      <value>*</value>
    </property>

</configuration>

Can someone point out what I am missing?

Do all the Spark slaves also have the configuration? – Daniel Darabos
Spark runs in pseudo-distributed mode, so there is only one server/node. – ndp
You would still have a couple of JVMs running: your application, the Spark master, the Spark worker and the executor. I think your application and the executor will be the ones accessing HDFS, so you need to make sure they have loaded the right configuration. You could print System.getProperties on the executor to check. – Daniel Darabos
Good idea. I'll do this next time I face a problem. :-) – ndp

1 Answer

I fixed it after creating my own Hadoop cluster in order to better understand how Hadoop works.

You have to provide Spark with a valid .keytab file that has been generated for an account with at least read access to the Hadoop cluster.

You also have to provide Spark with the hdfs-site.xml of your HDFS cluster.

So in my case I had to create a keytab file that, when you run

klist -k -e -t

on it, shows entries like the following

host/[email protected]

In my case, host was the literal word "host" and not a placeholder. Also, in your hdfs-site.xml you have to provide the path to the keytab file and state that

host/[email protected]

will be your account.
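
To double-check which keytab and principal settings a given hdfs-site.xml really contains, a small Python sketch along these lines might help. It only lists every property whose name mentions "keytab" or "principal". The helper names (parse_site_xml, kerberos_properties) and the sample property names and values are illustrative assumptions, not the contents of my actual file.

```python
import xml.etree.ElementTree as ET


def parse_site_xml(text):
    """Return {name: value} for every <property> in a Hadoop *-site.xml string."""
    root = ET.fromstring(text.strip())
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name is None:
            continue  # skip malformed entries without a <name>
        value = prop.findtext("value") or ""
        props[name.strip()] = value.strip()
    return props


def kerberos_properties(props):
    """Keep only the properties whose name mentions a keytab or a principal."""
    return {
        name: value
        for name, value in props.items()
        if "keytab" in name or "principal" in name
    }


# Hypothetical sample for illustration only; a real hdfs-site.xml will differ.
SAMPLE = """
<configuration>
    <property>
        <name>dfs.namenode.keytab.file</name>
        <value>/path/to/hdfs.keytab</value>
    </property>
    <property>
        <name>dfs.namenode.kerberos.principal</name>
        <value>host/_HOST@EXAMPLE.COM</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
"""

found = kerberos_properties(parse_site_xml(SAMPLE))
for name in sorted(found):
    print(name, "=", found[name])
```

To use it on a real file, read the file's contents into a string and pass that to parse_site_xml instead of SAMPLE. If the result is empty, the file Spark sees most likely does not carry the keytab and principal settings described above.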

Cloudera has a pretty detailed writeup on how to do it.

Edit: After playing a little with different configurations, I think the following should be noted. You have to provide Spark with the exact hdfs-site.xml and core-site.xml of your Hadoop cluster. Otherwise it won't work.
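
One way to verify that Spark's copy of a configuration file matches the cluster's is to compare the two property by property. The following Python sketch does that on two in-memory XML strings. The function names (load_props, diff_props) and both sample documents are assumptions for illustration; only the property names are borrowed from the core-site.xml in the question.

```python
import xml.etree.ElementTree as ET


def load_props(xml_text):
    """Parse a Hadoop *-site.xml string into a {name: value} dict."""
    root = ET.fromstring(xml_text.strip())
    return {
        p.findtext("name").strip(): (p.findtext("value") or "").strip()
        for p in root.iter("property")
        if p.findtext("name")
    }


def diff_props(cluster, local):
    """Compare the cluster's properties against the local (Spark) copy.

    Returns (missing, different): names absent from the local copy, and
    names present in both whose values differ. Extra local-only names
    are not reported in this sketch.
    """
    missing = sorted(set(cluster) - set(local))
    different = sorted(
        name for name in set(cluster) & set(local) if cluster[name] != local[name]
    )
    return missing, different


# Illustrative stand-in for the cluster's core-site.xml.
CLUSTER_XML = """
<configuration>
    <property><name>fs.defaultFS</name><value>hdfs://server.place:9000/</value></property>
    <property><name>hadoop.security.authentication</name><value>kerberos</value></property>
    <property><name>hadoop.security.authorization</name><value>true</value></property>
</configuration>
"""

# Illustrative stand-in for an out-of-date copy under spark/conf.
LOCAL_XML = """
<configuration>
    <property><name>fs.defaultFS</name><value>hdfs://server.place:9000/</value></property>
    <property><name>hadoop.security.authentication</name><value>simple</value></property>
</configuration>
"""

missing, different = diff_props(load_props(CLUSTER_XML), load_props(LOCAL_XML))
print("missing from local copy:", missing)
print("different values:", different)
```

In practice you would read the two files from the cluster node and from Spark's conf folder and pass their contents in place of the sample strings. Both lists should come back empty when the copies agree.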