
I'm trying to set up an Apache Beam pipeline that reads from Kafka and writes to BigQuery. I'm using the logic from this post to filter out some coordinates: https://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam-streaming-pipeline/ TL;DR: the messages in the topic have the format id,x,y, and I filter out all messages where x > 100 or y > 100.
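
The filtering rule above can be sketched in plain Java as follows. This assumes x and y are integers and that malformed messages should be dropped; CoordFilterSketch and keep are hypothetical names, and the real FilterObjectsByCoordinates class is not shown, so this is only a guess at what it presumably checks. Since Beam's Filter.by accepts a SerializableFunction<String, Boolean>, the same check could live in that function's apply method, with the thresholds taken from options.getCoordX() and options.getCoordY().

```java
/** Hypothetical sketch of the "id,x,y" coordinate filter; not the original class. */
final class CoordFilterSketch {

    /** Returns true if the message should be kept, i.e. x <= maxX and y <= maxY. */
    static boolean keep(String message, int maxX, int maxY) {
        String[] parts = message.split(",");
        if (parts.length != 3) {
            return false; // assumption: malformed messages are dropped
        }
        try {
            int x = Integer.parseInt(parts[1].trim());
            int y = Integer.parseInt(parts[2].trim());
            // Drop the message when x > maxX or y > maxY.
            return x <= maxX && y <= maxY;
        } catch (NumberFormatException e) {
            return false; // assumption: non-numeric coordinates are dropped too
        }
    }

    public static void main(String[] args) {
        System.out.println(keep("1,50,60", 100, 100)); // true
        System.out.println(keep("2,101,5", 100, 100)); // false
    }
}
```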

I read the data, apply a couple of transforms, define my table schema, and then try to write to BigQuery. I'm not sure how to call the write method; it may be a gap in my knowledge of Java generics. I believe it should take a PCollection, but I can't quite figure it out.

Here is the pipeline code (apologies if this counts as a code dump; I want to give the whole context):

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
                KafkaIO.<Long, String>read()
                        .withBootstrapServers(options.getBootstrap())
                        .withTopic(options.getInputTopic())
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class))
        .apply(
                ParDo.of(
                        new DoFn<KafkaRecord<Long, String>, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext processContext) {
                                KafkaRecord<Long, String> record = processContext.element();
                                processContext.output(record.getKV().getValue());
                            }
                        }))
        .apply(
                "FilterValidCoords",
                Filter.by(new FilterObjectsByCoordinates(options.getCoordX(), options.getCoordY())))
        .apply(
                "ExtractPayload",
                ParDo.of(
                        new DoFn<String, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) throws Exception {
                                c.output(KV.of("filtered", c.element()));
                            }
                        }));

    TableSchema tableSchema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema()
                        .setName("x_cord")
                        .setType("STRING")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("y_cord")
                        .setType("STRING")
                        .setMode("NULLABLE")));

    pipeline
        .apply(
            "Write data to BQ",
            BigQueryIO
                .<String, KV<String, String>>write() // I'm not sure how to call this method
                .optimizedWrites()
                .withSchema(tableSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
                .withMethod(FILE_LOADS)
                .to(new TableReference()
                    .setProjectId("prod-analytics-264419")
                    .setDatasetId("publsher")
                    .setTableId("beam_load_test")));
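
As written, the write is applied to pipeline rather than to the PCollection<KV<String, String>> returned by "ExtractPayload". BigQueryIO.write() also takes a single type parameter, the element type being written, so the call would look like BigQueryIO.<KV<String, String>>write() combined with withFormatFunction(...), which turns each element into a TableRow; alternatively, map the elements to TableRow first and use BigQueryIO.writeTableRows(). The plain-Java sketch below shows what the body of such a format function could do for this schema. It assumes the KV value is the raw "id,x,y" message and uses a LinkedHashMap in place of TableRow (TableRow is itself a Map<String, Object>) so it runs without Beam on the classpath; RowFormatSketch and toRow are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a BigQuery format function body for "id,x,y" payloads. */
final class RowFormatSketch {

    /**
     * Converts the payload "id,x,y" into a row whose keys match the schema
     * field names x_cord and y_cord. Values stay strings because both fields are STRING.
     */
    static Map<String, Object> toRow(String payload) {
        String[] parts = payload.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected id,x,y but got: " + payload);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("x_cord", parts[1].trim());
        row.put("y_cord", parts[2].trim());
        return row;
    }

    public static void main(String[] args) {
        // In Beam, this logic would run inside withFormatFunction(kv -> ...) on kv.getValue().
        System.out.println(toRow("7,42,13")); // {x_cord=42, y_cord=13}
    }
}
```

In the pipeline itself, this would mean keeping the result of the last apply in a variable (for example, a hypothetical PCollection<KV<String, String>> filtered) and calling filtered.apply("Write data to BQ", BigQueryIO.<KV<String, String>>write().withFormatFunction(...)...), copying each entry into a new TableRow with its set(key, value) method.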
Comment (rmesteves): Try doing PCollection<your-object-type> object_name = p.apply(...), and then use this object_name to do object_name.apply("Write data to BQ", BigQueryIO [...]

1 Answer


You want something like this:

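    [..]
    // tableRows: the PCollection<TableRow> produced by the preceding (elided) steps;
    // the write is applied to that collection, not to the Pipeline object.
    tableRows.apply(BigQueryIO.writeTableRows()
            .to(String.format("%s.dataset.table", options.getProject()))
            .withCreateDisposition(CREATE_IF_NEEDED)
            .withWriteDisposition(WRITE_APPEND)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withSchema(getTableSchema()));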
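
The .to(...) call accepts either a TableReference, as in the question, or a table spec string, as in this answer. A minimal sketch of building a spec string from the question's project, dataset, and table ids, assuming the "project:dataset.table" form, which is one of the forms BigQueryIO accepts for a table spec; TableSpecSketch and tableSpec are hypothetical names:

```java
/** Hypothetical helper that builds a BigQuery table spec string. */
final class TableSpecSketch {

    /** Joins the ids as "project:dataset.table"; rejects null or blank parts. */
    static String tableSpec(String project, String dataset, String table) {
        for (String part : new String[] {project, dataset, table}) {
            if (part == null || part.trim().isEmpty()) {
                throw new IllegalArgumentException("table spec parts must be non-blank");
            }
        }
        return String.format("%s:%s.%s", project, dataset, table);
    }

    public static void main(String[] args) {
        // Same ids as the TableReference in the question.
        System.out.println(tableSpec("prod-analytics-264419", "publsher", "beam_load_test"));
        // prints "prod-analytics-264419:publsher.beam_load_test"
    }
}
```

The resulting string could then be passed to .to(...) in place of the TableReference.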