
In my Flink job I have a stream that I read from one Kafka topic, manipulate, and send back to Kafka using a sink.

    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", servers_ip_list);
    p.setProperty("group.id", "Flink");

    FlinkKafkaConsumer<Event_N> kafkaData_N =
            new FlinkKafkaConsumer<>("CorID_0", new Ev_Des_Sch_N(), p);
    WatermarkStrategy<Event_N> wmStrategy =
            WatermarkStrategy
                    .<Event_N>forMonotonousTimestamps()
                    .withIdleness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, timestamp) -> event.get_Time());
    DataStream<Event_N> stream_N = env.addSource(
            kafkaData_N.assignTimestampsAndWatermarks(wmStrategy));

The part above works fine, with no problems at all; the part below is where I'm getting the issue.

    String ProducerTopic = "CorID_0_f1";

    DataStream<Stream_Blocker_Pojo.block> box_stream_p = stream_N
                .keyBy((Event_N CorrID) -> CorrID.get_CorrID())
                .map(new Stream_Blocker_Pojo());

    FlinkKafkaProducer<Stream_Blocker_Pojo.block> myProducer = new FlinkKafkaProducer<>(
                ProducerTopic,
                new ObjSerializationSchema(ProducerTopic),
                p,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

    box_stream_p.addSink(myProducer);

There are no errors and the job runs. This is Stream_Blocker_Pojo, where I map the stream, manipulate it and emit a new one (I have simplified my code, keeping just 4 variables and removing all the math and data processing):

    public class Stream_Blocker_Pojo extends RichMapFunction<Event_N, Stream_Blocker_Pojo.block> {

        public class block {
            public Double block_id;
            public Double block_var2;
            public Double block_var3;
            public Double block_var4;
        }

        private transient ValueState<block> state_a;

        @Override
        public void open(Configuration parameters) throws Exception {
            state_a = getRuntimeContext().getState(new ValueStateDescriptor<>("BoxState_a", block.class));
        }

        @Override
        public block map(Event_N input) throws Exception {

            block current_a = state_a.value();

            if (current_a == null) {
                current_a = new block();
                current_a.block_id = 0.0;
                current_a.block_var2 = 0.0;
                current_a.block_var3 = 0.0;
                current_a.block_var4 = 0.0;
            }

            current_a.block_id = input.f_num_id;
            current_a.block_var2 = input.f_num_2;
            current_a.block_var3 = input.f_num_3;
            current_a.block_var4 = input.f_num_4;

            state_a.update(current_a);
            return new block();
        }
    }

This is my implementation of the Kafka serialization schema:

    public class ObjSerializationSchema implements KafkaSerializationSchema<Stream_Blocker_Pojo.block> {

        private final String topic;
        // Not serialized with the schema; created lazily in serialize()
        private transient ObjectMapper mapper;

        public ObjSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(Stream_Blocker_Pojo.block obj, Long timestamp) {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            try {
                return new ProducerRecord<>(topic, mapper.writeValueAsBytes(obj));
            } catch (JsonProcessingException e) {
                // Fail loudly instead of silently sending a record with a null value
                throw new IllegalStateException("Could not serialize " + obj, e);
            }
        }
    }

When I read the messages that my Flink job sent to Kafka, I find that all the fields are null:

    CorrID b'{"block_id":null,"block_var1":null,"block_var2":null,"block_var3":null,"block_var4":null}

It looks like I'm sending out an empty object with no values, but I'm struggling to understand what I'm doing wrong. I think the problem could be in my implementation of Stream_Blocker_Pojo or maybe in ObjSerializationSchema. Any help would be really appreciated. Thanks
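The null-filled JSON is what serializing a block that was never populated looks like: the fields are declared as Double, a reference type, so a freshly constructed block holds null in every field, and Jackson writes null fields as JSON null by default. A minimal Flink- and Jackson-free sketch; NullFieldsDemo and its toJson helper are hypothetical, and toJson only mimics ObjectMapper's output shape for this one class:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NullFieldsDemo {

    // Same shape as the block class in the question. Double is a boxed
    // reference type, so every field starts out as null.
    public static class Block {
        public Double block_id;
        public Double block_var2;
        public Double block_var3;
        public Double block_var4;
    }

    // Hypothetical stand-in for ObjectMapper.writeValueAsString: writes the
    // fields in declaration order and prints null values as the literal null.
    public static String toJson(Block b) {
        Map<String, Double> fields = new LinkedHashMap<>();
        fields.put("block_id", b.block_id);
        fields.put("block_var2", b.block_var2);
        fields.put("block_var3", b.block_var3);
        fields.put("block_var4", b.block_var4);

        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Double> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            // append(Object) writes "null" for a null reference
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // A new block that is never populated, as returned by "return new block();"
        System.out.println(toJson(new Block()));
        // prints {"block_id":null,"block_var2":null,"block_var3":null,"block_var4":null}

        // A block whose fields were actually assigned
        Block filled = new Block();
        filled.block_id = 1.0;
        filled.block_var2 = 2.0;
        filled.block_var3 = 3.0;
        filled.block_var4 = 4.0;
        System.out.println(toJson(filled));
        // prints {"block_id":1.0,"block_var2":2.0,"block_var3":3.0,"block_var4":4.0}
    }
}
```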


1 Answer


There are two probable issues here:

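  1. Are you sure the block you are passing doesn't have null fields? You may want to debug that part to be sure. Note that the map function shown ends with return new block();, which emits a freshly created block whose fields are all null, instead of current_a, the object you have just filled from the input.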
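  2. The cause may also be in the ObjectMapper: by default Jackson serializes public fields and public getters, so if your real block class keeps its fields private, add getters (and setters, if you also deserialize it); otherwise Jackson may not be able to access them.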
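The first point can be sketched without Flink, assuming the fix is simply to return current_a. BlockerSketch, Event and the HashMap-based stateByKey are hypothetical stand-ins for Stream_Blocker_Pojo, Event_N and keyed ValueState, so this only illustrates the control flow, not Flink's state API. In the real class the corresponding change would be replacing return new block(); with return current_a;, and declaring block as a public static class so that Flink can treat it as a POJO rather than a generic type.

```java
import java.util.HashMap;
import java.util.Map;

final class BlockerSketch {

    // Hypothetical stand-in for Event_N, holding only the fields the mapper reads.
    static final class Event {
        final String corrId;
        final Double f_num_id, f_num_2, f_num_3, f_num_4;

        Event(String corrId, Double id, Double v2, Double v3, Double v4) {
            this.corrId = corrId;
            this.f_num_id = id;
            this.f_num_2 = v2;
            this.f_num_3 = v3;
            this.f_num_4 = v4;
        }
    }

    // Static nested class with public fields and an implicit public no-arg
    // constructor; Jackson can read public fields without getters.
    public static class Block {
        public Double block_id;
        public Double block_var2;
        public Double block_var3;
        public Double block_var4;
    }

    // Stand-in for Flink's keyed ValueState<Block>: one entry per CorrID.
    private final Map<String, Block> stateByKey = new HashMap<>();

    public Block map(Event input) {
        Block current = stateByKey.get(input.corrId);   // like state_a.value()
        if (current == null) {
            current = new Block();
        }
        current.block_id = input.f_num_id;
        current.block_var2 = input.f_num_2;
        current.block_var3 = input.f_num_3;
        current.block_var4 = input.f_num_4;

        stateByKey.put(input.corrId, current);          // like state_a.update(current_a)
        return current;                                 // the populated object, not new Block()
    }

    public static void main(String[] args) {
        BlockerSketch mapper = new BlockerSketch();
        Block out = mapper.map(new Event("CorID_0", 1.0, 2.0, 3.0, 4.0));
        System.out.println(out.block_id + " " + out.block_var4); // prints 1.0 4.0
    }
}
```

Returning the state object itself keeps the emitted record and the stored state in sync; emitting a new, empty object is what produces the null-only JSON in the question.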