Folks, I'm new to all this data streaming process but I was able to build and submit a Flink job that will read some CSV data from Kafka and aggregate it then put it in Elasticsearch.
I was able to do the first two parts, and print out my aggregation to STDOUT. But when I added the code to put it to Elasticsearch, it seems nothing is happening there (no data being added). I looked at the Flink job manager log and it looks fine (no errors) and says:
2020-03-03 16:18:03,877 INFO
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge
- Created Elasticsearch RestHighLevelClient connected to [http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200]
Here is my code at this point:
/*
* This Scala source file was generated by the Gradle 'init' task.
*/
package flinkNamePull
import java.time.LocalDateTime
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
object Demo {
/**
* MapFunction to generate Transfers POJOs from parsed CSV data.
*/
class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null
@throws[Exception]
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
}
@throws[Exception]
override def map(csvLine: String): Transfers = {
//var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
var splitCsv = csvLine.stripLineEnd.split(",")
val arrLength = splitCsv.length
val i = 0
if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
if (i == 13) {
splitCsv = splitCsv :+ "0.0"
} else {
splitCsv = splitCsv :+ ""
}
}
}
var trans = new Transfers()
trans.rowId = splitCsv(0)
trans.subjectId = splitCsv(1)
trans.hadmId = splitCsv(2)
trans.icuStayId = splitCsv(3)
trans.dbSource = splitCsv(4)
trans.eventType = splitCsv(5)
trans.prev_careUnit = splitCsv(6)
trans.curr_careUnit = splitCsv(7)
trans.prev_wardId = splitCsv(8)
trans.curr_wardId = splitCsv(9)
trans.inTime = splitCsv(10)
trans.outTime = splitCsv(11)
trans.los = splitCsv(12).toDouble
return trans
}
}
def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Set properties per KafkaConsumer API
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
properties.setProperty("group.id", "test")
// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
// Read from beginning of topic
myKConsumer.setStartFromEarliest()
val streamSource = env
.addSource(myKConsumer)
// Transform CSV (with a header row per Kafka event into a Transfers object
val streamTransfers = streamSource.map(new TransfersMapper())
// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)
println("***** NEW EXECUTION STARTED AT " + LocalDateTime.now() + " *****")
// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
tEnv.createTemporaryView("transfers", tblTransfers)
tEnv.connect(
new Elasticsearch()
.version("7")
.host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
.index("transfers-sum")
.documentType("_doc")
.keyNullLiteral("n/a")
)
.withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
.withSchema(new Schema()
.field("curr_careUnit", DataTypes.STRING())
.field("sum", DataTypes.DOUBLE())
)
.inUpsertMode()
.createTemporaryTable("transfersSum")
val result = tEnv.sqlQuery(
"""
|SELECT curr_careUnit, sum(los)
|FROM transfers
|GROUP BY curr_careUnit
|""".stripMargin)
result.insertInto("transfersSum")
// Elasticsearch elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
env.execute("Flink Streaming Demo Dump to Elasticsearch")
}
}
I'm not sure how I can debug this beast... Wondering if somebody can help me figure out why the Flink job is not adding data to Elasticsearch :( From my Flink cluster, I'm able to query Elasticsearch just fine (manually) and add records to my index:
curl -XPOST "http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200/transfers-sum/_doc" -H 'Content-Type: application/json' -d'{"curr_careUnit":"TEST123","sum":"123"}'