public static void createKafkaTopic(String sourceTopicName, String sinkTopicName, String responseTopicName, String kafkaUrl) {
try {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
AdminClient kafkaAdminClient = KafkaAdminClient.create(properties);
ListTopicsResult topics = kafkaAdminClient.listTopics();
Set <String> names = topics.names().get();
boolean containsSourceTopic = names.contains(sourceTopicName);
boolean containsSinkTopic = names.contains(sinkTopicName);
boolean containsResponseTopic = names.contains(responseTopicName);
if (!containsResponseTopic && !containsSinkTopic && !containsSourceTopic) {
CreateTopicsResult result = kafkaAdminClient.createTopics(
Stream.of(sourceTopicName, sinkTopicName, responseTopicName).map(
name -> new NewTopic(name, 1, (short) 1)
).collect(Collectors.toList())
);
result.all().get();
LOG.info("new sourceTopicName: {}, sinkTopicName: {}, responseTopicName: {} are created",
sourceTopicName, sinkTopicName, responseTopicName);
}
} catch (ExecutionException | InterruptedException e) {
LOG.info("Error message {}", e.getMessage());
}
}