I have a Spring bean that uses dropwizard-metrics annotations on its methods to collect metrics (see the #parse method):

@Component
public class Consumer extends AbstractConsumer {

  @Autowired
  private EventsService eventsService;

  private ExecutorService executor;

  @PostConstruct
  void init() throws Exception {
    executor = Executors.newFixedThreadPool(1);

    ConsumerEventsHandler consumer = new ConsumerEventsHandler(Arrays.asList("topic1"), this);
    executor.submit(consumer);
  }

  @Timed(name = CONSUMER_BATCH_PARSING_TIME, absolute = true)
  public void parse(ConsumerRecord<String, String> record) {
    String val = record.value();
    eventsService.parseToDB(val);
  }
}

As you can see, this bean submits a new task to the executor service. The task is not a bean, but it needs a reference to the Consumer bean to reuse its logic (see the consumer.parse(record) call):

public class ConsumerEventsHandler implements Runnable {

  // Reference to the Spring bean whose #parse method is annotated with @Timed
  private Consumer consumer;
  private List<String> topics;
  protected final KafkaConsumer<String, String> kafkaConsumer;

  public ConsumerEventsHandler(List<String> topics, Consumer consumer) {
    this.topics = topics;
    this.consumer = consumer;
    this.kafkaConsumer = new KafkaConsumer<>();
  }

  @Override
  public void run() {
    try {
      // Subscribe and poll on the Kafka client, not on the Consumer bean
      kafkaConsumer.subscribe(topics);
      while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);

        for (ConsumerRecord<String, String> record : records) {
          consumer.parse(record);
        }
      }
    } finally {
      kafkaConsumer.close();
    }
  }

}

Here is the metrics config (I use the 'ryantenney/metrics-spring' library):

@Configuration
@EnableMetrics(proxyTargetClass = true)
@ComponentScan(basePackages = "com.inq")
public class SsvpApiMetricsConfig extends MetricsConfigurerAdapter {
  @Resource
  private MetricRegistry registry;

  @PostConstruct()
  public void init() {
    configureReporters(registry);
  }

  @Override
  public void configureReporters(MetricRegistry metricRegistry) {

    registerReporter(JmxReporter.forRegistry(metricRegistry)
        .inDomain("com.inq.metrics").build()).start();
  }
}

As a result, I see that the Consumer bean is proxied, but ConsumerEventsHandler holds a reference to the plain (unproxied) bean: when the handler was created inside Consumer's @PostConstruct method via 'new ConsumerEventsHandler(..)', the Consumer bean had not been proxied yet. I assume that @PostConstruct is called before proxying happens.
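To illustrate the effect outside of Spring: a proxy is a separate wrapper object, so `this` inside the target always refers to the raw instance, whether or not a proxy has been created yet. The sketch below uses a JDK dynamic proxy as a stand-in for the class-based proxy that proxyTargetClass = true would produce. Parser, PlainParser, Handler and ProxyDemo are hypothetical names, and the counter only stands in for the @Timed interceptor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Consumer bean's public contract.
interface Parser {
    void parse(String value);
}

// Hypothetical stand-in for the non-bean task that stores a reference to the parser.
class Handler {
    final Parser parser;

    Handler(Parser parser) {
        this.parser = parser;
    }
}

// Plain target object; like Consumer#init, it hands out `this` during initialization.
class PlainParser implements Parser {
    Handler handler;

    void init() {
        handler = new Handler(this); // `this` is the raw target, never the proxy
    }

    @Override
    public void parse(String value) {
        // real work would happen here
    }
}

class ProxyDemo {
    // Counts calls that went through the proxy (stand-in for the timing interceptor).
    static final AtomicInteger timedCalls = new AtomicInteger();

    // Wraps the target in a JDK dynamic proxy that counts and then delegates each call.
    static Parser wrap(Parser target) {
        InvocationHandler interceptor = (proxy, method, args) -> {
            timedCalls.incrementAndGet();
            return method.invoke(target, args);
        };
        return (Parser) Proxy.newProxyInstance(
                Parser.class.getClassLoader(), new Class<?>[] {Parser.class}, interceptor);
    }

    public static void main(String[] args) {
        PlainParser target = new PlainParser();
        target.init();                     // init callback runs on the raw object
        Parser proxied = wrap(target);     // the proxy is created afterwards

        target.handler.parser.parse("a");  // bypasses the interceptor
        proxied.parse("b");                // goes through the interceptor

        System.out.println(timedCalls.get()); // prints 1
    }
}
```

Only the call made through the proxy is counted, which matches the behaviour described above: the handler's stored reference never reaches the metrics interceptor.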

The only workaround I see is to fetch the Consumer bean through ApplicationContext#getBean every time inside the #run method, instead of storing the reference in a ConsumerEventsHandler field. Are there any other solutions?
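A variation on that workaround, sketched here in plain Java under stated assumptions: pass the handler a Supplier and resolve the reference once when the task starts, rather than on every iteration. In a Spring application the supplier could be something like () -> applicationContext.getBean(Consumer.class). This only helps if the lookup happens after the bean has been fully initialized. RecordParser and LazyHandler are hypothetical names, and the Iterable stands in for the Kafka poll loop.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the proxied Consumer bean.
interface RecordParser {
    void parse(String value);
}

// Hypothetical variant of ConsumerEventsHandler that resolves its collaborator lazily.
class LazyHandler implements Runnable {
    private final Supplier<RecordParser> parserSupplier;
    private final Iterable<String> records; // stands in for the Kafka poll loop

    LazyHandler(Supplier<RecordParser> parserSupplier, Iterable<String> records) {
        this.parserSupplier = parserSupplier;
        this.records = records;
    }

    @Override
    public void run() {
        // Resolved once, when the task starts, not when the task is constructed.
        RecordParser parser = parserSupplier.get();
        for (String record : records) {
            parser.parse(record);
        }
    }
}
```

The design choice is that the constructor no longer needs a fully built collaborator, so the task can be created inside an initialization callback without capturing the raw `this`.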

1 Answer

Just add a static field to your non-Spring class:

public class ConsumerEventsHandler implements Runnable {

  // Set from the Spring side once the (proxied) Consumer bean is available
  private static Consumer consumer;
  private List<String> topics;
  protected final KafkaConsumer<String, String> kafkaConsumer;

  public ConsumerEventsHandler(List<String> topics) {
    this.topics = topics;
    this.kafkaConsumer = new KafkaConsumer<>();
  }

  public static void setConsumer(Consumer consumer) {
    ConsumerEventsHandler.consumer = consumer;
  }

  // run() stays as in the question and uses the static consumer field
}

Then on the Spring side, in a bean where the Consumer has been injected as consumer, you can have:

  @PostConstruct()
  public void init() {
    configureReporters(registry);
    ConsumerEventsHandler.setConsumer(consumer);
  }