0
votes

I am working on a spark program that takes input from the RDD and runs a few drool rules on it reading from a drl file.

in the drl file i have made a rule that wherever the hz attribute of the object is 0 it should increment the counter attribute by 1.

I have no clue why is that not working, it gives me an output of 0 for all the data in the stream (Yes, there is data with hz attribute equal to 0 and yes, I can print all the attributes and verify that even for them counter is 0)

I am using the KieSessionFactory class that I found on a github project here https://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java

But I am quite sure that this part not where the problem is, it only reads from the drl file and applies the rules.

below is my scala code: (I have marked the part where I think the problem lies, but please take a look at the drl file first)

package com.streams.Scala_Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream }
import org.apache.spark.streaming.kafka.v09.KafkaUtils
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.functions.avg
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.producer._
import org.apache.kafka.common.serialization.{ Deserializer, Serializer }
import org.apache.kafka.common.serialization.StringSerializer
import org.kie.api.runtime.StatelessKieSession
//import KieSessionFactory.getKieSession;
//import Sensor

object scala_consumer extends Serializable {

// schema for sensor data   
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable
{
var resid = resid_1
var date = date_1
var time = time_1
var hz = hz_1
var disp = disp_1
var flo = flo_1
var sedPPM = sedPPM_1
var psi = psi_1
var chlPPM = chlPPM_1
var counter = counter_1

def IncrementCounter (param: Int) =
{
    counter = counter + param
}
}

// function to parse line of sensor data into Sensor class
def parseSensor(str: String): Sensor = {
    val p = str.split(",")
    //println("printing p: " + p)
    new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0)
}

var counter = 0
val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals

def main(args: Array[String]): Unit = {

    val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka
    val groupId = "testgroup"
    val offsetReset = "latest"
    val batchInterval = "2"
    val pollTimeout = "1000"
    val topics = "/user/vipulrajan/streaming/original:sensor"
    val topica = "/user/vipulrajan/streaming/fail:test"
    val xlsFileName = "./src/main/Rules.drl"

    val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912")
                                                                    .set("spark.streaming.backpressure.enabled", "true")
                                  .set("spark.streaming.receiver.maxRate", Integer.toString(2000000))
                                  .set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000));

    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
        ConsumerConfig.GROUP_ID_CONFIG -> groupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
            "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
            "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
        "spark.kafka.poll.time" -> pollTimeout
    )

    val producerConf = new ProducerConf(
        bootstrapServers = brokers.split(",").toList
    )

    val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet)

    val values: DStream[String] = messages.map(_._2)
    println("message values received")
    //values.print(10)
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
    values.foreachRDD(x => try{
                                print("did 1\n")     //markers for manual and minor debugging
                                val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})})
                                //myData.collect().foreach(println)
                                //println(youData.date)
                                print("did 2\n")
                                val evalData = myData.mapPartitions(s => {
                                val ksession = KieSessionFactory.getKieSession(xlsFileName)
                                val retData = s.map(sens => {ksession.execute(sens); sens;})
                                retData
                                })
                                evalData.foreach(t => {println(t.counter)})
                                print("did 3\n")
                               }

    catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line " )})
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    println("filtered alert messages ")

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

}
}

the drl file

package droolsexample

import com.streams.Scala_Consumer.Sensor;
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder
                                            //and code lies in src/main/scala 

// declare any global variables here
dialect "java"
rule "Counter Incrementer"

when
    sens : Sensor (hz == 0)

then
    sens.IncrementCounter(1);
end

I have tried using an xls file instead of the drl file, I have tried creating the class in java and the object in scala. I have tried a lot of other things, but all I get in the output is a warning:

6/06/27 16:38:30.462 Executor task launch worker-0 WARN AbstractKieModule: No files found for KieBase defaultKieBase

and when I print the counter values I get all zeroes. Anybody to the rescue?

1
If a simple rule rule x when then System.out.println( "Hello" ); end is in that file and not firing then, most likely, you aren't creating the knowledge base correctly. Do you think that importing two different classes with the same simple name is a good idea?laune
I have tried importing them individually too. Also, I have tried just printing ("Hello") that didn't work either. I am sorry but I have no clue what knowledge base is, I would google it but if you have a link or resource I would be really grateful if you could post in here :)Vipul Rajan

1 Answers

1
votes

When you are doing the spark submit and passing your JAR for execution, pls ensure that other dependency JARs from KIE, etc are also included with in the same JAR and then run it with Spark-Submit.

alternate is to have two separate projects one where you have your spark program another is your KIE project so you will have two Jars and you run it something like below:

 nohup spark-submit --conf "spark.driver.extraJavaOptions -Dlog4j.configuration=file:/log4j.properties" \
     --queue abc \
--master yarn \
--deploy-mode cluster \
--jars drools-kie-project-0.0.1-SNAPSHOT.jar --class com.abc.DroolsSparkJob SparkcallingDrools-0.0.1-SNAPSHOT.jar \
-inputfile /user/hive/warehouse/abc/* -output /user/hive/warehouse/drools-Op > app.log &