
I am developing a web-based analytics application that will provide model training and testing features through a UI. So far I have built this with Django and scikit-learn.

Django with Spark on a local machine (Windows)

Now I want to do this at big-data scale using Spark, with Django as the backend framework handling requests and Spark doing the processing and modelling.

Django + PySpark on a local machine (Windows), with Spark on a remote cluster

I have set up a Django project, and set up Spark on a cluster of two Linux machines along with HDFS.

Assume that uploading / downloading / streaming of data to that HDFS is already implemented.

I write each model as a view in the Django project, and the implementation of each view is written with PySpark. I used PySpark to create a connection to the Spark setup on the Linux cluster; a minimal sketch of such a view follows, and after it the model code itself.
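
As a rough illustration of the view-per-model idea, here is a minimal sketch of a Django view that calls the PySpark function below. The URL route, request parameter names, and the import path for sample_model_code are assumptions for the sketch, not part of the original project.

# views.py -- minimal sketch of one "model" view (names are illustrative)
from django.http import JsonResponse
from django.views.decorators.http import require_POST

# assumed import path; adjust to wherever sample_model_code actually lives
from .ml.decision_tree import sample_model_code


@require_POST
def train_decision_tree(request):
    """Run the PySpark decision-tree pipeline with paths supplied by the UI."""
    result = sample_model_code(
        trainData=request.POST["trainData"],
        trainDataFileType=request.POST.get("trainDataFileType", "csv"),
        trainDataDelimiter=request.POST.get("trainDataDelimiter", ","),
        testData=request.POST["testData"],
        testDataFileType=request.POST.get("testDataFileType", "csv"),
        testDataDelimiter=request.POST.get("testDataDelimiter", ","),
        targetIndexTrainData=int(request.POST["targetIndexTrainData"]),
        targetIndexTestData=int(request.POST["targetIndexTestData"]),
        modelName=request.POST.get("modelName", "model"),
    )
    return JsonResponse(result)

The model implementation itself: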

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def sample_model_code(trainData, trainDataFileType, trainDataDelimiter,
                      testData, testDataFileType, testDataDelimiter,
                      targetIndexTrainData, targetIndexTestData,
                      modelName):

    # trainData = "D:/training-data.csv"
    # trainDataFileType = "csv"
    # trainDataDelimiter = ","
    # testData = "D:/test-data.csv"
    # testDataFileType = "csv"
    # testDataDelimiter = ","
    # targetIndexTrainData = 44
    # targetIndexTestData = 44
    # modelName = "test_model"

    # getOrCreate() avoids "Cannot run multiple SparkContexts" errors when the
    # view is invoked more than once in the same Django process
    conf = SparkConf().setMaster("local").setAppName("creditDecisonApp2")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # spark.conf.set("spark.sql.shuffle.partitions", "2")

    training_data_set = spark.read.option("inferSchema", "true").option("header", "true").option("sep", trainDataDelimiter).csv(trainData)
    test_data_set = spark.read.option("inferSchema", "true").option("header", "true").option("sep", testDataDelimiter).csv(testData)

    train_data_column_names = training_data_set.columns
    train_data_target_variable = train_data_column_names[targetIndexTrainData] 

    test_data_column_names = test_data_set.columns
    test_data_target_variable = test_data_column_names[targetIndexTestData]

    train_data_numeric_cols = []
    train_data_categorical_cols = []
    test_data_numeric_cols = []
    test_data_categorical_cols = []

    for element in training_data_set.dtypes:

        if 'int' in element[1] or 'double' in element[1]:
            train_data_numeric_cols.append(element[0])       
        else:
            train_data_categorical_cols.append(element[0])

    for element in test_data_set.dtypes:

        if 'int' in element[1] or 'double' in element[1]:
            test_data_numeric_cols.append(element[0])       
        else:
            test_data_categorical_cols.append(element[0])

    stages_train = []
    stages_test = []

    for categoricalColumn in train_data_categorical_cols:
        if categoricalColumn != train_data_target_variable:
            stringIndexer = StringIndexer(inputCol = categoricalColumn, outputCol = categoricalColumn + 'Index')
            stages_train += [stringIndexer]

    label_stringIdx_train = StringIndexer(inputCol = train_data_target_variable, outputCol = 'label')
    stages_train += [label_stringIdx_train]

    assemblerInputsTrain = [c + "Index" for c in train_data_categorical_cols] + train_data_numeric_cols
    assemblerTrain = VectorAssembler(inputCols=assemblerInputsTrain, outputCol="features")
    stages_train += [assemblerTrain]

    for categoricalColumn in test_data_categorical_cols:
        if categoricalColumn != test_data_target_variable:
            stringIndexer = StringIndexer(inputCol = categoricalColumn, outputCol = categoricalColumn + 'Index')
            stages_test += [stringIndexer]

    label_stringIdx_test = StringIndexer(inputCol = test_data_target_variable, outputCol = 'label')
    stages_test += [label_stringIdx_test]

    assemblerInputsTest = [c + "Index" for c in test_data_categorical_cols] + test_data_numeric_cols
    assemblerTest = VectorAssembler(inputCols=assemblerInputsTest, outputCol="features")
    stages_test += [assemblerTest]

    pipeline_train = Pipeline(stages=stages_train)
    pipeline_test = Pipeline(stages=stages_test)

    pipeline_train_model = pipeline_train.fit(training_data_set)
    pipeline_test_model = pipeline_test.fit(test_data_set)

    train_df = pipeline_train_model.transform(training_data_set)
    test_df = pipeline_test_model.transform(test_data_set)

    dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 5)
    dtModel = dt.fit(train_df)
    predictions = dtModel.transform(test_df)

    #TODO: save the clf as a pickle object

    labelIndexer = StringIndexer().setInputCol(train_data_target_variable).setOutputCol("label").fit(training_data_set)
    category_preds_col_name = "Predicted_" + test_data_target_variable
    categoryConverter = IndexToString().setInputCol("prediction").setOutputCol(category_preds_col_name).setLabels(labelIndexer.labels)
    converted = categoryConverter.transform(predictions)

    result_df = converted.select(test_data_column_names + [category_preds_col_name])

    # write the predictions under the current working directory
    # (the original code referenced an undefined `workingDirectory` variable)
    location_temp = os.getcwd()
    result_file_name = location_temp + "/" + "credit_decision_predicted_data.csv"

    # note: Spark writes a directory of part files at this path, not a single file
    result_df.coalesce(1).write.option("header", "true").csv(result_file_name)


    # MulticlassClassificationEvaluator defaults to F1, so request accuracy explicitly
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")

    eval_metrics_dict = dict()
    eval_metrics_dict["accuracy"] = evaluator.evaluate(converted)


    result_dict = dict()
    result_dict["resultFilePath"] = os.path.normpath(result_file_name).replace(os.sep, "/")
    result_dict["evaluationMetricsDetails"] = eval_metrics_dict

    return result_dict

The Django application runs on a local Windows machine, Spark is also set up on that Windows machine, and the code above worked when run through Django.

My question is: will it still work if Spark is set up on the remote Linux cluster, Django stays on the local Windows machine, and I pass an HDFS file path instead of a local file system path for the data?

Or is there another recommended approach to set up applications for this type of architecture?


1 Answer


I was able to do it by installing Spark and PySpark on the Windows machine and then executing the Spark code while connected to the cluster.

I set up a Spark cluster on the two Linux machines and then installed Spark on the Windows machine along with PySpark.

Then, when creating the Spark session on Windows, I set the master to the Spark cluster's master URL, and from then on the code ran on the cluster.
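
A minimal sketch of what that session setup looks like; the master URL, host names, port numbers, and HDFS paths below are placeholders for my environment, not fixed values.

from pyspark.sql import SparkSession

# placeholder URLs -- replace with the actual master host and HDFS namenode
SPARK_MASTER = "spark://<master-host>:7077"
HDFS_TRAIN = "hdfs://<namenode-host>:8020/data/training-data.csv"

spark = (
    SparkSession.builder
    .master(SPARK_MASTER)                   # point the driver at the remote cluster
    .appName("creditDecisonApp2")
    .config("spark.executor.memory", "2g")  # illustrative resource setting
    .getOrCreate()
)

# HDFS paths are read the same way as local paths once the session targets the cluster
training_data_set = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(HDFS_TRAIN)
)

With this setup the Django process on Windows acts as the Spark driver, so it needs network access to the master and executors, and the Spark and Python versions on Windows should match those on the cluster.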