I looked for an up-to-date example of this kind of problem but could not find one. I am trying to implement a web service with Camel that should behave as follows:

  • Camel receives input from a REST endpoint via either GET or POST (api/startsearch).
  • A bean processes the input and generates a ticket ID.
  • The same bean responds to the client with HTTP 202 or a redirect status code that includes the redirect URL (api/result?ticket-id=jf3298u23).
  • The bean also passes the input to the ActiveMQ start queue, where the Camel route does all of its long-running processing.
  • When the route is finished, the result should be available at the redirect URL (/result?ticket-id=jf3298u23). If processing is not finished yet, it should respond with a custom status code such as HTTP 299 (processing).

So my route looks like this:

rest().path(apiPath).produces("application/json")
            .get(searchEndpoint)
            .to("bean:requestHandler?method=processNewSearch") // generate ticket-id and reply with 202 or 3xx
            .route().inOnly("activemq:queue:start").endRest() // put the incoming message into the start-queue where the processing starts
            .get(resultEndpoint).to("bean:requestHandler?method=returnResult"); // return 299 when processing not done or 200 + result

from("activemq:queue:start")
            .setHeader("recipients").method(new ExtractRecipients(), "extractRecipients")
            .to("activemq:queue:recipientlist");

... etc, etc... until:

from("activemq:queue:output")
            .to("bean:requestHandler?method=saveFinishedSearch");

The bean itself has three methods:

public void processNewSearch(Exchange exchange) {
    //generate ticket and stuff and finally set Header and body of the response

    exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 202);
    exchange.getOut().setBody(redirectUrl);
}

public void returnResult(Exchange exchange) {
    //handle ticket related stuff, if valid fetch result and create http response:
        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
        exchange.getOut().setBody(searchResult);
        return;
}

public void saveFinishedSearch(Exchange exchange) {
    // get search Results from the final Message that was processing asynchronously in the background and save it
    finishedSearches.put(ticket, body);
}

I am sure this is not the proper way to reply with manually set response codes and messages, but I did not find another way to do it.

The problem at the moment is that Camel waits until the whole message has been processed, so the response generated by .to("bean:requestHandler?method=processNewSearch") has no effect: it is simply put into the start queue.

How do I immediately return a custom response with Camel and let the route process the request asynchronously?

1 Answer

First and foremost, you should stick to the HTTP protocol and only trigger background tasks through POST operations. You probably don't want a crawler to trigger a long-running background process via GET requests, do you?

As such, you should also use the Location HTTP header to return the URI of the resource from which further information on the current state of the process can be retrieved. I would also use a common URI rather than a redirect.

In your route setup, I usually keep all route-dependent things in the .route() block as well. We maintain a catalogue assembly process and an EDI message archive system that assembles the messages sent and/or received in a certain timeframe, because German law requires clients to back up the EDI messages they exchange.

We distinguish between triggering a new archiving or assembly request and retrieving the current state of a request.

rest("/archives")
  .post()
    .bindingMode(RestBindingMode.json)
    .type(ArchiveRequestSettings.class)
    .consumes(MediaType.APPLICATION_JSON)
    .produces(MediaType.APPLICATION_JSON)
    .description("Invokes the generation of a new message archive for "
                 + "messages matching the criteria contained in the payload")

    .route().routeId("create-archives")
      // Extract the IP address of the user who invokes the service
      .bean(ExtractClientIP.class)
      // Basic Authentication
      .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
      // check the amount of requests received within a certain time-period
      .bean(receivedRequestFilter)
      // extract specified settings
      .bean(ExtractArchiveRequestSettings.class)
      // forward the task to the archive generation queue
      .to(SomeEndpoints.ARCHIVE_GENERATION_QUEUE)
      // return 202 Accepted response
      .bean(ReturnArchiveRequestCreatedStatus.class)
    .endRest()

  .get("/{archiveId}")
    .bindingMode(RestBindingMode.json)
    .outType(ArchiveRequestEntity.class)
    .produces(MediaType.APPLICATION_JSON)
    .description("Returns the status of the message archive generation process." 
                 + " If the process has finished this operation will return the"
                 + " link to the download location of the generated archive")

    .route().routeId("archive-status")
      // Extract the IP address of the user who invokes the service
      .bean(ExtractClientIP.class)
      // Basic Authentication
      .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
      // check the amount of requests received within a certain time-period
      .bean(receivedRequestFilter)
      // return the current state of the task to the client. If the job is done,
      // the response will also include a download link as well as an MD5 hash to
      // verify the correctness of the downloaded archive
      .bean(ReturnArchiveRequestStatus.class)
    .endRest();

The ExtractArchiveRequestSettings class just performs sanity checks on the received payload and sets default values for missing fields. Afterwards the request is stored into the database and its unique identifier stored into a header.
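
To make the "sanity checks and default values" step a bit more concrete, here is a minimal plain-Java sketch. It is not our actual bean: the class name, the reduced set of fields, the chosen default for direction and the rule that from must not lie after till are all assumptions made for illustration.

```java
import java.time.Instant;

/** Hypothetical, trimmed-down stand-in for ArchiveRequestSettings (three fields only). */
final class SettingsDefaultsSketch {
    String direction; // "DELIVERED" or "RECEIVED"
    Instant from;     // earliest timestamp to include; null means no lower bound
    Instant till;     // latest timestamp to include; null is defaulted to "now"

    /** Sanity-checks the payload and fills in defaults; rejects input that cannot be repaired. */
    static SettingsDefaultsSketch normalize(SettingsDefaultsSketch in, Instant now) {
        if (in.direction == null || in.direction.trim().isEmpty()) {
            in.direction = "DELIVERED"; // assumed default, not taken from the real system
        } else if (!in.direction.equals("DELIVERED") && !in.direction.equals("RECEIVED")) {
            throw new IllegalArgumentException("Unknown direction: " + in.direction);
        }
        if (in.till == null) {
            in.till = now; // an empty 'till' includes even the most recent messages
        }
        if (in.from != null && in.from.isAfter(in.till)) {
            throw new IllegalArgumentException("'from' must not be after 'till'");
        }
        return in;
    }

    public static void main(String[] args) {
        SettingsDefaultsSketch s = normalize(new SettingsDefaultsSketch(), Instant.now());
        System.out.println(s.direction + " until " + s.till);
    }
}
```

In a Camel bean, the IllegalArgumentException would typically be turned into a 400 response; how exactly is up to your error handling.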

The ArchiveRequestSettings class looks like the sample below (slightly simplified):

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ArchiveRequestSettings {

  /** Specifies if sent or received messages should be included in the artifact. Setting this field
   * to 'DELIVERED' will include only delivered documents where the companyUuid of the requesting
   * user matches the documents sender identifier. Specifying this field as 'RECEIVED' will include
   * only documents whose receiver identifier matches the companyUuid of the requesting user. **/
  private String direction;

  /** The naming schema of entries within the archive **/
  private String entryPattern;

  /** The earliest timestamp to include. Messages older than this value will be omitted **/
  @JsonSerialize(using = Iso8601DateSerializer.class)
  @JsonDeserialize(using = Iso8601DateDeserializer.class)
  private Date from;
  /** The latest timestamp to include. Messages younger than this value will be
   * omitted. If left empty this will include even the most recent messages. **/
  @JsonSerialize(using = Iso8601DateSerializer.class)
  @JsonDeserialize(using = Iso8601DateDeserializer.class)
  private Date till;
}

The ReturnArchiveRequestCreatedStatus class looks up the stored request entity and returns it with a 202 Accepted response.

@Handler
public void returnStatus(Exchange exchange) {

    String archiveId = exchange.getIn().getHeader(HeaderConstants.ARCHIVES_REQUEST_ID, String.class);
    ArchiveRequestEntity archive = repository.findOne(archiveId);

    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 202); // Accepted
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setHeader("Location", archiveLocationUrl + "/" + archiveId);

    msg.setBody(archive);

    exchange.setOut(msg);
}

Returning the current state of the stored request ensures that the client can check which settings actually got applied and could update them if either some default settings are inconvenient or further changes need to be applied.

The actual backing process is started by sending the exchange to a Redis queue that is consumed on a different machine. The output of this process is an archive containing the requested files, which is uploaded to a publicly accessible location; only the link is stored in the request entity. Note that we have a custom Camel component that mimics a seda endpoint just for Redis queues. Using seda, though, should be enough to start processing the task in a different thread.
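
The idea behind the hand-off can be sketched without Camel at all. The following is a plain-Java stand-in for a seda-style queue, not our Redis component and not Camel's implementation: the request thread only records the job and enqueues it, while a single worker thread picks it up later. All names (HandOffSketch, submit, the state strings) are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory hand-off: one queue, one daemon worker thread. */
final class HandOffSketch {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final Map<String, String> states = new ConcurrentHashMap<>();
    private final CountDownLatch done;

    private HandOffSketch(int expectedJobs) {
        this.done = new CountDownLatch(expectedJobs);
    }

    /** Creates the sketch and starts its worker thread. */
    static HandOffSketch start(int expectedJobs) {
        HandOffSketch s = new HandOffSketch(expectedJobs);
        Thread worker = new Thread(s::consume, "archive-worker");
        worker.setDaemon(true);
        worker.start();
        return s;
    }

    /** Called on the request thread: records the job, enqueues it and returns at once. */
    String submit(String jobId) {
        states.put(jobId, "QUEUED");
        queue.add(jobId);
        return jobId; // the caller can now answer with 202 and a Location header
    }

    /** Worker loop: takes jobs off the queue and updates their state. */
    private void consume() {
        try {
            while (true) {
                String jobId = queue.take();
                states.put(jobId, "RUNNING");
                // ... the long-running work would happen here ...
                states.put(jobId, "FINISHED");
                done.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Waits until all expected jobs are processed; only needed for demos and tests. */
    boolean awaitDone(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        HandOffSketch s = start(1);
        s.submit("job-1");
        s.awaitDone(5000);
        System.out.println(s.states);
    }
}
```

The point is only that submit never waits for the work itself; the status lookup reads from the shared map in the same way the GET route reads from the datastore.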

Depending on the current status of the backing process the stored request entity will be updated by the backing process. On receiving a status request (via GET) the datastore is queried for the current status and mapped to certain responses:

public class ReturnArchiveRequestStatus {

  @Resource
  private ArchiveRequestRepository repository;

  @Handler
  public void returnArchiveStatus(Exchange exchange) throws JSONException {

    String archiveId = exchange.getIn().getHeader("archiveId", String.class);

    if (StringUtils.isBlank(archiveId)) {
      badRequest(exchange);
      return;
    }

    ArchiveRequestEntity archive = repository.findOne(archiveId);
    if (null == archive) {
      notFound(archiveId, exchange);
      return;
    }

    ok(archive, exchange);
  }

  private void badRequest(Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);
    JSONObject json = new JSONObject();

    json.put("status", "ERROR");
    json.put("message", "No archive identifier found");

    msg.setBody(json.toString());
    exchange.setOut(msg);
  }

  private void notFound(String archiveId, Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 404); // Not Found
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);
    JSONObject json = new JSONObject();

    json.put("status", "ERROR");
    json.put("message", "Could not find pending archive process with ID " + archiveId);

    msg.setBody(json.toString());
    exchange.setOut(msg);
  }

  private void ok(ArchiveRequestEntity archive, Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);

    msg.setBody(archive);
    exchange.setOut(msg);
  }
}

The actual entity stored and updated throughout the whole process looks something like this (simplified):

@Getter
@Setter
@Builder
@ToString
@Document(collection = "archive")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ArchiveRequestEntity {

  /**
   * The current state of the archiving process
   */
  public enum State {
    /** The request to create an archive was queued but not yet processed **/
    QUEUED,
    /** The archive is currently under construction **/
    RUNNING,
    /** The archive was generated successfully. {@link #downloadUrl} should contain the link where
     * the archive can be found **/
    FINISHED,
    /** Indicates that the archive generation failed. {@link #error} should indicate the actual
     * reason why the request failed **/
    FAILED
  }

  @Id
  @JsonIgnore
  private String id;

  /** Timestamp the process was triggered **/
  @JsonIgnore
  @Indexed(expireAfterSeconds = DEFAULT_EXPIRE_TIME)
  private Date timestamp = new Date();

  /** The identifier of the company to create the archive for **/
  private String companyUuid;

  /** The state this archive is currently in **/
  private State state = State.QUEUED;

  ...

  /** Marks the earliest timestamp to include in the archive. Entries older than this field will
   * not be included in the archive, while entries equal to or younger than this timestamp will be
   * included unless they are younger than the {@link #till} timestamp **/
  @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
  private Date from;
  /** Marks the latest timestamp to include in the archive. Entries younger than this field will
   * not be included in the archive **/
  @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
  private Date till;

  /** Information on why the archive creation failed **/
  private String error;

  /** The URL of the final archive to download **/
  private String downloadUrl;

  /** The MD5 Hash of the final artifact in order to guarantee clients an unmodified version of the
   * archive **/
  private String md5Hash;

  ...
}

Note that regardless of the current processing state, a 200 OK is returned with the current JSON representation of the process's status. The client will see either a FINISHED state with the downloadUrl and md5Hash properties set, or a different state with a different set of properties available.
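
On the client side, checking a downloaded archive against md5Hash could look roughly like the sketch below. It assumes the hash is transported as a hex string, which the simplified entity above does not actually specify; the class and method names are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical client-side helper for comparing a download with the reported MD5 hash. */
final class Md5CheckSketch {

    /** Returns the MD5 digest of the given bytes as a lowercase hex string. */
    static String md5Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a mandatory algorithm on every Java platform, so this should not happen
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** True if the downloaded bytes hash to the value reported in the status response. */
    static boolean matches(byte[] downloaded, String expectedMd5Hash) {
        return md5Hex(downloaded).equalsIgnoreCase(expectedMd5Hash);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(md5Hex(payload)); // prints 5d41402abc4b2a76b9719d911017c592
    }
}
```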

The backing process, of course, needs to update the request status appropriately as otherwise the client would not retrieve correct information on the current status of the request.
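
A minimal sketch of what "updating the request status appropriately" can mean is given below, again in plain Java with an in-memory map instead of our datastore. The names are hypothetical; the only point is that every outcome, including an exception, ends in a final state so that a job never stays in RUNNING forever.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory status store mirroring the QUEUED/RUNNING/FINISHED/FAILED states. */
final class JobStatusStoreSketch {
    enum State { QUEUED, RUNNING, FINISHED, FAILED }

    /** Immutable snapshot of one job; far smaller than the real entity. */
    static final class Job {
        final State state;
        final String downloadUrl;
        final String error;

        Job(State state, String downloadUrl, String error) {
            this.state = state;
            this.downloadUrl = downloadUrl;
            this.error = error;
        }
    }

    private final Map<String, Job> jobs = new ConcurrentHashMap<>();

    /** Called when the request is accepted. */
    void queued(String id) {
        jobs.put(id, new Job(State.QUEUED, null, null));
    }

    /** Called by the status endpoint; returns null for unknown ids. */
    Job find(String id) {
        return jobs.get(id);
    }

    /** Runs the work and records the outcome; the work returns the download URL. */
    void process(String id, Callable<String> work) {
        jobs.put(id, new Job(State.RUNNING, null, null));
        try {
            jobs.put(id, new Job(State.FINISHED, work.call(), null));
        } catch (Exception e) {
            jobs.put(id, new Job(State.FAILED, null, String.valueOf(e.getMessage())));
        }
    }

    public static void main(String[] args) {
        JobStatusStoreSketch store = new JobStatusStoreSketch();
        store.queued("a");
        store.process("a", () -> "https://example.org/archives/a.zip");
        System.out.println(store.find("a").state);
    }
}
```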

This approach should be applicable to almost any long-running process, though the details of which information you pass along will probably differ from our scenario. Hope this helps, though.