I am creating an Apache Beam streaming pipeline to run on GCP Dataflow. I have a number of transforms that extend DoFn and CombineFn. Logs written from the DoFn transforms show up fine in the LOGS window of the Dataflow job details. However, the logs from the CombineFn transforms are not shown.
I tried different log levels, and the logs also show up fine when running with the DirectRunner.
Here is some sample code. I changed the input and output types to String for brevity; my actual code uses some custom classes.
import java.io.Serializable;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AverageSpv extends CombineFn<String, AverageSpv.Accum, String> {
private static final Logger LOG = LoggerFactory.getLogger(AverageSpv.class);
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
@Nullable String id;
}
@Override
public Accum createAccumulator() {
return new Accum();
}
@Override
public Accum addInput(Accum accumulator, String input) {
LOG.info("Add input: id {}, input);
accumulator.id = input;
return accumulator;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accumulators) {
LOG.info("Merging accumulator");
Accum merged = createAccumulator();
for (Accum accumulator : accumulators) {
merged.id = accumulator.id;
}
return merged;
}
@Override
public VehicleSpeedPerSegmentInfo extractOutput(Accum accumulator) {
LOG.info("Extracting accumulator");
LOG.info("Extract output: id {}", acummulator.id);
return acummulator.id;
}
}