I want to pull data from a NiFi flow into Spark, process it there, and then send the result back to the NiFi flow.
This is my NiFi flow, which sends data to Spark through an output port.
To receive the data from the NiFi flow, I wrote the function below.
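    import java.nio.charset.StandardCharsets

    import org.apache.nifi.remote.client.SiteToSiteClient
    import org.apache.nifi.spark.NiFiReceiver
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def process(): Unit = {
      // All four CSV columns are read as non-nullable strings.
      val schema =
        StructType(
          Seq(
            StructField(name = "ID", dataType = StringType, nullable = false),
            StructField(name = "Client_Name", dataType = StringType, nullable = false),
            StructField(name = "Due_Date", dataType = StringType, nullable = false),
            StructField(name = "Score", dataType = StringType, nullable = false)
          )
        )

      // Site-to-Site configuration pointing at the NiFi output port.
      val config =
        new SiteToSiteClient.Builder()
          .url("http://localhost:8090/nifi")
          .portName("Data For Spark")
          .buildConfig()

      val sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("NiFi-Spark Streaming example")
      val ssc = new StreamingContext(sparkConf, Seconds(10))
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()

      // Receive data packets from NiFi and decode each packet's content as UTF-8 text.
      val packetStream = ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY))
      val file = packetStream.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))

      file.foreachRDD { rdd =>
        // Drop the CSV header and turn each remaining line into a Row.
        val data = spark.createDataFrame(
          rdd
            .filter(!_.contains("ID,Client_Name,Due_Date,Score"))
            .map(line => Row.fromSeq(line.split(",").toSeq)),
          schema
        )
        data.show(100)
        // This is the result I want to send back to NiFi.
        val id = data.select("ID")
      }

      ssc.start()
      ssc.awaitTermination()
    }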
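A note on the parsing step: the map inside process() treats the content of each data packet as a single CSV line. If a packet instead carries a whole file (header plus several rows), the header filter would discard the entire packet. The sketch below shows one way the content might be split into rows first. PacketParsing and parsePacket are hypothetical names, and the naive comma split assumes that no field contains a quoted comma.

```scala
object PacketParsing {
  // Header line, copied from the filter used in process().
  val Header = "ID,Client_Name,Due_Date,Score"
  // Number of columns in the schema (ID, Client_Name, Due_Date, Score).
  val ExpectedColumns = 4

  /** Splits one packet's text into rows of fields, skipping the header,
    * blank lines, and lines with the wrong number of columns.
    * Assumption: fields never contain commas or line breaks. */
  def parsePacket(content: String): Seq[Seq[String]] =
    content
      .split("\\r?\\n")
      .toSeq
      .map(_.trim)
      .filter(line => line.nonEmpty && line != Header)
      .map(_.split(",", -1).toSeq) // -1 keeps trailing empty fields
      .filter(_.length == ExpectedColumns)
}
```

With this helper, the stream could be flattened with packetStream.flatMap over the decoded content, and each resulting Seq[String] passed to Row.fromSeq before building the DataFrame.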
The final result of the process function is the id DataFrame. I want to send that result back to the NiFi flow, but I don't want to write it to a file at some destination and then pick it up in NiFi with a GetFile processor.
How can I send the final result back to the NiFi flow directly?
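For context, the same Site-to-Site client library that supplies the receiver configuration can also open a transaction in the SEND direction, so one possible direction is sketched below. This is an untested sketch under several assumptions: the flow has an input port named "Data From Spark" (a hypothetical name) that accepts Site-to-Site connections, the result is small enough to collect on the driver, and NiFiSender and sendToNiFi are illustrative names rather than part of any library.

```scala
import java.nio.charset.StandardCharsets
import java.util.Collections

import org.apache.nifi.remote.TransferDirection
import org.apache.nifi.remote.client.SiteToSiteClient

object NiFiSender {
  /** Sends the given lines to NiFi as one flow file over Site-to-Site. */
  def sendToNiFi(lines: Seq[String]): Unit = {
    val client = new SiteToSiteClient.Builder()
      .url("http://localhost:8090/nifi")
      .portName("Data From Spark") // assumption: a hypothetical input port in the flow
      .build()
    try {
      val transaction = client.createTransaction(TransferDirection.SEND)
      if (transaction != null) { // defensive check in case no transaction could be created
        val payload = lines.mkString("\n").getBytes(StandardCharsets.UTF_8)
        // Attributes become flow file attributes on the NiFi side.
        val attributes = Collections.singletonMap("mime.type", "text/plain")
        transaction.send(payload, attributes)
        transaction.confirm()  // verify that NiFi received the same content
        transaction.complete() // commit the transfer
      }
    } finally {
      client.close()
    }
  }
}
```

Inside foreachRDD, this could be called as NiFiSender.sendToNiFi(id.collect().map(_.getString(0)).toSeq). Collecting keeps the client on the driver, which is only reasonable for small results. For larger results, a client would probably have to be created per partition inside foreachPartition instead.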
