I am using kafka processor api (not DSL)
public class StreamProcessor implements Processor<String, String>
{
public ProcessorContext context;
public void init(ProcessorContext context)
{
this.context = context;
context.commit()
//statestore initialized with key,value
}
public void process(String key, String val)
{
try
{
String[] topicList = stateStore.get(key).split("|");
for(String topic: topicList)
{
context.forward(key,val,To.child(consumerTopic));
} // forward same message to list of topics ( 1..n topics) , rollback if write to some topics failed ?
}
}
}
Scenario : we are reading data from a source topic and stream processor writes data to multiple sink topics (topicList above) .
Question: How to implement rollback mechanism using kafka streams processor api when one or more of the topics in the topicList above fails to receive the message ? .
What I understand is processor api has rollback mechanism for each record it failed to send, or can roll back for an an entire batch of messages which failed be achieved as well? as process method in processor interface is called per record rather than per batch hence I would surmise it can only be done per record.Is this correct assumption ?, if not please suggest how to achieve per record and per batch rollbacks for failed topics using processor api.
To.all()
context.forward(key, val, To.all());
? - JavaTechnical