I'm trying to write a streaming job that sinks a data stream into a Postgres table. For full context, I based my work on this article: https://tech.signavio.com/2017/postgres-flink-sink, which proposes using JDBCOutputFormat.
My code looks like the following:
...

String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
        .setQuery(strQuery)
        .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR }) // types of the three parameters
        .finish();

DataStream<Row> rows = FilterStream
        .map((tuple) -> {
            Row row = new Row(3);                // our prepared statement has 3 parameters
            row.setField(0, tuple.f0);           // first parameter: entity
            row.setField(1, tuple.f1);           // second parameter: duration
            row.setField(2, f.format(tuple.f2)); // third parameter: first (formatted via f)
            return row;
        });

rows.writeUsingOutputFormat(jdbcOutput);

env.execute();

    }
}
My problem is that the values are only inserted once my job is stopped (to be precise, when I cancel the job from the Apache Flink dashboard).
So my question is: did I miss something? Should I commit the inserted rows somewhere?
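My guess is that this is about batching rather than an explicit commit. This is the kind of change I have in mind, but I'm not sure it's correct (assuming the JDBCOutputFormat builder really exposes a setBatchInterval option; the value of 1 below is just for illustration):

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
        .setQuery(strQuery)
        .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR })
        .setBatchInterval(1) // flush every record instead of waiting for the default batch size
        .finish();

Would that be the right way to get each row written immediately, or do I need a different kind of sink for a streaming job?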
Best regards, Ignatius