2
votes

The question says it all. How can I do one of the following things:

  • How can I limit the number of concurrent tasks running for one processor cluster-wide?
  • Is there a unique, short ID for the node I'm running on? I could append this ID to the name of the database table I load into (see details below) and so have an exclusive table per connection.

I have a NiFi cluster and a self-written, specialized processor that loads large amounts of data into a database via JDBC (up to 20 million rows per second). It uses some database-vendor-specific tuning tricks to be really fast in my particular case. One of these tricks needs an exclusive, empty table to load into for each connection.

At the moment, my processor opens one connection per node in the NiFi cluster (it takes a connection from the DBCPConnectionPool). With about 90-100 nodes in the cluster, I'd get 90-100 connections, all of them bulk loading data at the same time.

I'm using NiFi 1.3.0.0.

Any help or comment is highly appreciated. Sorry for not showing any code: it's about 700 lines and wouldn't really help with the question. I do plan to put it on Git and make it part of the open-source project Kylo.

1
Would a temporary table, named with a UUID suffix or similar, work for the JDBC bulk-upload method you are using? - kevdoran
Something like a UUID but shorter would definitely help. A UUID as-is is too long and needs reformatting before I could use it as a table name. Any idea how to shorten a UUID to 10 characters or fewer? - jboi
I'm not clear on why the length or format of a table name matters if it is a temporary table that is never queried by a human or program, i.e., the table is used for bulk loading but then all records are processed and moved to other tables. If the database has format limitations for names, you could always drop the dashes from the UUID string. If you really just prefer shorter names, I suppose you could come up with a naming formula that gives you what you want, e.g. hostname_threadId or something like that, using Thread.currentThread().getId() - kevdoran
Good points. The length of a table name is restricted to 30 characters, and I have to use two different connections (one to load and one for ELT), so temp tables do not work. BUT you're right. What I do now is generate a random 10-character string from all characters allowed in a table name (0-9 and a-z). I get a new string every time the processor is scheduled on a node, so this should work in most cases. And if two sessions really do get the same string, the script running up front will fail, and the processor will yield and retry. Thank you for the thoughts! - jboi
Thanks for the context @jboi. I think you are on the right track: a random string with retry upon table name collision. Are upper case letters allowed in table names, and are table names case sensitive? If so, you could probably come up with a scheme based on base64-encoding a UUID. That would be 22 characters (without padding), and you would have to substitute other characters for + and / - kevdoran
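
For reference, the random-name approach from the comments could look roughly like the sketch below. It is only an illustration, not the asker's actual code: the class name `TableNames`, the prefix `stg`, and the helper names are hypothetical, and the 30-character limit and the 0-9/a-z alphabet are taken from the comments above. A letter prefix is assumed because many databases require table names to start with a letter.

```java
import java.security.SecureRandom;

// Hypothetical helper, not part of NiFi or the asker's processor.
class TableNames {
    // Characters the asker says are allowed in table names.
    private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz";
    // Table name length limit mentioned in the comments.
    private static final int MAX_TABLE_NAME_LENGTH = 30;
    private static final SecureRandom RANDOM = new SecureRandom();

    // Returns a random string of the given length drawn from ALPHABET.
    static String randomSuffix(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    // Builds prefix + 10 random characters and rejects names that are too long.
    static String tableName(String prefix) {
        String name = prefix + randomSuffix(10);
        if (name.length() > MAX_TABLE_NAME_LENGTH) {
            throw new IllegalArgumentException("table name too long: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // "stg" is an assumed prefix chosen only for this example.
        System.out.println(tableName("stg"));
    }
}
```

With 36 possible characters in each of 10 positions, collisions should be rare, but the retry on a failed table creation described above is still needed because uniqueness is not guaranteed.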

1 Answer

4
votes

A common way of breaking up tasks in NiFi is to split the flow file into multiple files on the primary node. Then other nodes would pull one of the flow files and process it.

In your case, each file would contain a range of values to pull from the table. Let's say you had a hundred rows and wanted only 3 nodes to pull data. So you'd create 3 flow files each having separate attribute values:

  1. start-row-id=1, end-row-id=33
  2. start-row-id=34, end-row-id=66
  3. start-row-id=67, end-row-id=100

Then a node would pick up a flow file from a remote process group or a queue (such as JMS or SQS). There are only 3 flow files, so no more than 3 nodes would be loading data over a connection at the same time.
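
As a rough sketch of how the row ranges above could be computed (plain Java, no NiFi API involved; the class `RowRanges` and method `split` are hypothetical names for this example), assuming the last range absorbs any remainder as in the 100-row example:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that computes the start/end row ids for each flow file.
class RowRanges {
    // Splits rows 1..totalRows into `parts` contiguous ranges of {start, end}.
    // The last range takes whatever is left over after integer division.
    static List<long[]> split(long totalRows, int parts) {
        if (parts < 1 || totalRows < parts) {
            throw new IllegalArgumentException("need at least one row per part");
        }
        long base = totalRows / parts;
        List<long[]> ranges = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            long start = i * base + 1;
            long end = (i == parts - 1) ? totalRows : (i + 1) * base;
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints one line per flow file, using the attribute names from the answer.
        for (long[] r : split(100, 3)) {
            System.out.println("start-row-id=" + r[0] + ", end-row-id=" + r[1]);
        }
    }
}
```

In a real flow, each pair would be written as attributes on its own flow file on the primary node; how those flow files are then distributed to the other nodes is left to the remote process group or queue mentioned above.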