I am trying to implement a streaming data transformation from one CloudSQL table to another CloudSQL table using windowing (unbounded PCollections).

My Dataflow job completes successfully. However, it does not keep running when new data arrives in the first table. Am I missing anything at the code level to make it run in streaming mode?

Run Command:

--project=<your project ID>  --stagingLocation=gs://<your staging bucket>
--runner=DataflowRunner  --streaming=true

Code Snippet:

PCollection<TableRow> tblRows = p.apply(JdbcIO.<TableRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "org.postgresql.Driver", connectionString)
        .withUsername(<UserName>).withPassword(<PWD>))
.withQuery("select id,order_number from public.tableRead")
.withCoder(TableRowJsonCoder.of())
.withRowMapper(new JdbcIO.RowMapper<TableRow>() {
    public TableRow mapRow(ResultSet resultSet) throws Exception 
    {
        TableRow result = new TableRow();
        result.set("id", resultSet.getString("id"));
        result.set("order_number", resultSet.getString("order_number"));
        return result;
    }
})
);

PCollection<TableRow> tblWindow = tblRows.apply("window 5m", Window.into(FixedWindows.of(Duration.standardMinutes(5))));

PCollection<KV<Integer,TableRow>>  keyedTblWindow=  tblWindow.apply("Process", ParDo.of(new DoFn<TableRow, KV<Integer,TableRow>>() 
{
@ProcessElement
public void processElement(ProcessContext c) {

    TableRow leftRow = c.element();
    c.output(KV.of(Integer.parseInt(leftRow.get("id").toString()), leftRow) );
}}));

PCollection<KV<Integer, Iterable<TableRow>>> groupedWindow = keyedTblWindow.apply(GroupByKey.<Integer, TableRow> create());

groupedWindow.apply(JdbcIO.<KV<Integer, Iterable<TableRow>>>write()
.withDataSourceConfiguration( DataSourceConfiguration.create("org.postgresql.Driver", connectionString)
        .withUsername(<UserName>).withPassword(<PWD>))
.withStatement("insert into streaming_testing(id,order_number) values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, Iterable<TableRow>>>() {
    public void setParameters(KV<Integer, Iterable<TableRow>> element, PreparedStatement query)
            throws SQLException {
        // Each iteration overwrites parameters 1 and 2, so only the last
        // row of the group is bound to the insert statement.
        for (TableRow mRow : element.getValue()) {
            query.setInt(1, Integer.parseInt(mRow.get("id").toString()));
            query.setInt(2, Integer.parseInt(mRow.get("order_number").toString()));
        }
    }
})
);

1 Answer

JdbcIO only works on the snapshot of data returned at the time of execution and does not poll for changes. I suppose you will have to build a custom ParDo to detect changes in the database.
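
To illustrate what such change detection might look like, here is a minimal sketch of the bookkeeping a custom ParDo could do on each poll. It is plain Java rather than Beam code, and it rests on assumptions that are not in the question: the class name HighWaterMarkPoller and its Row type are hypothetical, and it assumes that id in public.tableRead only ever increases, so "new" rows are those with an id above the highest one already seen. Updates to existing rows would not be detected this way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: remembers the highest id processed so far. */
class HighWaterMarkPoller {

    /** Minimal stand-in for one row of public.tableRead. */
    static final class Row {
        public final int id;
        public final String orderNumber;

        public Row(int id, String orderNumber) {
            this.id = id;
            this.orderNumber = orderNumber;
        }
    }

    // Assumption: ids are monotonically increasing in the source table.
    private int lastSeenId = Integer.MIN_VALUE;

    /**
     * Returns the rows whose id is above the high-water mark, then advances
     * the mark. In a real pipeline the filter would more likely be pushed
     * into SQL, for example:
     *   select id, order_number from public.tableRead where id > ? order by id
     */
    public List<Row> poll(List<Row> snapshot) {
        List<Row> fresh = new ArrayList<>();
        int maxId = lastSeenId;
        for (Row row : snapshot) {
            if (row.id > lastSeenId) {
                fresh.add(row);
                maxId = Math.max(maxId, row.id);
            }
        }
        lastSeenId = maxId;
        return fresh;
    }

    public static void main(String[] args) {
        HighWaterMarkPoller poller = new HighWaterMarkPoller();
        // First poll: every row in the table is new.
        List<Row> first = poller.poll(Arrays.asList(new Row(1, "100"), new Row(2, "101")));
        // Second poll: only the row inserted since the first poll is returned.
        List<Row> second = poller.poll(
                Arrays.asList(new Row(1, "100"), new Row(2, "101"), new Row(3, "102")));
        System.out.println(first.size() + " new rows, then " + second.size());
    }
}
```

In a Beam pipeline, logic like this would need to be triggered repeatedly by an unbounded source (a periodic tick, for instance), and the high-water mark would need to be kept somewhere durable; both of those are left out of this sketch.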