
I have a Python script that should run on an Apache Spark cluster.

I use a Hadoop MapReduce InputFormat as the data source for an RDD. That part works without problems.

The problem is that I'd like to construct a custom Hadoop Configuration with additional resource files loaded and attributes set. The intention is to use the modified Configuration inside the Python SparkContext.

I can write JVM code that constructs and loads the needed Hadoop Configuration. How do I attach it to Python using PySpark?

Does anybody know how this could be achieved?


1 Answer


I solved this puzzle for my case once I dropped the requirement to modify the Configuration at runtime and relied instead on a custom set of Hadoop configuration *.xml files.

First, I wrote a Java class that adds the configuration of additional layers to the default resources of org.apache.hadoop.conf.Configuration. Its static initializer appends to the Configuration default resources:

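package com.wellcentive.nosql;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

public class Configurator {

    // Assumption: the original did not show how LOG is declared;
    // commons-logging is used here because it ships with Hadoop.
    private static final Log LOG = LogFactory.getLog(Configurator.class);

    static {

        // We initialize the default configuration of the needed Hadoop
        // layers by loading the appropriate classes.

        try {
            Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
        } catch (ClassNotFoundException e) {
            LOG.error("Failed to initialize HDFS configuration layer.", e);
        }

        try {
            Class.forName("org.apache.hadoop.mapreduce.Cluster");
        } catch (ClassNotFoundException e) {
            LOG.error("Failed to initialize YARN/MapReduce configuration layer.", e);
        }

        // We do what HBase actually should: the default HBase configuration
        // is added to the default Hadoop resources.
        Configuration.addDefaultResource("hbase-default.xml");
        Configuration.addDefaultResource("hbase-site.xml");
    }

    // Just a 'callable' handle: calling it forces the class to load, which
    // runs the static block above. Static so that no instance is needed.
    public static void init() {
    }

}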

So now, if someone just loads my Configurator, the following infrastructure configurations are searched for on the class path: core, HDFS, MapReduce, YARN, HBase. The corresponding files are core-default.xml, core-site.xml, hdfs-default.xml, hdfs-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hbase-default.xml, hbase-site.xml. If I need additional layers, they are easy to add the same way.

Configurator.init() is provided only as a convenient handle for triggering class loading.
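To make the layering easier to picture, here is a small pure-Python sketch of the idea: resources are looked up by name along a search path and merged in order, so a value from a later file (say hbase-site.xml) replaces the one from an earlier file (hbase-default.xml). This is only a simplified model, not Hadoop's actual implementation: it ignores `final` properties and variable substitution, and the names `find_resource` and `load_layers` are made up for illustration.

```python
import os
import xml.etree.ElementTree as ET


def find_resource(name, search_dirs):
    """Return the first path where `name` exists, imitating a class path lookup."""
    for directory in search_dirs:
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return candidate
    return None


def load_layers(resource_names, search_dirs):
    """Merge Hadoop-style XML resources in order; later files override earlier ones."""
    merged = {}
    for name in resource_names:
        path = find_resource(name, search_dirs)
        if path is None:
            # In this sketch a resource that cannot be found is simply skipped.
            continue
        root = ET.parse(path).getroot()
        for prop in root.findall("property"):
            key = prop.findtext("name")
            value = prop.findtext("value")
            if key is not None and value is not None:
                merged[key.strip()] = value
    return merged
```

Called as `load_layers(["hbase-default.xml", "hbase-site.xml"], ["/etc/hbase/conf"])` (the directory is a placeholder), it returns a plain dict of property names to values. A dict of that shape is also what the `conf` argument of `SparkContext.newAPIHadoopRDD` accepts, should per-job overrides ever be needed.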

Next, the Python Spark script needs to call the configurator right after the Spark context starts:

from pyspark import SparkContext

# Create a minimal Spark context.
sc = SparkContext(appName="ScriptWithIntegratedConfig")

# It's critical to initialize the configurator so that any new
# org.apache.hadoop.conf.Configuration object loads our resources.
# The jar containing Configurator must be on the driver class path.
sc._jvm.com.wellcentive.nosql.Configurator.init()

From this point on, an ordinary new Configuration() call (which is common inside the PythonRDD infrastructure for Hadoop-based datasets) loads every configuration layer from the class path, which is where I place the configuration for the target cluster.

At least it works for me.
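A quick way to check that the layers were picked up is to create a fresh Configuration through the Py4J gateway and read a property that only your *-site.xml files define. The helper below is just a sketch: `read_hadoop_property` is a made-up name, it relies on the private `sc._jvm` attribute already used above, and the property key in the usage comment is only an example.

```python
def read_hadoop_property(sc, key):
    """Build a new Hadoop Configuration on the JVM side and return one property.

    Returns None when no loaded resource defines the key.
    """
    # Same construction path as a plain `new Configuration()` in Java,
    # so it sees whatever default resources have been registered so far.
    conf = sc._jvm.org.apache.hadoop.conf.Configuration()
    return conf.get(key)


# Usage, after Configurator.init() has been called on a live SparkContext:
#   print(read_hadoop_property(sc, "hbase.zookeeper.quorum"))
```

If this prints None for a key that is present in hbase-site.xml, the file is most likely not on the driver's class path.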