0
votes

I am trying to send a POST request to a Flink JobManager running on a Kubernetes cluster. A POST to /jars/:jarid/run for a class that needs no command-line arguments works fine. But submitting a different class from the same jar, one that requires command-line arguments, gives the following error: {"errors":["Request did not match expected format JarRunRequestBody."]}

However, passing the command-line arguments and submitting the job directly, as follows, works:

./flink run -m localhost:30287 -c com.class.name ~/path/to/jar/1.0-1.0-SNAPSHOT.jar --bootstrap.servers izac-cp-kafka:9092 --group.id test --topic bank_transaction --schema.registry http://mysr-schema-registry:8081 --CepJson """{\"keyId\": \"customer_id\",\"pattern\": [{\"patternName\": \"p1\",\"simpleCondition\":{\"columnName\": \"amount\",\"operator\": \">\",\"value\": \"50\",\"dataType\": \"Int\"}},{\"patternName\":\"p2\",\"simpleCondition\":{\"columnName\":\"amount\",\"operator\":\">\",\"value\":\"30\",\"dataType\":\"Int\"}}],\"connector\":[{\"name\":\"begin\",\"connectorType\":\"next\",\"start\":\"p1\",\"end\":\"p2\"}]}"""

To convert the above command into a Flink REST POST request, I did the following:

curl -k -v -X POST -H "Content-Type: application/json" --data '{    "entryClass":"com.class.name",   "programArgsList": [        "--bootstrap.servers izac-cp-kafka:9092",        "--group.id test",        "--topic bank_transaction",        "--schema.registry http://mysr-schema-registry:8081",        "--CepJson """{\"keyId\": \"customer_id\",\"pattern\": [{\"patternName\": \"p1\",\"simpleCondition\":{\"columnName\": \"amount\",\"operator\": \">\",\"value\": \"50\",\"dataType\": \"Int\"}},{\"patternName\":\"p2\",\"simpleCondition\":{\"columnName\":\"amount\",\"operator\":\">\",\"value\":\"30\",\"dataType\":\"Int\"}}],\"connector\":[{\"name\":\"begin\",\"connectorType\":\"next\",\"start\":\"p1\",\"end\":\"p2\"}]}""""]}' http://localhost:30287/jars/2a788e33-c92d-47c4-84af-31e3dff28666_1.0-1.0-SNAPSHOT.jar/run

However, this gave the error mentioned earlier. I just want to convert the above command-line job submission into a REST API submission to the Flink cluster.

Note: The POST request targets a Flink cluster that already contains the required jar. I only want to submit a job using a particular class.

2 Answers

0
votes

I don't think curl treats """ as a string delimiter the way Scala does. The body is sent as-is and JSON has no triple-quoted strings, so the CepJson parameter won't be sent correctly. I would start by changing that.
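One quick way to check this is to run the body through a JSON parser before sending it. The sketch below uses Python with shortened stand-ins for the request body (both strings are abbreviated illustrations, not the full payload, and is_valid_json is a hypothetical helper name). The triple-quoted form is rejected, while the same value written as an ordinary escaped JSON string parses.

```python
import json

# Abbreviated stand-in for the original body: after "--CepJson " the string
# ends, and the next quote is unexpected, so the document is not valid JSON.
broken = r'{"programArgsList": ["--CepJson """{\"keyId\": \"customer_id\"}"""]}'

# Same idea with the flag and the value as separate, normally quoted strings.
fixed = r'{"programArgsList": ["--CepJson", "{\"keyId\": \"customer_id\"}"]}'


def is_valid_json(text):
    """Return True if text parses as JSON (hypothetical helper)."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print(is_valid_json(broken))
print(is_valid_json(fixed))
```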

0
votes

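I changed the curl request to the following and it worked. Each flag and its value is now a separate element of programArgsList, and the CepJson value is an ordinary JSON string with escaped quotes instead of a triple-quoted one: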

curl -k -v -X POST -H "Content-Type: application/json" --data '{
    "entryClass": "com.class.name",
    "programArgsList": [
        "--bootstrap.servers", "izac-cp-kafka:9092",
        "--group.id", "test",
        "--topic", "bank_transaction",
        "--schema.registry", "http://mysr-schema-registry:8081",
        "--CepJson", "{\"keyId\": \"customer_id\",\"pattern\": [{\"patternName\": \"p1\",\"simpleCondition\":{\"columnName\": \"amount\",\"operator\": \">\",\"value\": \"50\",\"dataType\": \"Int\"}},{\"patternName\":\"p2\",\"simpleCondition\":{\"columnName\":\"amount\",\"operator\":\">\",\"value\":\"30\",\"dataType\":\"Int\"}}],\"connector\":[{\"name\":\"begin\",\"connectorType\":\"next\",\"start\":\"p1\",\"end\":\"p2\"}]}"
    ]
}' http://localhost:30287/jars/2a788e33-c92d-47c4-84af-31e3dff28666_1.0-1.0-SNAPSHOT.jar/run
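Escaping the nested JSON by hand is easy to get wrong, so it may help to generate the body instead. This is a minimal Python sketch, not part of Flink: build_run_body is a hypothetical helper, and the values are copied from the question. It serializes the CEP pattern once to get the CepJson string, then serializes the whole request, so the inner quotes are escaped automatically.

```python
import json

# CEP pattern from the question, written as a plain Python structure.
cep = {
    "keyId": "customer_id",
    "pattern": [
        {"patternName": "p1",
         "simpleCondition": {"columnName": "amount", "operator": ">",
                             "value": "50", "dataType": "Int"}},
        {"patternName": "p2",
         "simpleCondition": {"columnName": "amount", "operator": ">",
                             "value": "30", "dataType": "Int"}},
    ],
    "connector": [{"name": "begin", "connectorType": "next",
                   "start": "p1", "end": "p2"}],
}


def build_run_body(entry_class, options):
    """Hypothetical helper: turn {flag: value} into separate argument tokens."""
    args = []
    for flag, value in options.items():
        # One list element per token, as in the working curl request.
        args.extend(["--" + flag, value])
    return json.dumps({"entryClass": entry_class, "programArgsList": args})


body = build_run_body("com.class.name", {
    "bootstrap.servers": "izac-cp-kafka:9092",
    "group.id": "test",
    "topic": "bank_transaction",
    "schema.registry": "http://mysr-schema-registry:8081",
    "CepJson": json.dumps(cep),  # nested JSON passed as one string argument
})
print(body)
```

The printed body can be saved to a file (body.json here is just an example name) and sent with curl --data @body.json, which avoids shell quoting issues entirely.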