I'm trying to stop my job with a savepoint and then start it again from the same savepoint. In my case, I update the job and deploy a new version of it with a new jar. Here is my code:
class Reader(bla bla) {
  def read() = {
    val ds = readFromKafka()
    transform(ds)
  }

  def transform(ds: DataStream[]) = {
    ds.map()
  }
}

object MyJob {
  def run() = {
    val data = new Reader().read()
    data.keyBy(id).process(new MyStateFunc).uid("my-uid") // then write to kafka
  }
}
In this case, I stopped the job with a savepoint and then started it from the same savepoint with the same jar. Then I added a filter to my Reader, like this:
class Reader(bla bla) {
  def read() = {
    val ds = readFromKafka()
    transform(ds)
  }

  def transform(ds: DataStream[]) = {
    ds.map().filter() // FILTER ADDED HERE
  }
}
I stopped the job with a savepoint, and that worked. Then I tried to deploy the new version of the job (with the new filter method) from the same savepoint, but it cannot match the operators and the job does not deploy. Why?