2 votes

I have a very frustrating problem trying to join a KStream, which is populated by a Java driver program using KafkaProducer, to a GlobalKTable, which is populated from a topic that is, in turn, populated by the JDBCConnector pulling data from a MySQL table. No matter what I try, the join between the KStream and the GlobalKTable, both of which are keyed on the same value, will not work. What I mean is that the ValueJoiner is never called. I'll try to explain by showing the relevant config and code below. I appreciate any help.

I am using the latest version of the confluent platform.

The topic that the GlobalKTable is populated from is pulled from a single MySQL table:

Column Name/Type:
pk/bigint(20)
org_name/varchar(255)
orgId/varchar(10)

The JDBCConnector configuration for this is:

name=my-demo
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/reporting?user=root&password=XXX
table.whitelist=organisation
mode=incrementing
incrementing.column.name=pk
topic.prefix=my-
transforms=keyaddition
transforms.keyaddition.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.keyaddition.fields=orgId

I am running the JDBC connector using the command line:

connect-standalone /home/jim/platform/confluent/etc/schema-registry/connect-avro-standalone.properties /home/jim/prg/kafka/config/my.mysql.properties

This gives me a topic called my-organisation that is keyed on orgId ... so far so good! (Note: the namespace does not seem to be set by the JDBCConnector. I don't think this is an issue, but I don't know for sure.)

Now, the code. Here is how I initialise and create the GlobalKTable (relevant code shown):

final Map<String, String> serdeConfig =
    Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        schemaRegistryUrl);

final StreamsBuilder builder = new StreamsBuilder();

final SpecificAvroSerde<Organisation> orgSerde = new SpecificAvroSerde<>();
orgSerde.configure(serdeConfig, false);

// Create the GlobalKTable from the topic that was populated using the connect-standalone command line 
final GlobalKTable<String, Organisation> orgs = builder.globalTable(ORG_TOPIC,
        Materialized.<String, Organisation, KeyValueStore<Bytes, byte[]>>as(ORG_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(orgSerde));

The Avro schema, from which the Organisation class is generated, is defined as:

{"namespace": "io.confluent.examples.streams.avro",
 "type":"record",
 "name":"Organisation",
 "fields":[
    {"name": "pk",      "type":"long"},
    {"name": "org_name",   "type":"string"},
    {"name": "orgId",   "type":"string"}
  ]
}

Note: as described above the orgId is set as the key on the topic using the single message transform (SMT) operation.

So, that is the GlobalKTable setup.
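
(For reference, here is a rough sketch, not part of my actual code, of how the contents of ORG_STORE could be checked with interactive queries once the KafkaStreams instance, called streams below, is running; the exact store-access API may differ slightly between versions.)

// Uses ReadOnlyKeyValueStore, QueryableStoreTypes and KeyValueIterator from
// org.apache.kafka.streams.state, and KeyValue from org.apache.kafka.streams
ReadOnlyKeyValueStore<String, Organisation> store =
    streams.store(ORG_STORE, QueryableStoreTypes.keyValueStore());

// Iterate over everything currently held in the global store and print each entry
try (KeyValueIterator<String, Organisation> it = store.all()) {
  while (it.hasNext()) {
    KeyValue<String, Organisation> kv = it.next();
    System.out.println("ORG_STORE key: " + kv.key + ", value: " + kv.value);
  }
}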

Now for the KStream setup (the left-hand side of the join, as the code below shows). This has the same key (orgId) as the GlobalKTable. I use a simple driver program to populate it:

(The use case is that this topic will contain events associated with each organisation)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class UploadGenerator {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);

    // This schema is also used in the consumer application (or, more specifically, a class generated from it is).
    String mySchema = "{\"namespace\": \"io.confluent.examples.streams.avro\"," +
                          "\"type\":\"record\"," +
                          "\"name\":\"DocumentUpload\"," +
                          "\"fields\":[{\"name\":\"orgId\",\"type\":\"string\"}," +
                                      "{\"name\":\"date\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}]}";

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(mySchema);

    // Just using three fictional organisations with the following orgIds/keys
    String[] ORG_ARRAY = {"002", "003", "004"};

    long count = 0;
    String key = ""; // the key is the orgId
    while (true) {
      count++;
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
        // ignore and keep producing
      }
      GenericRecord avroRecord = new GenericData.Record(schema);
      int orgIndex = ThreadLocalRandom.current().nextInt(0, 2 + 1);

      avroRecord.put("orgId", ORG_ARRAY[orgIndex]);
      avroRecord.put("date", new Date().getTime());
      key = ORG_ARRAY[orgIndex];

      ProducerRecord<Object, Object> record = new ProducerRecord<>("topic_uploads", key, avroRecord);
      try {
        producer.send(record);
        producer.flush();
      } catch (SerializationException e) {
        System.out.println("SerializationException was generated: " + e.getMessage());
      } catch (Exception el) {
        System.out.println("Exception: " + el.getMessage());
      }
    }
  }
}

So, this generates a new event representing an upload for an organisation identified by the orgId; the orgId is both a field in the value and explicitly set as the key of the ProducerRecord.

Here is the code that sets up the KStream for these events:

final SpecificAvroSerde<DocumentUpload> uploadSerde = new SpecificAvroSerde<>();
uploadSerde.configure(serdeConfig, false);

// Get the stream of uploads
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));

// Debug output to see the contents of the stream
uploadStream.foreach((k, v) -> System.out.println("uploadStream: Key: " + k + ", Value: " + v));

// Note: I also tried re-keying the stream on the orgId field (even though it is already set as the key in the driver program), but the problem is the same
final KStream<String, DocumentUpload> keyedUploadStream = uploadStream.selectKey((key, value) -> value.getOrgId());
keyedUploadStream.foreach((k, v) -> System.out.println("keyedUploadStream: Key: " + k + ", Value: " + v));

// Java 7 form used as it was easier to put in debug statements
// OrgPk is just a helper class defined in the same class 
KStream<String, OrgPk> joined = keyedUploadStream.leftJoin(orgs,
        new KeyValueMapper<String, DocumentUpload, String>() { /* derive a (potentially) new key by which to look up against the table */
          @Override
          public String apply(String key, DocumentUpload value) {
            System.out.println("1. The key passed in is: " + key);
            System.out.println("2. The upload realm passed in is: " + value.getOrgId());
            return value.getOrgId();
          }
        },
        // THIS IS NEVER CALLED WITH A join() AND WHEN CALLED WITH A leftJoin() HAS A NULL ORGANISATION
        new ValueJoiner<DocumentUpload, Organisation, OrgPk>() {
          @Override
          public OrgPk apply(DocumentUpload leftValue, Organisation rightValue) {
            System.out.println("3. Value joiner has been called...");
            if( null == rightValue ) {
              // THIS IS ALWAYS CALLED, SO THERE IS NEVER A "MATCH"
              System.out.println("    3.1. Orgnisation is NULL");
              return new OrgPk(leftValue.getRealm(), 1L);
            }
            System.out.println("    3.1. Org is OK");
            // Never reaches here - this is the issue i.e. there is never a match
            return new OrgPk(leftValue.getOrgId(), rightValue.getPk());
          }
        });

So, the above join (or leftJoin) never matches, even though the two keys are the same! This is the main issue.
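
(One thing that could be done to double-check that claim, since, as far as I understand, the lookup into the global store is ultimately performed on serialized keys, is to dump the raw key bytes of both topics and compare them directly. The little inspector class below is just a throwaway sketch for illustration, not part of my application; the class name, group id and poll loop are made up, and it assumes a client recent enough to support poll(Duration).)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class RawKeyInspector {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "raw-key-inspector");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // Read both topics with the plain byte-array deserializer so the serialized keys can be compared
      consumer.subscribe(Arrays.asList("my-organisation", "topic_uploads"));
      for (int poll = 0; poll < 10; poll++) {
        for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(2))) {
          byte[] keyBytes = rec.key() == null ? new byte[0] : rec.key();
          StringBuilder hex = new StringBuilder();
          for (byte b : keyBytes) {
            hex.append(String.format("%02x ", b));
          }
          System.out.println(rec.topic() + " -> key bytes: " + hex);
        }
      }
    }
  }
}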

Finally, the avro schema for the DocumentUpload is:

{"namespace": "io.confluent.examples.streams.avro",
 "type":"record",
 "name":"DocumentUpload",
 "fields":[
    {"name": "orgId",   "type":"string"},
    {"name":"date",     "type":"long",  "logicalType":"timestamp-millis"}
  ]
}

So, in summary:

  1. I have a KStream on a topic with a String key of orgId.
  2. I have a GlobalKTable on a topic, also with a String key of orgId.
  3. The join never works, even though the keys are present in the GlobalKTable (at least they are in the topic underlying the GlobalKTable).

Can someone help me? I am pulling my hair out trying to figure this out.

Not sure atm. Can you verify that the GlobalKTable is populated with data correctly? Maybe you can use "Interactive Queries"? Or read the GlobalKTable input topic to make sure that key and value are both set correctly? Or just debug into the library directly. You might want to look into KStreamKTableJoinProcessor#process() to see why joiner.apply() is not called -- might be some unexpected nulls. (Last question: why do you return value.getOrgId() instead of key in the KeyValueMapper that you pass into the join?) – Matthias J. Sax
I tried using the interactive queries on the GlobalKTable store. Probably due to the JDBC source connector not supporting namespaces for schemas, I get the following when I try to iterate over all <key,value> pairs: Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1 Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class organisation specified in writer's schema whilst finding reader's schema for a SpecificRecord. – Arturo Knight
Unfortunately KIP-66 did not include a setNamespace SMT. Maybe I can write a custom one. – Arturo Knight

1 Answer

1 vote

I was able to solve this issue on Windows/IntelliJ by explicitly providing a state directory via StreamsConfig.STATE_DIR_CONFIG, for example:
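
A minimal sketch of what that looks like, assuming builder is the StreamsBuilder from the question; the application id, bootstrap servers and directory path are only examples:

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// ...

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "upload-join-app");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The default state directory (/tmp/kafka-streams) can be problematic on Windows,
// so point the state store directory at an existing, writable local path.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "C:/kafka-streams-state");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();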