I built a stream with sftp-dataflow as the source (sftp-dataflow-source-kafka:2.1.1.RELEASE) connected to task-launcher-dataflow (task-launcher-dataflow-sink-kafka:1.0.2.RELEASE) as the sink. The goal is to launch a task that runs a Spring Batch job.

My SCDF platform consists of Skipper server 2.3.0, Data Flow server 2.4.0, and Kafka, all deployed as Docker containers.

The sftp source sends a message to the task launcher, and the launcher fails with this exception:

Failed to instantiate [org.springframework.cloud.dataflow.rest.client.DataFlowOperations]: Factory method 'dataFlowOperations' threw exception; nested exception is java.lang.IllegalStateException: Incompatible version of Data Flow server detected.

All my components are at their latest versions.

To create my stream:

stream create testStreamSftpDataflowlaunch1 --definition "sftp-dataflow --local-dir=/tmp/local-files1 --remote-dir=/data/docker/containers/SCDF/dockerSCDF/tmp/remote-files1 --password=docker* --allow-unknown-keys=true --username=docker --host=slnxdalicrprp03.mano.es.sopra --task-name=batchEODTask --logging.level.org.springframework.integration=DEBUG --task.launch.request.taskName=batchEODTask | task-launcher-dataflow --server-uri=http://slnxdalicrprp03******:57102" (skipper-server;port)

Another thing I don't understand is how the launcher can execute a task, because the task name is not transmitted to the launcher in the message. For example, the message carries only the file name and other properties:

message: GenericMessage [payload=/tmp/local-files1/FMO.txt, headers={file_originalFile=/tmp/local-files1/FMO.txt, id=7447a3ab-cd0e-17c2-181d-7b0f5cb85d78, file_name=FMO.txt, file_relativePath=FMO.txt, timestamp=1582106812595}]
2020-02-19 10:06:52.961 DEBUG 233 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter : Poll resulted in Message: GenericMessage [payload=/tmp/local-files1/FMO2.txt, headers={file_originalFile=/tmp/local-files1/FMO2.txt, id=b0fc3c02-a3e2-2481-aa5b-68f9db6b6973, file_name=FMO2.txt, file_relativePath=FMO2.txt, timestamp=1582106812961}]
2020-02-19 10:06:52.962 DEBUG 233 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel : preSend on channel 'taskLaunchRequestChannel',

When I run the SCDF sample from the documentation with ingestFile, I get the same problem.

Do you have any idea what is wrong?

1 Answer

There's a lot in this post. I will attempt to unpack a few things here.

The sftp source sends a message to the task launcher, and the launcher fails with this exception: Failed to instantiate [org.springframework.cloud.dataflow.rest.client.DataFlowOperations]: Factory method 'dataFlowOperations' threw exception; nested exception is java.lang.IllegalStateException: Incompatible version of Data Flow server detected.

It appears the TL-Dataflow sink is bundled with a rather old SCDF client library; see tasklauncher-dataflow-app-dependencies/pom.xml#L20. You can update the version to 2.4.0.RELEASE (the same as the SCDF server you are already running), then build the app and a local Docker image for your testing or use.

Another thing I don't understand is how the launcher can execute a task, because the task name is not transmitted to the launcher in the message.

If you look at the TL-Dataflow docs, you will notice that the input (that is, the payload) is expected to be JSON with certain key/value pairs. The name is the critical bit: it is the name of a task definition that already exists in SCDF. We have an end-to-end recipe that walks through the use case in full, which you can use as a reference and follow along.
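To make the payload shape concrete, here is a minimal Java sketch that builds such a JSON launch request by hand. Only the name key is taken from the description above; the args key, the localFilePath argument, and the LaunchRequestSketch class itself are assumptions for illustration, so check the TL-Dataflow docs for the exact keys your version accepts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of SCDF: builds the JSON text of a launch request.
class LaunchRequestSketch {

    // Wrap a value in double quotes, escaping backslashes and double quotes.
    // Control characters are not handled; this is a sketch, not a JSON library.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // "name" must be the name of a task definition that already exists in SCDF.
    // "args" is an assumed key carrying command-line arguments for the task.
    static String toJson(String taskName, List<String> args) {
        String argList = args.stream()
                .map(LaunchRequestSketch::quote)
                .collect(Collectors.joining(","));
        return "{\"name\":" + quote(taskName) + ",\"args\":[" + argList + "]}";
    }

    public static void main(String[] argv) {
        // Task name from the question; the localFilePath argument is illustrative only.
        System.out.println(toJson("batchEODTask",
                Arrays.asList("localFilePath=/tmp/local-files1/FMO.txt")));
    }
}
```

A message like the one in the question, whose payload is just a file path, carries no name key, so a sink expecting this JSON shape would have nothing to launch.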

Recently, we also added an enhancement to auto-create the task definition if none exists. In that case, the name must match an existing task-app name in the app registry; see spring-cloud-stream-app-starters/tasklauncher-dataflow#24.

I believe this covers the pressing questions on this post.