According to the Apache Beam documentation, the recommended way to write simple sources is by using Read Transforms and ParDo. Unfortunately, the Apache Beam docs have let me down here.

I'm trying to write a simple unbounded data source which emits events using a ParDo, but the compiler keeps complaining about the input type of the DoFn object:

message: 'The method apply(PTransform<? super PBegin,OutputT>) in the type PBegin is not applicable for the arguments (ParDo.SingleOutput<PBegin,Event>)'

My attempt:

public class TestIO extends PTransform<PBegin, PCollection<Event>> {

    @Override
    public PCollection<Event> expand(PBegin input) {
        return input.apply(ParDo.of(new ReadFn()));
    }

    private static class ReadFn extends DoFn<PBegin, Event> {
        @ProcessElement
        public void process(@TimerId("poll") Timer pollTimer) {
            Event testEvent = new Event(...);

            //custom logic, this can happen infinitely
            for(...) {
                context.output(testEvent);
            }
        }
    }
}
jkff: Can you tell us more about the kind of source you're trying to write? It's possible that there is a much simpler way to do it.
wardva: @jkff Eventually I want to poll all new records every 5 seconds from an SQL database and output them as protobuf objects. I don't think that's possible with an existing Beam source?
jkff: OK, if you want to look for new records in the results of a query, you should probably use the Watch transform (github.com/apache/beam/blob/master/sdks/java/core/src/main/java/…). Please take a look and see if it looks like what you want.

1 Answer


A DoFn performs element-wise processing. As written, ParDo.of(new ReadFn()) has type PTransform<PCollection<PBegin>, PCollection<Event>>: the ReadFn declares that it takes elements of type PBegin and returns 0 or more elements of type Event. A PBegin is not a PCollection, so this ParDo cannot be applied to it, which is what the compiler error is reporting.
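One way to make a transform like the question's compile is to give the DoFn an actual PCollection to process: seed the pipeline with Create, then apply the ParDo to that. The sketch below is illustrative only. It assumes String in place of the question's Event type, and it emits a fixed, hypothetical number of elements (count) rather than an infinite stream.

```java
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Sketch: String stands in for the question's Event type; count is hypothetical.
public class TestIO extends PTransform<PBegin, PCollection<String>> {
  private final int count;

  public TestIO(int count) {
    this.count = count;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    // PBegin is not a PCollection, so a DoFn cannot consume it directly.
    // Create.of(...) turns it into a one-element PCollection<String> for ParDo.
    return input
        .apply("Seed", Create.of("seed"))
        .apply("Generate", ParDo.of(new ReadFn(count)));
  }

  // The input type is now String (the seed element), not PBegin.
  static class ReadFn extends DoFn<String, String> {
    private final int count;

    ReadFn(int count) {
      this.count = count;
    }

    @ProcessElement
    public void process(OutputReceiver<String> out) {
      // Emits a fixed number of elements for the single seed element.
      for (int i = 0; i < count; i++) {
        out.output("event-" + i);
      }
    }
  }
}
```

Because there is only one seed element, ReadFn runs once. A bounded loop like this is fine, but outputs generally only become visible downstream once the bundle finishes, so an infinite loop inside process would not deliver a usable stream.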

Instead, you should use an actual Read operation; Beam provides a variety of them. You can also use Create if you have a specific in-memory collection of elements to start from.
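As one example of a built-in source, GenerateSequence with withRate produces an unbounded PCollection<Long> at a fixed rate. The sketch below wraps it in a PBegin-rooted transform that emits one element per period, for example Duration.standardSeconds(5). The Ticks class name is hypothetical. The transform only produces ticks, so any actual polling of a database would still have to happen downstream.

```java
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical transform: emits 0, 1, 2, ... at one element per period, without end.
public class Ticks extends PTransform<PBegin, PCollection<Long>> {
  private final Duration period;

  public Ticks(Duration period) {
    this.period = period;
  }

  @Override
  public PCollection<Long> expand(PBegin input) {
    // GenerateSequence without .to(...) is unbounded; withRate throttles it
    // to 1 element per period.
    return input.apply("Tick", GenerateSequence.from(0).withRate(1, period));
  }
}
```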

If you need to create a custom source, you should use the Read transform. Since you're using timers, you likely want to create an UnboundedSource, which produces an unbounded stream of elements.
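Below is a minimal UnboundedSource sketch. It assumes a single split and String elements, and the class name, the "event-N" payload and the Mark checkpoint class are all hypothetical. The reader's advance() is where a real source would poll its backend and return false when nothing new is available. The checkpoint records the next index so that a restarted reader resumes from there.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Hypothetical source that emits "event-0", "event-1", ... without end.
public class CountingEventSource extends UnboundedSource<String, CountingEventSource.Mark> {

  // Checkpoint: index of the next event to emit, so a restarted reader can resume.
  public static class Mark implements UnboundedSource.CheckpointMark, Serializable {
    final long next;

    Mark(long next) {
      this.next = next;
    }

    @Override
    public void finalizeCheckpoint() {
      // Nothing to acknowledge in this sketch.
    }
  }

  @Override
  public List<CountingEventSource> split(int desiredNumSplits, PipelineOptions options) {
    // Single split: no parallel reading in this sketch.
    return Collections.singletonList(this);
  }

  @Override
  public UnboundedReader<String> createReader(PipelineOptions options, Mark checkpointMark) {
    return new EventReader(this, checkpointMark == null ? 0 : checkpointMark.next);
  }

  @Override
  public Coder<Mark> getCheckpointMarkCoder() {
    return SerializableCoder.of(Mark.class);
  }

  @Override
  public Coder<String> getOutputCoder() {
    return StringUtf8Coder.of();
  }

  public static class EventReader extends UnboundedReader<String> {
    private final CountingEventSource source;
    private long next;
    private String current;
    private Instant currentTimestamp;

    EventReader(CountingEventSource source, long next) {
      this.source = source;
      this.next = next;
    }

    @Override
    public boolean start() {
      return advance();
    }

    @Override
    public boolean advance() {
      // A real reader would poll its backend here and return false when nothing new is available.
      current = "event-" + next;
      currentTimestamp = Instant.now();
      next++;
      return true;
    }

    @Override
    public String getCurrent() {
      if (current == null) {
        throw new NoSuchElementException();
      }
      return current;
    }

    @Override
    public Instant getCurrentTimestamp() {
      if (current == null) {
        throw new NoSuchElementException();
      }
      return currentTimestamp;
    }

    @Override
    public Instant getWatermark() {
      // Assumes the wall clock does not go backwards, so later elements are never older.
      return currentTimestamp == null ? BoundedWindow.TIMESTAMP_MIN_VALUE : currentTimestamp;
    }

    @Override
    public UnboundedSource.CheckpointMark getCheckpointMark() {
      return new Mark(next);
    }

    @Override
    public UnboundedSource<String, ?> getCurrentSource() {
      return source;
    }

    @Override
    public void close() {}
  }
}
```

In a pipeline, such a source would be applied with p.apply(Read.from(new CountingEventSource())).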