2 votes

I'm trying to code a streaming job which sinks a data stream into a Postgres table. For full context, I based my work on this article: https://tech.signavio.com/2017/postgres-flink-sink, which proposes using JDBCOutputFormat.

My code looks like the following:

98     ... 
99     String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101     JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102      .setDrivername("org.postgresql.Driver")
103      .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104      .setQuery(strQuery)
105      .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106      .finish();
107
108     DataStream<Row> rows = FilterStream
109                 .map((tuple)-> {
110                    Row row = new Row(3);                  // our prepared statement has 3 parameters
111                    row.setField(0, tuple.f0);             // first parameter is case ID
112                    row.setField(1, tuple.f1);             // second parameter is tracehash
113                    row.setField(2, f.format(tuple.f2));   // third parameter is tracehash
114                    return row;
115                 });
116
117     rows.writeUsingOutputFormat(jdbcOutput);
118
119     env.execute();
120
121     }
122 }

My problem now is that values are inserted only when my job is stopped (to be precise, when I cancel my job from the Apache Flink dashboard).

So my question is: did I miss something? Should I commit the inserted rows somewhere?

Best regards, Ignatius

The JDBCOutputFormat writes values in batches; the default batch size is 5000. You can control this parameter in the buildJDBCOutputFormat block by calling setBatchInterval(). If the input of your job is smaller than the interval, the batch is only submitted when the sink is closed, i.e. when the job terminates. - Chesnay Schepler
Hi, your comment is in fact the answer to my question. I added ".setBatchInterval(1)" at row 106 and it works perfectly. Thank you very much - Ignatius J. Reilly

2 Answers

2 votes

As Chesnay said in his comment, you have to adapt the batch interval.
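
For illustration, here is the builder from the question with the interval lowered to 1, so every record is flushed immediately (just a sketch; pick a value that fits your throughput, since a very small interval trades write efficiency for freshness):

    JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
        .setQuery(strQuery)
        .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR })
        .setBatchInterval(1)   // default is 5000; 1 executes the batch after every record
        .finish();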

However, this is not the full story. If you want to achieve at-least-once results, you have to sync the batch writes with Flink's checkpoints. Basically, you have to wrap the JDBCOutputFormat in a SinkFunction that also implements the CheckpointedFunction interface. When snapshotState() is called, you have to write the batch to the database. You can have a look at this pull request that will provide this functionality in the next release.
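
A minimal sketch of that idea (this is not the code from the pull request; it uses a plain JDBC PreparedStatement instead of JDBCOutputFormat so the pending batch can be executed explicitly, and it reuses the URL and query from the question):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;

    // Checkpoint-aware JDBC sink: rows are added to a JDBC batch and the batch
    // is executed whenever Flink takes a checkpoint (at-least-once with a
    // replayable source, because un-flushed rows are replayed after recovery).
    public class CheckpointedJdbcSink extends RichSinkFunction<Row> implements CheckpointedFunction {

        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff");
            statement = connection.prepareStatement(
                "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)");
        }

        @Override
        public void invoke(Row row) throws Exception {
            statement.setObject(1, row.getField(0));
            statement.setObject(2, row.getField(1));
            statement.setObject(3, row.getField(2));
            statement.addBatch();            // buffer the record on the JDBC side
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            statement.executeBatch();        // push the pending batch out on every checkpoint
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // nothing to restore here: rows still sitting in the JDBC batch at failure
            // time are lost, but the source replays them from the last checkpoint
        }

        @Override
        public void close() throws Exception {
            statement.executeBatch();        // flush whatever is left when the job ends
            statement.close();
            connection.close();
        }
    }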

1 vote

Fabian's answer is one way to achieve at-least-once semantics: by syncing the writes with Flink's checkpoints. However, this has the disadvantage that your sink's data freshness is now tied to your checkpointing interval.

As an alternative, you could store the tuples or rows that carry the (entity, duration, first) fields in Flink's own managed state, so that Flink takes care of checkpointing them (in other words, make your sink's state fault-tolerant). To do that, you implement the CheckpointedFunction and CheckpointedRestoring interfaces, without having to sync your writes with checkpoints. You can even execute your SQL inserts individually if you do not have to use JDBCOutputFormat. See: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state. Another solution is to implement only the ListCheckpointed interface (it can be used in a similar way to the deprecated CheckpointedRestoring interface, and supports list-style state redistribution).
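
A rough sketch of this approach with ListCheckpointed: unwritten rows live in a buffer that Flink snapshots, and each row is inserted with its own statement execution. The AlarmRow carrier class and the connection details are assumptions made up for the example:

    import java.io.Serializable;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Sink whose pending rows are part of Flink's checkpointed state; rows are
    // only dropped from the buffer once their individual insert has succeeded.
    public class BufferingJdbcSink extends RichSinkFunction<AlarmRow>
            implements ListCheckpointed<AlarmRow> {

        private final Deque<AlarmRow> pending = new ArrayDeque<>();
        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff");
            statement = connection.prepareStatement(
                "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)");
        }

        @Override
        public void invoke(AlarmRow value) throws Exception {
            pending.addLast(value);
            while (!pending.isEmpty()) {
                AlarmRow row = pending.peekFirst();
                statement.setString(1, row.entity);
                statement.setInt(2, row.duration);
                statement.setString(3, row.first);
                statement.executeUpdate();     // individual insert, no JDBC batching
                pending.removeFirst();         // drop the row only once it is in the database
            }
        }

        @Override
        public List<AlarmRow> snapshotState(long checkpointId, long timestamp) {
            return new ArrayList<>(pending);   // Flink checkpoints whatever is still unwritten
        }

        @Override
        public void restoreState(List<AlarmRow> state) {
            pending.addAll(state);             // written again after recovery (at-least-once)
        }

        @Override
        public void close() throws Exception {
            statement.close();
            connection.close();
        }
    }

    // Hypothetical serializable carrier for the three statement parameters.
    public class AlarmRow implements Serializable {
        public String entity;
        public int duration;
        public String first;
    }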