3 votes

I receive a large number of messages over HTTP (50,000 - 100,000 per second) and want to save them to PostgreSQL. I decided to use the Kafka JDBC Sink connector for this purpose.

The messages are saved to the database one record at a time, not in batches. I want to insert records into PostgreSQL in batches of 500 - 1000 records.

I found some answers to this problem in the question: How to use batch.size?

I tried to use the related options in the configuration, but it seems they have no effect.

My Kafka JDBC Sink PostgreSQL configuration (etc/kafka-connect-jdbc/postgres.properties):

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3

# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs

connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false

insert.mode=insert
connection.user=postgres
table.name.format=${topic}

connection.password=pwd

batch.size=500
# based on 500 * 3000-byte messages
fetch.min.bytes=1500000
fetch.max.wait.ms=1500
max.poll.records=4000

I also added options to connect-distributed.properties:

consumer.fetch.min.bytes=1500000
consumer.fetch.max.wait.ms=1500

Although each partition receives more than 1000 records per second, the records are still saved to PostgreSQL one at a time.
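
One way to confirm what actually reaches the database is to enable statement logging on the PostgreSQL side. This is a diagnostic sketch, not part of the original setup (log_statement = 'all' is far too verbose to leave on in production):

-- temporarily log every statement to see how inserts arrive
ALTER SYSTEM SET log_statement = 'all';
SELECT pg_reload_conf();

-- revert once done
ALTER SYSTEM RESET log_statement;
SELECT pg_reload_conf();

Many INSERTs between a single BEGIN/COMMIT pair in the server log would suggest JDBC batching is working; one INSERT per transaction would suggest it is not.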

Edit: the consumer options have been added to another file with the correct names.

I also added options to etc/schema-registry/connect-avro-standalone.properties:

# based on 500 * 3000-byte messages
consumer.fetch.min.bytes=1500000
consumer.fetch.max.wait.ms=1500
consumer.max.poll.records=4000
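
On Kafka 2.3 or newer, the same consumer settings can also be scoped to a single connector instead of the whole worker. A minimal sketch, assuming the worker is configured to permit client overrides; the policy line goes in the worker config, the rest in the connector's postgres.properties:

# worker config: allow per-connector client overrides
connector.client.config.override.policy=All

# connector config: consumer settings prefixed with consumer.override.
consumer.override.fetch.min.bytes=1500000
consumer.override.fetch.max.wait.ms=1500
consumer.override.max.poll.records=500

With max.poll.records aligned to the sink's batch.size, each poll should hand the connector roughly one batch worth of records.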
It should be consumer.max.poll.records, by the way - OneCricketeer
I tried changing max.poll.records -> consumer.max.poll.records, but I get the same result. - Mariya
Sure. I'm just saying that is the correct property name. In any case, records ought to be sent in individual queries; I'm not sure there are transaction rules around batches - OneCricketeer
In Kafka Connect you do, yes @Miguel - OneCricketeer
@Miguel Wrong section. Search for "the same parameters can be used but need to be prefixed with producer. and consumer. respectively" - OneCricketeer

1 Answer

4 votes

I realised that I had misunderstood the documentation. The records are inserted into the database one by one, and the number of records inserted in one transaction depends on batch.size and consumer.max.poll.records. I expected the batch insert to be implemented differently; I would like to have an option to insert records like this:

INSERT INTO table1 (First, Last)
VALUES
    ('Fred', 'Smith'),
    ('John', 'Smith'),
    ('Michael', 'Smith'),
    ('Robert', 'Smith');

But that seems impossible.
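
That said, the stock PostgreSQL JDBC driver can produce almost exactly this statement shape on its own: with reWriteBatchedInserts=true (available since pgjdbc 9.4.1209), batched single-row INSERTs are rewritten into a multi-row VALUES insert before being sent to the server. This is a driver-level option, not a connector feature, so treat it as a sketch; only the URL parameter changes:

connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test&reWriteBatchedInserts=true

Combined with aligned batch.size and consumer.max.poll.records settings as above, the rows in each JDBC batch should then reach PostgreSQL as one multi-row INSERT.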