0
votes

I am currently stuck on the following problem: I am reading messages from a Kafka topic using KafkaConsumer. The messages are strings with the following format: { "a" : "b", "a1" : "b1", "c2" : "c3" }. They are stored in the payload of the FlowFile.

I want to convert that string into JSON or, ideally, CSV, but I can't figure out how to do it.

I am new to NiFi and have researched as much as I could, but the answers I found dealt with conversions from JSON to Avro or similar, never from a string to JSON or Avro. I also found out that the Kafka message is in the payload of the FlowFile, not in its attributes, so I have no idea how to get at it, since the examples always involve attributes.

So in short: can I convert the payload of a FlowFile, which is a string, to JSON/CSV with a built-in processor?


2 Answers

0
votes

If your message is in the FlowFile, the following sequence could help:

1) Use AttributesToJSON to convert the payload message to JSON.
2) Use EvaluateJsonPath to extract the payload message (in your case, the Kafka message). You can then pass the extracted messages on for CSV generation.

This post can help with converting JSON to CSV: Convert Json To CSV
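To make the target of the conversion concrete, here is a minimal sketch in plain Python, outside NiFi, of what turning the string payload into CSV amounts to. It assumes the payload is a single flat JSON object like the one in the question; the function name `json_payload_to_csv` is hypothetical and only serves the illustration. It is not a NiFi processor or NiFi API.

```python
import csv
import io
import json


def json_payload_to_csv(payload: str) -> str:
    """Turn one flat JSON object (given as a string) into CSV text with a header row."""
    # The "string" payload is already valid JSON, so it only needs parsing.
    record = json.loads(payload)

    out = io.StringIO()
    # Assumption: the column order follows the key order of the JSON object.
    writer = csv.DictWriter(out, fieldnames=list(record.keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerow(record)
    return out.getvalue()


payload = '{ "a" : "b", "a1" : "b1", "c2" : "c3" }'
csv_text = json_payload_to_csv(payload)
print(csv_text)
```

The point of the sketch is that the payload needs no separate "string to JSON" step: it can be parsed as JSON directly, and the remaining work is choosing the columns and writing the row.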

0
votes

I ended up doing this:

  1. ConsumeKafka gives me the string:

{ "a" : "b", "a1" : "b1" }

  2. EvaluateJsonPath creates attributes from the properties I added:

a -> $.a //results in an attribute named a with value b

a1 -> $.a1 //results in an attribute named a1 with value b1

  3. ReplaceText uses the attributes from EvaluateJsonPath to build a single CSV-formatted line:

Replacement value -> ${'a'},${'a1'}

This results in a single line, but with no trailing newline:

b,b1

To add the newline, appending \n, '\n', or "\n" did not work. What worked was pressing Shift+Enter while typing in the Replacement value field, which added an empty new line.
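The three steps above can be mimicked outside NiFi with a short Python sketch. It only illustrates the data transformation (parse the payload, pick the fields a and a1, join them with a comma, end the row with a line break); the function name `to_csv_row` is hypothetical and none of this is NiFi code.

```python
import json


def to_csv_row(payload: str, fields=("a", "a1")) -> str:
    """Build one CSV row from a JSON payload, ending with a newline."""
    # Step 1: the consumed message is a JSON string.
    record = json.loads(payload)
    # Step 2: pick out the wanted fields, like the $.a and $.a1 paths do.
    values = [str(record[name]) for name in fields]
    # Step 3: join the values and append the line break that the
    # Shift+Enter workaround adds in the Replacement value field.
    return ",".join(values) + "\n"


row = to_csv_row('{ "a" : "b", "a1" : "b1" }')
# A second, made-up message shows why the trailing newline matters:
# without it, consecutive rows would run together on one line.
rows = row + to_csv_row('{ "a" : "x", "a1" : "y" }')
print(rows, end="")
```

This sketch assumes the values contain no commas, quotes, or line breaks; real CSV output would need quoting for those cases.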