I am new to NiFi and am developing a custom processor to pull the most recent data from a PostgreSQL database view. With the code below, I can retrieve the database view when the custom processor is initialized.

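import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

private void getData() throws SQLException {
    // try-with-resources closes the ResultSet, Statement and Connection even if the query throws
    try (Connection connection = DriverManager.getConnection("jdbc:postgresql://example:5432/example", "user", "pass");
         Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
         ResultSet rs = statement.executeQuery("SELECT * FROM Example_Table")) {
        while (rs.next()) {
            // Get data from database
        }
    }
}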

However, I am struggling to get recent updates from the database view. The main problem is new entries: since the database is queried only when the processor is initialized, the custom processor never sees rows added after that point.

I tried running the query inside the public void onTrigger() method; however, this backs up the flow because it queries the database for every flowfile (which is not viable when thousands of flowfiles arrive per second).

Is there a way to query the database when the processor starts, without querying it for every flowfile? Alternatively, is it possible to detect when the database has been modified and pull the data only then? Or to set a timer within the custom processor that periodically pulls from the database?

Any assistance is much appreciated. Thank you in advance.

Answer

If you can explain a bit more about your higher-level use case, it may be easier to suggest solutions, because this seems like an uncommon approach. Usually each processor has a single responsibility: some processors interact with a database and then output the necessary information for other processors to consume.

There are some LookupService implementations that may be good examples to examine, for instance MongoDBLookupService.

If your use case is actually "I have a custom processor which ingests flowfiles containing arbitrary data and need to perform some operation on them using the latest data from this database table", you have a few options:

  1. Perform the database query in a method like the one above and call it once from a method annotated with @OnScheduled (the processor counterpart of a controller service's @OnEnabled) to load the table's current contents. Then re-run it at a regular interval on a background thread and store the results locally in a field. When onTrigger() runs, read from that local cache instead of making a database call. This reduces latency and gives you near-real-time data. Be sure to shut down the background thread and clear the local state in a method annotated with @OnStopped.
  2. Perform the database query inline with the flowfile processing (i.e., in onTrigger()). This can add significant latency and limit throughput. If the flowfiles can be batch processed, you can take more of them per execution cycle with List<FlowFile> flowfiles = session.get(1000); (the batch size is configurable) and issue one query per batch rather than one per flowfile.
  3. If there are no upserts or in-place modifications (i.e., every change to the table produces new rows), you could run a cheap sentinel query (SELECT COUNT(*) FROM table;), compare the row count with the previous result, and perform the "expensive" query that retrieves the data only when the counts differ. In that case you can retrieve just the new (delta) rows by recording the maximum ID or timestamp of the rows obtained previously. If upserts are possible, something like SELECT MAX(lastModified) AS mostRecentTimeModified FROM table; may be helpful.
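A minimal plain-Java sketch of option 1, under stated assumptions: RefreshingCache is a hypothetical helper (not a NiFi API), and the Supplier stands in for a query such as the getData() method in the question. In a real processor you would presumably call start() from the @OnScheduled method, snapshot() from onTrigger(), and stop() from the @OnStopped method.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical helper: keeps an in-memory snapshot of a table and refreshes it on a
 * fixed schedule. start()/stop() would map to @OnScheduled/@OnStopped (assumption).
 */
class RefreshingCache<T> {
    private final Supplier<List<T>> loader; // stand-in for the JDBC query (assumption)
    private final AtomicReference<List<T>> current = new AtomicReference<>(Collections.emptyList());
    private ScheduledExecutorService scheduler;

    RefreshingCache(Supplier<List<T>> loader) {
        this.loader = loader;
    }

    /** Loads once synchronously, then reloads every periodMillis on a background thread. */
    public synchronized void start(long periodMillis) {
        current.set(List.copyOf(loader.get()));
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        try {
            current.set(List.copyOf(loader.get()));
        } catch (RuntimeException e) {
            // Keep serving the previous snapshot; an uncaught exception would cancel future runs
        }
    }

    /** Cheap read for every flowfile; never touches the database. */
    public List<T> snapshot() {
        return current.get();
    }

    /** Stops the background thread, waits for any running refresh, then clears cached state. */
    public synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            try {
                scheduler.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            scheduler = null;
        }
        current.set(Collections.emptyList());
    }
}
```

The AtomicReference lets onTrigger() threads read a complete snapshot without locking: each refresh swaps in a new immutable list instead of mutating the old one.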
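The benefit of batching in option 2 is that one database round trip can serve a whole batch. The following plain-Java sketch (not NiFi code; BatchedLookup and its methods are hypothetical) mimics that by splitting items into batches, the way session.get(batchSize) returns up to batchSize flowfiles, and invoking a stand-in query once per batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of option 2: one query per batch instead of one per item. */
class BatchedLookup {
    /** Splits items into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> batches(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            result.add(items.subList(start, Math.min(start + batchSize, items.size())));
        }
        return result;
    }

    /** Calls queryForBatch once per batch and returns the number of queries issued. */
    static <T> int processAll(List<T> items, int batchSize, Consumer<List<T>> queryForBatch) {
        int queries = 0;
        for (List<T> batch : batches(items, batchSize)) {
            queryForBatch.accept(batch); // one database round trip shared by the whole batch
            queries++;
        }
        return queries;
    }
}
```

With 2,500 queued items and a batch size of 1,000, this issues 3 queries instead of 2,500.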
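Option 3 can be sketched as a small poller. TableSource and DeltaPoller are hypothetical names, and the sketch assumes each row has a monotonically increasing numeric id and that rows are only ever appended, as option 3 requires. Rows are represented by their ids alone to keep the example self-contained; in practice rowCount() and rowsAfter() might wrap the COUNT(*) query and a delta query such as SELECT * FROM table WHERE id > ? ORDER BY id.

```java
import java.util.List;

/** Hypothetical abstraction over the two queries in option 3 (not a NiFi or JDBC API). */
interface TableSource {
    long rowCount();                   // e.g. SELECT COUNT(*) FROM table
    List<Long> rowsAfter(long lastId); // e.g. SELECT ... WHERE id > ? ORDER BY id
}

class DeltaPoller {
    private final TableSource source;
    private long lastCount = -1;           // -1 forces a full fetch on the first poll
    private long lastId = Long.MIN_VALUE;  // highest id seen so far

    DeltaPoller(TableSource source) {
        this.source = source;
    }

    /** Returns only rows added since the previous poll; skips the expensive query if the count is unchanged. */
    List<Long> poll() {
        long count = source.rowCount();
        if (count == lastCount) {
            return List.of(); // sentinel says nothing changed
        }
        List<Long> delta = source.rowsAfter(lastId);
        for (long id : delta) {
            lastId = Math.max(lastId, id);
        }
        lastCount = count;
        return delta;
    }
}
```

Because the check compares only row counts, a delete paired with an insert would go unnoticed; that is why this approach depends on the append-only assumption stated in option 3.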