
I'm using NiFi to manage my data flow. I first receive data with a ConsumeKafka processor, then use EvaluateJsonPath and ReplaceText processors to build a request like this:

insert into my_table values('x1','x2','x3');

Finally, this request is fed to a PutHiveQL processor. This inserts data row by row into my Hive table. It works, but it's a very inefficient approach.
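For reference, a single-row statement like this can be produced with configurations along these lines. This is only a sketch; the JSON field names and attribute names below are assumptions, since the original schema isn't shown:

    EvaluateJsonPath (Destination: flowfile-attribute)
        col1 = $.field1
        col2 = $.field2
        col3 = $.field3

    ReplaceText (Replacement Strategy: Always Replace)
        Replacement Value: insert into my_table values('${col1}','${col2}','${col3}');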

What I want to know is how to build a request like this:

insert into my_table values('x1','x2','x3'),('x11','x22','x33'),('x111','x222','x333');

by appending a ; to the query at the end of the day, so that the data is batch-inserted into Hive every 24 hours.

Please find attached my data flow in NiFi.

[screenshot: the original NiFi data flow]

1 Answer


I've found a solution, but only for the first part. Using this data flow:

[screenshot: the revised NiFi data flow]

I could finally obtain a query like this:

insert into my_table values('x1','x2','x3'),('x11','x22','x33'),('x111','x222','x333');

I used the first ReplaceText processor to extract only the values to insert, with a trailing comma.
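As a sketch, assuming each incoming FlowFile still contains a full single-row statement, a Regex Replace configuration like this would reduce it to the bare tuple followed by a comma:

    ReplaceText (Replacement Strategy: Regex Replace, Evaluation Mode: Entire text)
        Search Value:      insert into my_table values(\(.*\));
        Replacement Value: $1,

This turns insert into my_table values('x1','x2','x3'); into ('x1','x2','x3'),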

Then I merged the content with a MergeContent processor (setting the minimum and maximum number of entries). The second ReplaceText processor adds the clause insert into my_table values at the beginning of the merged content.
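A sketch of those two steps; the entry counts are placeholders, since the original values aren't shown:

    MergeContent
        Merge Strategy:             Bin-Packing Algorithm
        Minimum Number of Entries:  1000   (assumed)
        Maximum Number of Entries:  1000   (assumed)

    ReplaceText (Replacement Strategy: Prepend, Evaluation Mode: Entire text)
        Replacement Value: insert into my_table values 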

Finally, I used a third ReplaceText processor to replace the last , with a ; at the end of the query.
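A sketch of that last replacement, anchoring the regex on the end of the merged content:

    ReplaceText (Replacement Strategy: Regex Replace, Evaluation Mode: Entire text)
        Search Value:      ,\s*$
        Replacement Value: ;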

For the scheduling task, I haven't found a solution yet.