0
votes

I am bringing up the Kafka environment with SSL; up to this point there were no problems...

It starts up normally, but when I create a MySQL connector,

the producer does not pick up the configuration set by the Docker environment!

Any Suggestions?

---
# docker-compose stack: ZooKeeper + SSL-only Kafka broker + Schema Registry
# (HTTPS) + Kafka Connect (HTTPS). Host names, ports and the keystore password
# are interpolated from shell environment variables (listed under "vars" later
# in this post); ./secrets holds the JKS key/trust stores.
# NOTE(review): the "vars" list shows SSL_SECRET= (empty) — if it really is
# unset, every *_PASSWORD value below expands to an empty string; verify.
  version: '3'
  services:
    zookeeper:
      image: confluentinc/cp-zookeeper:latest
      # container_name and hostname are both ${ZK_HOST} ("zookeeper"), so the
      # other services reach it by that name on the compose network.
      container_name: ${ZK_HOST}
      hostname: ${ZK_HOST}
      ports:
        - "${ZK_PORT}:${ZK_PORT}"
      environment:
        ZOOKEEPER_SERVER_ID: 1
        ZOOKEEPER_CLIENT_PORT: ${ZK_PORT}
        # TLS on the client port; stores come from the ./secrets mount below.
        ZOOKEEPER_CLIENT_SECURE: 'true'
        ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/zookeeper/secrets/kafka.keystore.jks
        ZOOKEEPER_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/zookeeper/secrets/kafka.truststore.jks
        ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      volumes:
        - ./secrets:/etc/zookeeper/secrets

    # Single broker, SSL listener only. The service key is "kafka-ssl" and
    # BROKER_HOST is also "kafka-ssl", so clients must use kafka-ssl:9092.
    kafka-ssl:
      image: confluentinc/cp-kafka:latest
      container_name: ${BROKER_HOST}
      hostname: ${BROKER_HOST}
      ports:
        - "${BROKER_PORT}:${BROKER_PORT}"
      # NOTE(review): depends_on entries must be compose *service* names; this
      # relies on ${ZK_HOST} expanding to exactly "zookeeper" — confirm your
      # compose version applies variable interpolation here.
      depends_on:
        - ${ZK_HOST}
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
        # Only one advertised listener: SSL://kafka-ssl:9092. Nothing in this
        # file advertises "broker:29092" (the address seen in the error log
        # below), so that value must come from somewhere else.
        KAFKA_ADVERTISED_LISTENERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        # cp-kafka resolves *_FILENAME relative to /etc/kafka/secrets and reads
        # the password from the named credentials file — presumably a
        # "cert_creds" file exists in ./secrets; verify.
        KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
        KAFKA_SSL_KEYSTORE_CREDENTIALS: cert_creds
        KAFKA_SSL_KEY_CREDENTIALS: cert_creds
        KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
        KAFKA_SSL_TRUSTSTORE_CREDENTIALS: cert_creds
        # Mutual TLS: every client must present a certificate the broker
        # trusts, so all clients below carry keystore settings too.
        KAFKA_SSL_CLIENT_AUTH: 'required'
        KAFKA_SECURITY_PROTOCOL: SSL
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
        # Single-broker cluster, so internal topics use replication factor 1.
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      volumes:
        - ./secrets:/etc/kafka/secrets

    schema-registry:
      image: confluentinc/cp-schema-registry
      container_name: ${SR_HOST}
      hostname: ${SR_HOST}
      depends_on:
        - ${ZK_HOST}
        - ${BROKER_HOST}
      ports:
        - "${SR_PORT}:${SR_PORT}"
      environment:
        SCHEMA_REGISTRY_HOST_NAME: ${SR_HOST}
        # HTTPS on the registry's own REST listener (SR_PORT = 8181).
        SCHEMA_REGISTRY_LISTENERS: 'https://0.0.0.0:${SR_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '${ZK_HOST}:${ZK_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
        # KAFKASTORE_* settings secure the registry's own Kafka client; the
        # non-KAFKASTORE duplicates secure its HTTPS REST listener.
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
        SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
        SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
        SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
        SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
        SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'true'
      volumes:
        - ./secrets:/etc/schema-registry/secrets

    connect:
      # NOTE(review): build + image together means compose tags the locally
      # built image as chethanuk/kafka-connect:5.3.1. The Dockerfile is not
      # shown here — confirm it does not override the CONNECT_* env handling.
      build:
        context: .
        dockerfile: Dockerfile
      image: chethanuk/kafka-connect:5.3.1
      hostname: ${SR_CON}
      container_name: ${SR_CON}
      depends_on:
        - ${ZK_HOST}
        - ${BROKER_HOST}
        - ${SR_HOST}
      ports:
        - "${SR_CON_PORT}:${SR_CON_PORT}"
      # The Confluent images template CONNECT_* variables into
      # connect-distributed.properties at container start; exec into the
      # container and inspect that file to see what actually got applied.
      environment:
        # HTTPS REST API on SR_CON_PORT (8083).
        CONNECT_LISTENERS: 'https://0.0.0.0:${SR_CON_PORT}'
        CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
        CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
        # Worker-level bootstrap: SSL://kafka-ssl:9092.
        CONNECT_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_REST_ADVERTISED_HOST_NAME: ${SR_CON}
        CONNECT_REST_PORT: ${SR_CON_PORT}
        CONNECT_GROUP_ID: compose-connect-group
        CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
        CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
        CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://${SR_HOST}:${SR_PORT}
        CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
        CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
        CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
        CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
        CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
        CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
        # Worker-level SSL. Paths match the ./secrets -> /etc/kafka/secrets
        # volume mount below.
        CONNECT_SSL_CLIENT_AUTH: 'true'
        CONNECT_SECURITY_PROTOCOL: SSL
        CONNECT_SSL_KEY_PASSWORD: ${SSL_SECRET}
        CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
        CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        # Per-client overrides for connector-created producers/consumers.
        # NOTE(review): the failing producer in the error log (client.id
        # myql-dbhistory, created by io.debezium...KafkaDatabaseHistory per
        # the stack trace) uses bootstrap.servers=[broker:29092] and
        # security.protocol=PLAINTEXT — values that appear nowhere in this
        # file, so they presumably come from the connector's own JSON config
        # (database.history.kafka.* properties); verify the connector config.
        CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
        CONNECT_PRODUCER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
        CONNECT_CONSUMER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      volumes:
        - ./secrets:/etc/kafka/secrets

error:

[2021-05-21 05:13:50,157] INFO Requested thread factory for connector MySqlConnector, id = myql named = db-history-config-check (io.debezium.util.Threads)
[2021-05-21 05:13:50,160] INFO ProducerConfig values: 
    acks = 1
    batch.size = 32768
    bootstrap.servers = [broker:29092]
    buffer.memory = 1048576
    client.dns.lookup = default
    client.id = myql-dbhistory
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 10000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-05-21 05:13:50,162] WARN Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker (org.apache.kafka.clients.ClientUtils)
[2021-05-21 05:13:50,162] INFO [Producer clientId=myql-dbhistory] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,163] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:235)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:40)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:90)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:94)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
    ... 14 more
[2021-05-21 05:13:50,164] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

vars

  • SSL_SECRET=
  • ZK_HOST=zookeeper
  • ZK_PORT=2181
  • BROKER_HOST=kafka-ssl
  • BROKER_PORT=9092
  • SR_HOST=schema-registry
  • SR_PORT=8181
  • SR_CON=connect
  • SR_CON_PORT=8083
  • HOST=localhost
1

1 Answer

1
votes

build and image should not be used together. You've not shown your Dockerfile, so it's unclear what you're doing there, but it may explain why no variables are actually loaded


bootstrap.servers = [broker:29092]

Somewhere in your Connect configuration, you're not using kafka-ssl:9092 as the connection string

Notice, too, that your key and value serializers are using String rather than your Avro settings; the interceptor list is empty, the SSL settings don't seem to be applied, etc.

To narrow it down, I don't think you need _PRODUCER_BOOTSTRAP_SERVERS or the consumer one.

You should exec into your container and look at the templated connect-distributed.properties file that was created

Note that the Debezium images come with the mysql connector classes, so maybe you don't need your own image?