I'm trying to stop my job with a savepoint and then start it again from the same savepoint. In my case, I update the job and create a new version of it with a new jar. Here is a code example:

class Reader(/* constructor args elided */) {
  def read(): DataStream[Event] = {
    val ds = readFromKafka()
    transform(ds)
  }

  def transform(ds: DataStream[Event]): DataStream[Event] = {
    ds.map(/* ... */)
  }
}

object MyJob {
  def run(): Unit = {
    val data = new Reader().read()
    data.keyBy(_.id).process(new MyStateFunc).uid("my-uid") // then write to Kafka
  }
}

In this case, I stopped the job with a savepoint and then started it again from the same savepoint with the same jar. Then I added a filter to my Reader like this:

class Reader(/* constructor args elided */) {
  def read(): DataStream[Event] = {
    val ds = readFromKafka()
    transform(ds)
  }

  def transform(ds: DataStream[Event]): DataStream[Event] = {
    ds.map(/* ... */).filter(/* ... */) // FILTER ADDED HERE
  }
}

I stopped my job with a savepoint, and that worked. Then I tried to deploy the new version of the job (with the new filter) from the same savepoint, but it cannot match the operators and the job does not deploy. Why?

1 Answer

Unless you explicitly provide UIDs for all of your stateful operators before taking a savepoint, Flink will no longer be able to figure out which state in the savepoint belongs to which operator once you change the topology of your job.

I see that you have a UID on your keyed process function ("my-uid"). But you also need to have UIDs on the Kafka source and the sink, and anything else that's stateful. These UIDs need to be attached to the stateful operators themselves and need to be unique within the job (but not across all jobs). (Furthermore, each state descriptor needs to assign a name to each piece of state, using a name that is unique within the operator.)
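
To illustrate that last point, here is a minimal sketch of what MyStateFunc might look like with an explicitly named piece of state. The Event record and the state's purpose are assumptions for the example, not taken from your job:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// `Event` is a hypothetical record type standing in for whatever the job reads.
case class Event(id: String, valid: Boolean)

class MyStateFunc extends KeyedProcessFunction[String, Event, Event] {

  // "last-event" is the state name; it must be unique within this operator.
  private lazy val lastEvent: ValueState[Event] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Event]("last-event", classOf[Event]))

  override def processElement(
      value: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    lastEvent.update(value) // keyed state, remembered per id
    out.collect(value)
  }
}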

Typically one does something like this:

env
  .addSource(...)
  .name("KafkaSource")
  .uid("KafkaSource")

results
  .addSink(...)
  .name("KafkaSink")
  .uid("KafkaSink")

where the name() method is used to supply the text that appears in the web UI.
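
Applied to the job in the question, a rough sketch might look like the following. It reuses the hypothetical Event and MyStateFunc from above, and kafkaSource/kafkaSink stand in for the real Kafka connector instances; only the pattern of attaching uid() to every stateful operator is the point:

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object MyJob {
  // `kafkaSource` and `kafkaSink` are placeholders for the real connectors.
  def run(env: StreamExecutionEnvironment,
          kafkaSource: SourceFunction[Event],
          kafkaSink: SinkFunction[Event]): Unit = {

    env
      .addSource(kafkaSource)
      .name("KafkaSource")
      .uid("KafkaSource")     // stateful: Kafka offsets are stored here
      .map(e => e)            // stateless placeholder transform; UIDs optional
      .filter(_.valid)        // the later-added filter is stateless, so it
                              //   leaves no savepoint state unmatched
      .keyBy(_.id)
      .process(new MyStateFunc)
      .name("MyStateFunc")
      .uid("my-uid")          // keyed state from MyStateFunc
      .addSink(kafkaSink)
      .name("KafkaSink")
      .uid("KafkaSink")       // stateful if the sink is transactional
  }
}

With UIDs pinned to the source, the keyed process function, and the sink, inserting a stateless filter between them should no longer prevent the savepoint from being restored.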