0 votes

I am trying to configure a DLQ in Spring Cloud Data Flow. Here is the stream definition and how I am deploying it:

stream create --definition ":someTestTopic > custom-transform --spring.cloud.stream.bindings.input.consumer.headerMode=raw | log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ticktran


stream deploy ticktran --properties "apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,apps.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.output.destination=test-tran,app.log.spring.cloud.stream.bindings.input.destination=test-tran,app.custom-transform.spring.cloud.stream.kafka.bindings.test-tran.consumer.enableDlq=true"

In the custom-transform processor code, I have the following:

if (out.contains("ERROR")) {
    throw new RuntimeException("Error ");
}

That means if the message contains ERROR, a RuntimeException is thrown, and I want to capture those messages in a DLQ. But when I run the code, no Kafka dead-letter queue named test-tran is created.

Do I need to set more properties to enable the DLQ, or do I need to change something in the code to use the DLQ properly?

Custom Transform Code

TransformationServiceApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableEntityLinks;

@SpringBootApplication
@EnableEntityLinks
public class TransformationServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(TransformationServiceApplication.class, args);
    }
}

TransformationMessageEndpoint.java

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

@EnableBinding(Processor.class)
@MessageEndpoint
public class TransformationMessageEndpoint {

    private static final String NS = "http://openrisk.com/ingestion/";

    AtomicInteger index = new AtomicInteger(1);

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object process(Message<?> message) {
        String out = new String((byte[]) message.getPayload());

        System.out.println("*****" + out);

        // Simulate a processing failure; these messages are expected to land in the DLQ
        if (out.contains("ERROR")) {
            throw new RuntimeException("Error ");
        }

        return message;
    }
}

pom.xml

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>1.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.module</groupId>
            <artifactId>spring-cloud-stream-modules-test-support</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.jena</groupId>
            <artifactId>jena-core</artifactId>
            <version>3.1.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Adding Module

app register --name custom-transform --type processor --uri maven://com.openrisk.openmargin:TransformationService:0.0.1-SNAPSHOT

Adding Stream

stream create --definition ":someTesstTopic > custom-transform | log " --name ticktran

Deploy Stream

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"
What version of SCDF are you using? The module register command is super old, at least 6-7 months; we moved away from those terms a long time ago. Please try the latest 1.1 M1 release. – Sabby Anandan
With the new version it is working. Thanks. – Viren Balaut
Glad to hear that. Please share your final findings and/or comments, and perhaps also consider marking the issue as resolved. – Sabby Anandan

2 Answers

0 votes

There were a few issues with your stream definition.

  • The deployment properties start with app.<app-name>., but you have apps.<app-name>. in a few places.
  • The destinations are automatically created in SCDF, so overriding the defaults isn't recommended. You can, however, do this when you're running standalone spring-cloud-stream applications (see the sketch at the end of this answer).
  • Instead of using custom destinations, you can enable the DLQ by interacting with the default channels directly; see the example below.

stream create --definition ":someTesssstTopic > transform | log " --name ticktran

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

  • The destination test-tran is not in an acceptable format for the <channelName> placeholder in the app.transform.spring.cloud.stream.kafka.bindings.<channelName>.consumer.enableDlq property; the channel name (input in the example above) is what belongs there.
  • Lastly, the error.<destination>.<group> topic is only created when there's an error.
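
Once an error has actually been thrown, one way to confirm that the dead-letter topic exists and contains the failed payloads is a plain Kafka consumer subscribed to it. The sketch below is only illustrative: the broker address, the inspector's consumer group, and the exact error.<destination>.<group> topic name are assumptions you would replace with the values from your own deployment.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DlqInspector {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "dlq-inspector");           // assumed inspector group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            // Replace with your actual error.<destination>.<group> topic name.
            consumer.subscribe(Arrays.asList("error.someTesstTopic.ticktran"));
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println("DLQ payload: " + new String(record.value()));
                }
            }
        } finally {
            consumer.close();
        }
    }
}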

We will add a few DSL samples to the reference guide via #885.

EDIT: I updated the stream definition to reflect the right deployment property prefix.
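
As a side note on the second bullet above: if the transform were run as a standalone spring-cloud-stream application rather than deployed through SCDF, the same binding and DLQ settings could be supplied directly at launch. The sketch below is a minimal illustration, not code from this answer; the destination, group, and class name reuse values from this thread and are assumptions to adapt to your setup.

import org.springframework.boot.builder.SpringApplicationBuilder;

public class StandaloneTransformLauncher {

    public static void main(String[] args) {
        // Supplies the binding properties that the SCDF deployer would otherwise
        // pass via the app.<app-name>. deployment prefix. Destination and group
        // values below are assumptions taken from this thread.
        new SpringApplicationBuilder(TransformationServiceApplication.class)
                .properties(
                        "spring.cloud.stream.bindings.input.destination=someTesstTopic",
                        "spring.cloud.stream.bindings.input.group=ticktran",
                        "spring.cloud.stream.bindings.input.consumer.headerMode=raw",
                        "spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true")
                .run(args);
    }
}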

0 votes

I changed the Data Flow version to the 1.1 M1 release and used the commands below to create and deploy the stream, and it is working now.

stream create --definition ":someTesstTopic > transform | log " --name ticktran


stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

Thanks, Sabby Anandan.