I'm writing an Apache Beam project in Scala (Beam itself is written in Java), and I can't work out the syntax needed to use Filter.by. Here's an example of what I've tried:
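import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.beam.sdk.transforms.{Filter, SerializableFunction, Values}
import org.apache.kafka.common.serialization.StringDeserializer

// Predicate written as an explicit SerializableFunction subclass
class Test extends SerializableFunction[String, Boolean] {
  def apply(m: String): Boolean = true
}

val pipeline = Pipeline.create()
pipeline.apply(
    KafkaIO.read[String, String]()
      .withBootstrapServers("localhost:9092")
      .withTopic("test-topic")
      .withKeyDeserializer(classOf[StringDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .withoutMetadata()
  )
  .apply(Values.create())
  // First attempt: a Scala lambda
  .apply(Filter.by((m: String) => true))
  // I've also tried this, using the Test class
  .apply(Filter.by(new Test()))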
which gives me the following error:
[error] example.scala:61:19: overloaded method value by with alternatives:
[error] [T, PredicateT <: org.apache.beam.sdk.transforms.SerializableFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T] <and>
[error] [T, PredicateT <: org.apache.beam.sdk.transforms.ProcessFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T]
[error] cannot be applied to (com.example.Test)
[error] .apply(Filter.by(new Test()))
[error] ^
[error] one error found
The documentation for Filter.by is here: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/Filter.html#by-PredicateT-