2
votes

I need to read from a file which is under /files directory and then display the contents of that file on a websocket client. This is what I am doing in MyRouteBuilder.scala:

import org.apache.camel.component.websocket.WebsocketComponent
import org.apache.camel.{LoggingLevel, CamelContext, Exchange}
import org.apache.camel.scala.dsl.builder.ScalaRouteBuilder

/**
 * A Camel Router using the Scala DSL
 */
class MyRouteBuilder(override val context : CamelContext) extends ScalaRouteBuilder(context) {

  // an example of a Processor method
  val myProcessorMethod = (exchange: Exchange) => {
    exchange.getIn.setBody("block test")
  }

  val ws = context.getComponent("websocket", classOf[WebsocketComponent]);
  ws.setPort(8444);
  ws.setHost("127.0.0.1")
  // we can serve static resources from the classpath: or file: system
  ws.setStaticResources("classpath:.");

  "file://files?noop=true" ==> {
    setBody(convertBodyTo(classOf[String]))
    to("websocket://127.0.0.1:8444/")
  }
}

However, when I run it gives the following stacktrace:

Stacktrace

java.lang.IllegalArgumentException: Failed to send message to single connection; connetion key not set. at org.apache.camel.component.websocket.WebsocketProducer.process(WebsocketProducer.java:57) at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:141) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190) at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190) at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:442) at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:214) at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:178) at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174) at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) [el-2) thread #0 - file://files] GenericFileOnCompletion WARN Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@72611f35 for file: GenericFile[audience.json] [el-2) thread #0 - file://files] DefaultErrorHandler ERROR Failed delivery for (MessageId: ID-vaisakh-ubuntu-45785-1455531433908-1-11 on ExchangeId: ID-vaisakh-ubuntu-45785-1455531433908-1-12). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: Failed to send message to single connection; connetion key not set.

Message History

RouteId ProcessorId Processor Elapsed (ms) [route1 ] [route1 ] [file://files?noop=true ] [ 2] [route1 ] [convertBodyTo1 ] [convertBodyTo[java.lang.String] ] [ 1] [route1 ] [setBody1 ] [setBody[{org.apache.camel.scala.ScalaExpression@f736069}] ] [ 0] [route1 ] [to1 ] [websocket://127.0.0.1:8444/ ] [ 1]

Exchange

Exchange[ Id ID-vaisakh-ubuntu-45785-1455531433908-1-12 ExchangePattern InOnly Headers {breadcrumbId=ID-vaisakh-ubuntu-45785-1455531433908-1-11, CamelFileAbsolute=false, CamelFileAbsolutePath=/home/sagar/IdeaProjects/SampleIntegrationService/files/audience.json, CamelFileContentType=text/plain, CamelFileLastModified=1455526488000, CamelFileLength=150, CamelFileName=audience.json, CamelFileNameConsumed=audience.json, CamelFileNameOnly=audience.json, CamelFileParent=files, CamelFilePath=files/audience.json, CamelFileRelativePath=audience.json, CamelRedelivered=false, CamelRedeliveryCounter=0} BodyType org.apache.camel.scala.dsl.SRouteDefinition Body SRouteDefinition(Route(route1)[[From[file://files?noop=true]] -> [ConvertBodyTo[java.lang.String], SetBody[{org.apache.camel.scala.ScalaExpression@f736069}], To[websocket://127.0.0.1:8444/]]],com.mediaiqdigital.sampleIntegrationService.MyRouteBuilder@67a27caa) ]

1

1 Answers

1
votes

Resolved the issue by setting connection key property from the header.

 from(websocketUrl) ==> {

     setProperty("wsConnKey", header("websocket.connectionkey"))
     to("file://files?noop=true")
 }

 "file://files?noop=true" ==> {

     setBody(convertBodyTo(classOf[String]))
     to(WEBSOCKET_RESPONSE_ENDPOINT)
 }

 from(WEBSOCKET_REPONSE_ENDPOINT) ==> {

     process(new Processor() {
         void process(Exchange exchange) {
             Object connectionKey = exchange.getproperty("wsConnKey");
             Object exchangeBody = exchange.getIn().getBody();
             exchange.getOut().setHeader("wsConnKey", connectionKey);
             exchange.getOut().setBody(exchangeBody);
         }
     })
    to(websocketUrl)
 }