I'm writing an Apache Beam project in Scala (Beam itself is written in Java), and I can't work out the syntax required to use the Filter.by functionality. Here's an example of what I've tried:

    class Test extends SerializableFunction[String, Boolean] {
      def apply(m: String): Boolean = true
    }

    val pipeline = Pipeline.create();
    pipeline.apply(
      KafkaIO.read[String,String]()
        .withBootstrapServers("localhost:9092")
        .withTopic("test-topic")
        .withKeyDeserializer(classOf[StringDeserializer])
        .withValueDeserializer(classOf[StringDeserializer])
        .withoutMetadata()
    )
    .apply(Values.create())
    .apply(Filter.by((m: String) => true))
    // And, as an alternative to the line above, I've also tried this:
    .apply(Filter.by(new Test()))

which gives me the following error:

    [error] example.scala:61:19: overloaded method value by with alternatives:
    [error]   [T, PredicateT <: org.apache.beam.sdk.transforms.SerializableFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T] <and>
    [error]   [T, PredicateT <: org.apache.beam.sdk.transforms.ProcessFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T]
    [error]  cannot be applied to (com.example.Test)
    [error]     .apply(Filter.by(new Test()))
    [error]                   ^
    [error] one error found

The documentation for Filter.by is here: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/Filter.html#by-PredicateT-

1 Answer

First, you might be interested in Scio, a Scala API for Apache Beam, which is cleaner and clearer to use from Scala.

Otherwise, I've successfully created a Filter.by with the Java SDK by explicitly specifying the ProcessFunction type on the lambda (tested with Beam 2.16.0):

    import org.apache.beam.sdk.testing.{PAssert, TestPipeline}
    import org.apache.beam.sdk.transforms.{Create, Filter, ProcessFunction}

    // Using a test pipeline outside of a JUnit @Rule.
    val pipeline = TestPipeline.create
    pipeline.enableAbandonedNodeEnforcement(false)

    // Declare the predicate with an explicit ProcessFunction type,
    // then apply the filter.
    val predicate: ProcessFunction[String, java.lang.Boolean] = m => m.length == 3
    val output = pipeline.apply(Create.of("one", "two", "three"))
      .apply(Filter.by(predicate))
    PAssert.that(output).containsInAnyOrder("one", "two")

    // Run the test.
    pipeline.run()

(Note that the return type is java.lang.Boolean, not scala.Boolean.)
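
To see the java.lang.Boolean point in isolation, here is a minimal sketch that does not need Beam on the classpath. It assumes Scala 2.12 or later (for lambda-to-interface conversion) and uses `java.util.function.Function` purely as a stand-in for a Java functional interface such as ProcessFunction. The names `BooleanBoxingSketch` and `keepMatching` are hypothetical and are not part of any Beam API.

```scala
import java.util.function.{Function => JFunction}

object BooleanBoxingSketch {
  // Hypothetical stand-in for a Java-side method that expects a predicate
  // returning the boxed java.lang.Boolean, as a Java generic signature would.
  def keepMatching(
      items: List[String],
      predicate: JFunction[String, java.lang.Boolean]
  ): List[String] =
    items.filter(item => predicate.apply(item).booleanValue)

  def main(args: Array[String]): Unit = {
    // The type ascription turns the lambda into the Java interface.
    // The scala.Boolean produced by `m.length == 3` is boxed to
    // java.lang.Boolean by an implicit conversion from Predef.
    val predicate: JFunction[String, java.lang.Boolean] = m => m.length == 3

    println(keepMatching(List("one", "two", "three"), predicate))
  }
}
```

The same idea carries over to the answer's code: giving the value an explicit Java interface type with a `java.lang.Boolean` result means the Scala compiler has a concrete target to convert the lambda to.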