0
votes

Do Spring Cloud Sleuth tracing headers get injected into Spring Cloud Stream Kinesis messages like they do for RabbitMq and Kafka as described here? We have a REST controller that, after processing a POST request but right before sending the response to the client, sends an Avro message to Kinesis that summarizes the transaction using Spring Cloud Stream Kinesis. I want to ensure that the trace id started in the REST controller is propagated to the Spring Cloud Stream Kinesis message, and I am unsure of how to configure this or whether it's even supported by the framework. When I log the message and headers in the microservice that processes this Avro message, I see a different trace ID (and, of course, span ID). Propagation does not appear to be configured and/or working properly across this REST controller to Kinesis message context.

We are currently using Spring Cloud Hoxton.SR10, Spring Boot 2.3.9.RELEASE, and Spring Cloud Kinesis 2.0.1.RELEASE.

As a side note, I have tried upgrading Spring Cloud Kinesis to 2.0.2.RELEASE, 2.0.3.RELEASE, and 2.0.4.RELEASE but encounter errors with each of these binder upgrades. I'm not certain what version combinations of Spring Cloud, Spring Boot, and Spring Cloud Stream Kinesis work with each other.

Example code

@PostMapping(
      value = "/accountEntry",
      consumes = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE},
      produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
  public ResponseEntity<Confirmation> createAccountEntry(
      @RequestBody final AccountEntry accountEntry,
      final HttpServletRequest request) {
    final AccountEntryEvent accountEntryEvent =
        new AccountEntryEvent(convertToAvro(accountEntry), AccountEntryEventType.CREATE_ACCOUNT_ENTRY);
    accountEntryChannels
        .accountEntryRequest()
        .send(
            MessageBuilder.withPayload(accountEntryEvent)
                .build());
    final Confirmation confirmation = buildConfirmation();
    final EntityModel<Confirmation> confirmationEntityModel =
        new EntityModel<>(confirmation);
    return new ResponseEntity<>(confirmationEntityModel, HttpStatus.ACCEPTED);
}

// with a brave.Tracer instance autowired, I'm able to obtain the trace id as follows
protected String getTraceId() {
  return tracer.currentSpan().context().traceIdString();
}
  
@Component
public interface AccountEntryChannels {
  @Output("account-entry-request")
  SubscribableChannel accountEntryRequest();
  ...
}

1
How do you do "Rest to Kinesis"? Do you really use a Binder to produce message from your custom sink? You need to be sure that binder is configured for the HeaderMode.embeddedHeaders because there is no headers abstraction in Kinesis at all...Artem Bilan
headerMode is set to embeddedHeaders by default, right? We are not explicitly setting it. So, to answer your first question, what I'm saying is that within my REST controller I write a message to the message channel, and I want these trace and span ids to propagate to my Kinesis message. So, I'm not producing messages from a sink, I'm sending messages as a step in my REST controller. So, if by default we're using embeddedHeaders, then is there a way to propagate these to my Kinesis message?Keith Bennett
Still not clear: do you use Kinesis binder on the producer side or just channel adapter subscribed to the mentioned channel?Artem Bilan
I use the Kinesis binder on the producer side.Keith Bennett
The MessageBuilder has a setHeader() API. So, just add that getTraceId() into header of the message you send! Probably there must be some kind of ChannelInterceptor to propagate such an info from the ThreadLocal into message headers, but I don't know details what you have so far in your environment. Probably you need to configure that interceptor explicitly or so...Artem Bilan

1 Answers

0
votes

If you use it through the StreamBinder or MessageChannel, it should work, if you use it in the functional way (Supplier+BlockingQueue) it should not, the functional way is not supported.