1
votes

I added a SpringContextProcessor to my NiFi flow, it executes as expected and updates the FlowFile content and attributes. But in the data provenance section of NiFi instead of seeing SEND/RECEIVE, I am seeing

03/27/2017 11:47:57.164 MDT RECEIVE 42fa1c3f-edde-4cb7-8e73-ce752f7e3d66
03/27/2017 11:47:57.163 MDT DROP    667094a7-8eef-4657-981a-dc9fdc6c4056
03/27/2017 11:47:57.163 MDT SEND    667094a7-8eef-4657-981a-dc9fdc6c4056

Looks like the original message is being dropped and replaced by a new message. I haven't seen this behavior in other components, i.e. they all seem to preserve the original Flow File UUID. Simplified version of the Spring processor code:

@ServiceActivator(inputChannel = "fromNiFi", outputChannel = "toNiFi")
public Message<byte[]> process1(Message<byte[]> inMessage) {
    String inMessagePayload = new String(inMessage.getPayload());
    String userId = getUserIdFromDb(inMessagePayload);
    String outMessagePayload = inMessagePayload + userId;

    return MessageBuilder.withPayload(outMessagePayload.getBytes())
            .copyHeaders(inMessage.getHeaders())
            .setHeader("userId", userId)
            .build();
}

Is there a way to preserve the original Flow File UUID in the outgoing message?

1

1 Answers

1
votes

This is probably an oversight on our end, so yes please do raise a JIRA. However as a workaround you can try to extract FlowFile attributes from the incoming Message headers and then propagate them back to the outgoing message.

public Message<byte[]> process1(Message<byte[]> inMessage) {
    String myHeader = inMessage.getHeader("someHeader");
    . . .
    return MessageBuilder.withPayload(outMessagePayload.getBytes())
            .copyHeaders(inMessage.getHeaders())
            .setHeader("userId", userId)
            .setHeader("someHeader", myHeader)
            .build();
}