I'm new to Java, Spring, and Kafka in general. Here is the situation:

I've used the @KafkaListener annotation to create a Kafka consumer that looks like this:

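import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    private ExecutorService executorService;
    // One Future per record handed to the thread pool
    private List<Future<?>> futuresThread1 = new ArrayList<>();

    public Listener() {
        Properties appProps = new AppProperties().get();
        this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
    }

    //TODO: how can I pass an app property into this annotation?
    @KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
    public void listener(ConsumerRecord<?, ?> record, ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue) throws InterruptedException, ExecutionException {
        futuresThread1.add(executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(record);
                arrayBlockingQueue.add(record);
            }
        }));
    }

}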

I added an ArrayBlockingQueue parameter to the listener method, and I'd like the listener to add the messages it receives from Kafka to that queue.

The problem I am having is I can't figure out how I actually pass an ArrayBlockingQueue into the listener because Spring is handling the instantiation and running of the listener behind the scenes.

I need this blocking queue so that another object outside of the listener can access the messages and do some work with them. For example, in my main:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        Properties appProps = new AppProperties().get();
        ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
           Integer.parseInt(appProps.getProperty("blockingQueueSize"))
        );
        //TODO: This starts my listener. How do I pass the queue to it?
        SpringApplication.run(SourceAccountListenerApp.class, args);
    }
}
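
The hand-off described above (a listener thread puts records on a bounded queue and a separate object takes them off) can be sketched in plain Java, without Spring or Kafka. This is only an illustration under stated assumptions: the class name QueueHandoffSketch, the method handOff, and the POISON_PILL end marker are hypothetical, and plain strings stand in for ConsumerRecord objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueHandoffSketch {

    // Hypothetical end marker telling the worker that no more messages will arrive.
    private static final String POISON_PILL = "__END__";

    /** Pushes every message through a bounded queue and returns what the worker received. */
    public static List<String> handOff(List<String> messages, int capacity) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> processed = new ArrayList<>();

        // Worker: stands in for the object outside the listener that does the work.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take(); // blocks while the queue is empty
                    if (POISON_PILL.equals(message)) {
                        return;
                    }
                    processed.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Producer: stands in for the listener method.
        for (String message : messages) {
            queue.put(message); // blocks while the queue is full
        }
        queue.put(POISON_PILL);

        worker.join(); // after join(), the worker's writes to 'processed' are visible here
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff(List.of("a", "b", "c"), 2)); // prints [a, b, c]
    }
}
```

One design point worth noting: the sketch uses put() rather than add(). On a full ArrayBlockingQueue, add() throws an IllegalStateException, whereas put() waits until space becomes available.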
"The problem I am having is I can't figure out how I actually pass an ArrayBlockingQueue into the listener because Spring is handling the instantiation and running of the listener behind the scenes." So why not declare it as a Spring bean and let Spring handle the passing? - eis
Do you mean declaring the blocking queue as a Spring bean? - Amr A.
yes [some characters to bypass the filter] - eis
Like so? Just found this: stackoverflow.com/questions/31790412/… - Amr A.

1 Answer

There are plenty of ways to declare a blocking queue as a bean.

One example, main:

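import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        SpringApplication.run(SourceAccountListenerApp.class, args);
    }

    // Registers the queue as a singleton bean so Spring can inject it wherever it is needed
    @Bean
    public ArrayBlockingQueue<ConsumerRecord<?, ?>> arrayBlockingQueue() {
        Properties appProps = new AppProperties().get();
        return new ArrayBlockingQueue<>(
           Integer.parseInt(appProps.getProperty("blockingQueueSize"))
        );
    }
}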

Listener:

public class Listener {

    @Autowired
    ArrayBlockingQueue arrayBlockingQueue;