
I am trying to create a Kafka to BigQuery data pipeline using Confluent's BigQuerySinkConnector. My test environment is a cp-all-in-one Docker setup, to which I added the connector myself (it is not included by default). I have made all the necessary definitions on the Google BigQuery side (I hope...), including a table named rest_avro in my BigQuery dataset. However, the connector keeps failing with a Schema Registry error that I cannot make sense of. The topic schema is:

{
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": [
        "null",
        "int"
      ]
    }
  ],
  "name": "User",
  "type": "record"
} 
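For context, test records can be produced against this schema with kafka-avro-console-producer (a sketch, assuming the cp-all-in-one defaults of a broker on localhost:9092 and Schema Registry on localhost:8081; note that because age is a union, Avro's JSON encoding requires non-null values to be wrapped in their branch type):

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic rest-avro \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": ["null", "int"]}]}'

Each record is then typed as one JSON object per line, e.g. {"name": "alice", "age": {"int": 30}}.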

I defined the same schema on the BigQuery table manually.


There is no error while the connector is running.

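The connector and task state can also be checked through the Connect REST API (assuming the worker is exposed on the default port 8083):

curl -s http://localhost:8083/connectors/kcbq-connect1/status

This returns a JSON document with the connector's state and, for each task, its state and any error trace.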

My connector configuration loads successfully:

[2020-08-19 13:21:46,803] INFO SinkConnectorConfig values:
    config.action.reload = restart
    connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    errors.deadletterqueue.context.headers.enable = false
    errors.deadletterqueue.topic.name =
    errors.deadletterqueue.topic.replication.factor = 3
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = null
    name = kcbq-connect1
    tasks.max = 1
    topics = [rest-avro]
    topics.regex =
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2020-08-19 13:21:46,803] INFO EnrichedConnectorConfig values:
    config.action.reload = restart
    connector.class = com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    errors.deadletterqueue.context.headers.enable = false
    errors.deadletterqueue.topic.name =
    errors.deadletterqueue.topic.replication.factor = 3
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = null
    name = kcbq-connect1
    tasks.max = 1
    topics = [rest-avro]
    topics.regex =
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-08-19 13:21:46,804] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-08-19 13:21:55,940] INFO [Consumer clientId=connector-consumer-kcbq-connect1-0, groupId=connect-kcbq-connect1] Seeking to offset 8 for partition rest-avro-0 (org.apache.kafka.clients.consumer.KafkaConsumer)

But the console keeps printing errors like the ones below. Does anyone have an idea what is going wrong?

[2020-08-19 13:09:45,663] ERROR Task failed with org.apache.kafka.connect.errors.ConnectException error: Exception encountered while trying to fetch latest schema metadata from Schema Registry (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor)
Exception in thread "pool-3-thread-79" org.apache.kafka.connect.errors.ConnectException: Exception encountered while trying to fetch latest schema metadata from Schema Registry
    at com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever.retrieveSchema(SchemaRegistrySchemaRetriever.java:67)
    at com.wepay.kafka.connect.bigquery.SchemaManager.updateSchema(SchemaManager.java:58)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptSchemaUpdate(AdaptiveBigQueryWriter.java:129)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:96)
    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:117)
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:77)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:153)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:359)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:351)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:136)
    at com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever.retrieveSchema(SchemaRegistrySchemaRetriever.java:63)
    ... 8 more

1 Answer


The stack trace says that the BigQuery Sink Connector is unable to retrieve the current schema from the Schema Registry container.

It seems that the schemaRegistryLocation field has been misconfigured in the BigQuery Sink Connector properties. Most probably it has been set to http://localhost:8081.

As the docker-compose.yml shipped in Confluent's repository shows, this endpoint needs to be defined as http://schema-registry:8081 in dockerized environments: inside the Compose network each container reaches the others by their service names, whereas localhost resolves to the Connect container itself, hence the Connection refused.
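You can verify which address is reachable by calling the Schema Registry's REST interface from inside the Connect container (a quick sanity check; the service name connect follows Confluent's cp-all-in-one compose file, and curl is assumed to be available in the image):

docker-compose exec connect curl -s http://schema-registry:8081/subjects

A healthy registry answers with a JSON array of registered subjects, e.g. ["rest-avro-value"], while the same request against http://localhost:8081 from inside the container fails with exactly the Connection refused seen in the stack trace.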

With the properties below I managed to create the BigQuery Sink Connector:

{
  "name": "customer-connect1",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.venue_organization.customers",
    "topicsToTables": "dbserver1.venue_organization.customers=customers",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "autoUpdateSchemas": "true",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
    "schemaRegistryLocation": "http://schema-registry:8081",
    "bufferSize": "100000",
    "maxWriteSize": "10000",
    "tableWriteWait": "1000",
    "project": "venue-organization",
    "datasets": ".*=venue_organization",
    "keyfile": " /data/venue-organization-service-account.json"
  }
}
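To load it, save the JSON above to a file and POST it to the Connect worker's REST API (the file name bigquery-sink.json and the default port 8083 are assumptions here):

curl -X POST -H "Content-Type: application/json" \
     --data @bigquery-sink.json \
     http://localhost:8083/connectors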

More details here: Google BigQuery Sink Connector Configuration Properties