
I want to get data from a NiFi flow into Spark, process it, and then send the result back to the NiFi flow.

This is my NiFi flow that sends data to Spark through an output port.

[Screenshot of the NiFi flow routing data to the "Data For Spark" output port.]

To get data from the NiFi flow I wrote the function below.

import java.nio.charset.StandardCharsets

import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.spark.NiFiReceiver
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

def process(): Unit = {

    // Schema of the incoming CSV records
    val schema = StructType(Seq(
      StructField(name = "ID", dataType = StringType, nullable = false),
      StructField(name = "Client_Name", dataType = StringType, nullable = false),
      StructField(name = "Due_Date", dataType = StringType, nullable = false),
      StructField(name = "Score", dataType = StringType, nullable = false)
    ))

    // Site-to-site configuration pointing at the NiFi output port
    val config = new SiteToSiteClient.Builder()
      .url("http://localhost:8090/nifi")
      .portName("Data For Spark")
      .buildConfig()

    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("NiFi-Spark Streaming example")

    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // Receive NiFi data packets and decode their content as UTF-8 text
    val packetStream = ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY))
    val file = packetStream.map(packet => new String(packet.getContent, StandardCharsets.UTF_8))

    file.foreachRDD { rdd =>
      // Skip the CSV header line and build a DataFrame from the data rows
      val data = spark.createDataFrame(
        rdd.filter(!_.contains("ID,Client_Name,Due_Date,Score"))
           .map(line => Row.fromSeq(line.split(",").toSeq)),
        schema)

      data.show(100)
      val id = data.select("ID")
    }

    ssc.start()
    ssc.awaitTermination()
}

The final result of the above function is the `id` DataFrame. I want to send that result back to the NiFi flow. I don't want to write the result to a file somewhere and pick it up in NiFi with a GetFile processor.

How can I send the final result back to the NiFi flow?


2 Answers


This is an interesting approach.

Have you considered introducing a brokering service such as Apache Kafka? It can serve as both a source and a sink for your Apache Spark application, and the integration works out of the box. You can follow the official guide here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. The guide describes the flow using the relatively new Apache Spark Structured Streaming.

Then, on the Apache NiFi side, you can use the ConsumeKafkaRecord processor to consume from the topic your Apache Spark application uses as a sink. You can also use the PublishKafkaRecord processor if you refactor your application to read from Apache Kafka as a source as well, rather than relying on Apache NiFi's site-to-site ports directly.
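For the Spark-to-NiFi direction, the Kafka sink amounts to a few writeStream options. A minimal, hedged sketch (the topic name, broker address, checkpoint path, and the resultDf streaming DataFrame are all assumptions, not from the question):

    // resultDf is assumed to be a streaming DataFrame with a string "value"
    // column, e.g. resultDf = df.selectExpr("CAST(ID AS STRING) AS value")
    val query = resultDf.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "spark-results") // ConsumeKafkaRecord reads this topic in NiFi
      .option("checkpointLocation", "/tmp/spark-kafka-checkpoint")
      .start()

The Kafka sink expects the payload in a column named value, which is why the ID column is cast and renamed first.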

Update: If you absolutely must write directly to Apache NiFi, using Apache Spark Structured Streaming you can extend the ForeachWriter class (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter) to implement your own custom sink.
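As a rough illustration of that update, here is a sketch of the open/process/close logic such a custom sink would need. To keep the sketch free of Spark dependencies it is a plain class taking Seq[Any] rows; in a real job it would extend org.apache.spark.sql.ForeachWriter[Row] and be wired in with df.writeStream.foreach(...). The endpoint URL is an assumption (e.g. a NiFi ListenHTTP processor).

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Sketch of a custom sink following ForeachWriter's open/process/close
// contract. The endpoint is hypothetical (a NiFi ListenHTTP processor).
class NiFiHttpSink(endpoint: String) {

  // Called once per partition/epoch; return true to process the partition.
  def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for every row: serialize it and POST it to NiFi.
  def process(row: Seq[Any]): Unit =
    post(toCsv(row).getBytes(StandardCharsets.UTF_8))

  // Called when the partition is done (errorOrNull is null on success).
  def close(errorOrNull: Throwable): Unit = ()

  // Pure helper: one row becomes one CSV line.
  def toCsv(row: Seq[Any]): String = row.mkString(",")

  private def post(payload: Array[Byte]): Unit = {
    val conn = new URL(endpoint).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    try out.write(payload) finally out.close()
    conn.getResponseCode // each successful POST becomes one FlowFile in NiFi
    conn.disconnect()
  }
}
```

Note that process is called once per row, so every row becomes a separate HTTP request; batching per partition would reduce overhead.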


Look at ListenHTTP. That way you treat NiFi as a simple REST service. Personally, I would prefer having a message bus between Spark and NiFi, but if that is not possible for your use case, you can try whether this works for you.
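A minimal sketch of that idea, assuming a ListenHTTP processor listening on port 9999 with base path contentListener (both assumptions): collect the ID column on the driver and POST the lines as one request body, which arrives in NiFi as a single FlowFile.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object NiFiPoster {

  // Pure helper: join the collected values into one newline-separated body.
  def toBody(lines: Seq[String]): String = lines.mkString("\n")

  // POST the body to NiFi's ListenHTTP endpoint; returns the HTTP status.
  // Inside foreachRDD this could be called as (hypothetical usage):
  //   postToNiFi(id.collect().map(_.getString(0)),
  //              "http://localhost:9999/contentListener")
  def postToNiFi(lines: Seq[String], url: String): Int = {
    val payload = toBody(lines).getBytes(StandardCharsets.UTF_8)
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "text/plain")
    val out = conn.getOutputStream
    try out.write(payload) finally out.close()
    val status = conn.getResponseCode
    conn.disconnect()
    status
  }
}
```

Collecting to the driver is fine for small results like the `id` column here; for large results you would POST per partition instead.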