I'm receiving an oddly formatted .csv file from the FetchFTP processor. It looks like this:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
Ingka,190530,1_flight_0119,947535,25.06.2020,Auditorius SE,101304,Smart Phone,Flight_EK_Auditorius_Video_mobile,27353235,https://www.ikea.com/promo/wifi?utm_source=Auditorius&utm_medium=Video_mobile,0,0,0,0
Ingka,190530,1_flight_0119,947535,28.06.2020,Between Exchange SE,124598,PC,Flight_IQP_Between_Exchange_Banner_728x90_DCO,27359134,,0,0,0,0
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

I can't save it to a database in this format. What I want:

  1. Remove this useless header block:
Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

  2. Remove this useless footer:
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,
  3. Rename the headers.

From: Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)

To: advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,site,site_id,device,placement_name,placement_id,url,clicks,imps,total_record_imps,total_view_imps

Is there a way to achieve this with Apache NiFi?

Sounds like a job for a script (Python, Groovy, ...) - daggett
Are the header and footer lengths always consistent? E.g. could you always trim 11 lines from the top and 2 from the bottom? - Sdairs
updated code snippet! - Vikramsinh Shinde
@Sdairs, yep, the length is always consistent. - funnelCONN
The easiest and most efficient way would be a script that first removes N lines from the top and bottom, then applies the header transformations (the header will then always be line 1). @VikramsinhShinde's answer below is a good place to start, with some adaptations - Sdairs

1 Answer

Your data needs to be cleaned up before it is valid CSV. You can use an ExecuteScript or ExecuteStreamCommand processor to run a data cleaning script, for example in Python, that converts the incoming data to your desired format.
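
As a rough idea of what such a cleaning script could look like in plain Python (for example behind ExecuteStreamCommand, which, as far as I know, pipes the flowfile content to the command's stdin and takes its stdout as the new content), here is a minimal sketch. The names `clean_report`, `main`, `TARGET_HEADER`, `HEADER_PREFIX` and `FOOTER_PREFIX` are made up for illustration, and it assumes the real header always starts with "Advertiser Name,Advertiser ID," and the footer with "Data was updated last on", as in the sample from the question.

```python
import sys

# Target column names from the question, in the same order as the original header.
TARGET_HEADER = (
    "advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,"
    "site,site_id,device,placement_name,placement_id,url,clicks,imps,"
    "total_record_imps,total_view_imps"
)

# Assumption: these prefixes identify the header row and the first footer row.
HEADER_PREFIX = "Advertiser Name,Advertiser ID,"
FOOTER_PREFIX = "Data was updated last on"


def clean_report(text):
    """Return only the CSV part of the report, with the header row renamed."""
    out = []
    in_data = False
    for line in text.splitlines():
        if not in_data:
            # Skip the metadata block until the real header row shows up.
            if line.startswith(HEADER_PREFIX):
                in_data = True
                out.append(TARGET_HEADER)
            continue
        if line.startswith(FOOTER_PREFIX):
            break  # everything from here on is footer
        if line.strip():
            out.append(line)  # keep data rows, drop blank lines
    if not in_data:
        raise ValueError("header row not found")
    return "\n".join(out) + "\n"


def main(stdin=sys.stdin, stdout=sys.stdout):
    """Read the raw report from stdin and write the cleaned CSV to stdout."""
    stdout.write(clean_report(stdin.read()))
```

If you save this as a script for ExecuteStreamCommand, add a final `main()` call at the bottom. Replacing the header line wholesale, instead of transforming it, gives you exactly the column names you listed.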

The snippet below (header standardization and data cleanup) should give you an idea of how to access flowfile content from an ExecuteScript processor configured with Python as the script engine:

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()

        # Keep only the rows from the real CSV header up to (not including) the footer
        csv_rows = []
        is_csv_data = False
        for csv_row in lines:
            if not is_csv_data:
                if csv_row.startswith("Advertiser Name,Advertiser ID,"):
                    is_csv_data = True
                else:
                    continue
            if csv_row.startswith("Data was updated last on"):
                break
            csv_rows.append(csv_row)

        # Normalize the header row: drop the "* " prefixes first so they do not
        # leave a leading underscore, then convert to lower snake_case.
        # Skipped when no header was found, to avoid an IndexError.
        if csv_rows:
            csv_rows[0] = csv_rows[0].replace('* ', '').replace('*', '').replace(' ', '_').replace('-', '_').lower()
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(csv_rows)


# end class
flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end
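
Since the comments confirm that the header and footer lengths are always the same, a simpler variant is to trim a fixed number of lines and then swap in the new header, as Sdairs suggested. This is only a sketch in plain Python: `trim_and_rename`, `top`, `bottom` and `new_header` are hypothetical names, and you would need to count the actual number of lines (blank lines included) in your real file.

```python
def trim_and_rename(lines, top, bottom, new_header):
    """Drop `top` leading and `bottom` trailing lines, then replace the header.

    `lines` is a list of lines without trailing newlines. After trimming,
    the first remaining line is assumed to be the original CSV header.
    """
    if top < 0 or bottom < 0:
        raise ValueError("top and bottom must be non-negative")
    # Compute the end index explicitly so that bottom == 0 keeps every line.
    end = max(top, len(lines) - bottom)
    body = lines[top:end]
    if not body:
        raise ValueError("nothing left after trimming")
    # body[0] is the old header; keep only the data rows after it.
    return [new_header] + body[1:]
```

The same logic could replace the marker-based loop inside `process()` above; the marker-based version is a bit more forgiving if the report layout ever changes.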