I am trying to implement a streaming data transformation from one Cloud SQL table to another Cloud SQL table using windowing (unbounded PCollections).
My Dataflow job completes successfully. However, it does not keep running when new data arrives in the first table. Am I missing anything at the code level to make it run in streaming mode?
Run Command:
--project=<your project ID> --stagingLocation=gs://<your staging bucket>
--runner=DataflowRunner --streaming=true
Code Snippet:
// Read (id, order_number) from public.tableRead over JDBC and turn each result-set row into a
// TableRow whose two fields hold the column values as strings.
// NOTE(review): JdbcIO.read() appears to execute the query once and produce a bounded PCollection
// (confirm against the Beam JdbcIO docs). If so, --streaming=true alone will not make this step
// poll the table for newly inserted rows, which would explain why the job finishes.
PCollection<TableRow> tblRows =
    p.apply(
        JdbcIO.<TableRow>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", connectionString)
                    .withUsername(<UserName>)
                    .withPassword(<PWD>))
            .withQuery("select id,order_number from public.tableRead")
            .withCoder(TableRowJsonCoder.of())
            .withRowMapper(
                rs ->
                    new TableRow()
                        .set("id", rs.getString("id"))
                        .set("order_number", rs.getString("order_number"))));
// Assign each row read from the source table to a 5-minute fixed window.
// The step label previously read "window 1s", which contradicted the configured window size.
PCollection<TableRow> tblWindow =
    tblRows.apply("Window 5m", Window.into(FixedWindows.of(Duration.standardMinutes(5))));
// Key every windowed row by its "id" field, parsed as an int, so that rows sharing an id can be
// grouped downstream.
// Fix: this must consume tblWindow (the windowed collection). It previously read tblRows
// directly, which left the FixedWindows transform unused and without effect.
PCollection<KV<Integer, TableRow>> keyedTblWindow =
    tblWindow.apply("Process", ParDo.of(new DoFn<TableRow, KV<Integer, TableRow>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        TableRow leftRow = c.element();
        c.output(KV.of(Integer.parseInt(leftRow.get("id").toString()), leftRow));
      }
    }));
// Collect every TableRow that shares the same integer id into a single Iterable per key.
// The key/value type arguments of GroupByKey are inferred from keyedTblWindow's element type.
PCollection<KV<Integer, Iterable<TableRow>>> groupedWindow =
    keyedTblWindow.apply(GroupByKey.create());
// Insert every grouped row into streaming_testing(id, order_number).
// Fix: the previous setter iterated over the whole group but bound parameters 1 and 2 on the same
// statement each time, overwriting earlier values, so only the last row of each group was written.
// The groups are now expanded back into individual rows first, so that each call to the setter
// binds exactly one (id, order_number) pair.
groupedWindow
    .apply("Ungroup", ParDo.of(new DoFn<KV<Integer, Iterable<TableRow>>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (TableRow row : c.element().getValue()) {
          c.output(row);
        }
      }
    }))
    // Same coder the read step uses for TableRow, so no coder inference is needed here.
    .setCoder(TableRowJsonCoder.of())
    .apply(JdbcIO.<TableRow>write()
        .withDataSourceConfiguration(DataSourceConfiguration.create("org.postgresql.Driver", connectionString)
            .withUsername(<UserName>).withPassword(<PWD>))
        .withStatement("insert into streaming_testing(id,order_number) values(?, ?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<TableRow>() {
          @Override
          public void setParameters(TableRow row, PreparedStatement query) throws SQLException {
            query.setInt(1, Integer.parseInt(row.get("id").toString()));
            query.setInt(2, Integer.parseInt(row.get("order_number").toString()));
          }
        }));