I am trying to use the Scala XML library in Flink to parse XML, and I cannot get it to work. Note that I need to use both the serialized and the unserialized (string) version in my code, in the same processing function.
I have already tried different solutions. They always work in IntelliJ, but not when I run them on a Flink cluster, where they fail with variations of java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser. Whatever I try, I still get an error similar to this one.
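Because the same code behaves differently in IntelliJ and on the cluster, one way to narrow this down is to log which JAXP implementation is actually resolved, and which class loader and jar it comes from, once locally and once inside the running Flink job (e.g. from the processing function). A minimal sketch using only the JDK; the object name SaxDiagnostics is hypothetical:

```scala
import javax.xml.parsers.SAXParserFactory

// Hypothetical helper: reports which SAX implementation JAXP resolves at runtime.
object SaxDiagnostics {
  def describe(): String = {
    val factory = SAXParserFactory.newInstance()
    val factoryClass = factory.getClass
    val parserClass = factory.newSAXParser().getClass
    // getClassLoader returns null for classes loaded by the bootstrap class loader
    val loader = Option(factoryClass.getClassLoader).map(_.toString).getOrElse("bootstrap")
    // getCodeSource may be null, e.g. for classes loaded by the bootstrap class loader
    val source = Option(factoryClass.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("none")
    s"factory=${factoryClass.getName} parser=${parserClass.getName} loader=$loader source=$source"
  }

  def main(args: Array[String]): Unit = println(describe())
}
```

If the two environments print a different factory class, class loader, or jar location, that points at a conflicting XML implementation being pulled in on one side.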
This is an example of what my Flink Job looks like:
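object StreamingJob {
  import org.apache.flink.streaming.api.scala._
  // Sample XML messages (contents omitted)
  val l: List[String] = List(
    // ...
  )
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    val stream = env.fromCollection(l)
      .map(new Processor)
    stream.print() // a sink is required, otherwise execute() has no operators to run
    env.execute()
  }
}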
This is an example of my processing function:
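import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    // Parsed form of the message; the raw string is still available as translatedMessage
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    // ... processing that uses both xml and translatedMessage (omitted)
    translatedMessage // placeholder: the real return value was omitted
  }
}

object Processor {
  // A Scala object is a singleton per class loader, shared by all Processor instances
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val saxParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(saxParser)
}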
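Separately from the LinkageError: object Processor holds a single SAXParser, which is shared by every Processor instance loaded by the same class loader, i.e. by parallel subtasks running as threads in one TaskManager. The JAXP documentation does not guarantee that a SAXParser behaves correctly when used concurrently from several threads. A minimal sketch of a per-thread alternative using only the JDK; ThreadLocalSax and countElements are hypothetical names, and this is not known to fix the LinkageError itself:

```scala
import java.io.StringReader
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.xml.sax.{Attributes, InputSource}
import org.xml.sax.helpers.DefaultHandler

// Hypothetical helper: one SAXParser per thread instead of one per class loader.
object ThreadLocalSax {
  private val parsers: ThreadLocal[SAXParser] = new ThreadLocal[SAXParser] {
    override def initialValue(): SAXParser = SAXParserFactory.newInstance().newSAXParser()
  }

  // Counts start tags in an XML string using the calling thread's own parser.
  def countElements(xml: String): Int = {
    var count = 0
    val handler = new DefaultHandler {
      override def startElement(uri: String, localName: String, qName: String,
                                attributes: Attributes): Unit = count += 1
    }
    val parser = parsers.get()
    parser.reset() // return the parser to its freshly created state before reuse
    parser.parse(new InputSource(new StringReader(xml)), handler)
    count
  }
}
```

With scala-xml, the same idea would amount to calling XML.withSAXParser(parsers.get()).loadString(translatedMessage) inside map instead of going through a single shared loader.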
And finally, this is my pom.xml; I use the maven-shade plugin to build the jar I submit to Flink:
<!-- other sections of the pom are excluded -->
<!-- other sections of the pom are excluded -->
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<!-- Scala Library, provided by Flink as well. -->
<!-- other sections of the pom are excluded -->
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Run shade goal on package phase -->
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<!-- Java Compiler -->
<!-- Scala Compiler -->
<!-- Add src/main/scala to eclipse build path -->
<!-- Add src/test/scala to eclipse build path -->
<!-- other sections of the pom are excluded -->
I believe the issue is related to which SAXParser implementation Flink uses at runtime. I also tried using the @transient annotation to prevent Flink from persisting the fields, but without success.
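On the @transient point: when a job is submitted, Flink serializes the function instances with Java serialization and deserializes them on the TaskManagers, so a field holding a non-serializable SAXParser has to be excluded and then rebuilt. Fields of a Scala object, like those in object Processor, are not part of the serialized instance at all, so @transient on them would not be expected to change anything. The usual Scala idiom for instance fields is a @transient lazy val, which is skipped on serialization and initialized again on first use afterwards. A minimal sketch that simulates the round trip with plain Java serialization instead of a real cluster; ParsingFunction and roundTrip are hypothetical names, and it illustrates the idiom only, not the cause of the LinkageError:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, StringReader}
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.xml.sax.{Attributes, InputSource}
import org.xml.sax.helpers.DefaultHandler

// Hypothetical stand-in for a Flink function: it must be serializable,
// but SAXParser is not, so the parser is transient and rebuilt lazily.
class ParsingFunction extends java.io.Serializable {
  @transient private lazy val parser: SAXParser =
    SAXParserFactory.newInstance().newSAXParser()

  // Returns the name of the root element of an XML string.
  def rootName(xml: String): String = {
    var root: Option[String] = None
    val handler = new DefaultHandler {
      override def startElement(uri: String, localName: String, qName: String,
                                attributes: Attributes): Unit =
        if (root.isEmpty) root = Some(qName)
    }
    parser.parse(new InputSource(new StringReader(xml)), handler)
    root.getOrElse("")
  }
}

object TransientDemo {
  // Serializes and deserializes a value, roughly what happens on job submission.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val f = new ParsingFunction
    f.rootName("<warmup/>")   // initializes the lazy parser before serialization
    val copy = roundTrip(f)   // the transient parser is not written out
    println(copy.rootName("<order><id>1</id></order>")) // parser is rebuilt on first use
  }
}
```

Without @transient, writeObject would fail with java.io.NotSerializableException because the parser is not serializable.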
However, I am quite confused about what exactly is happening. Does anybody know what went wrong and how to prevent the error?
Shading usually helps against these kinds of runtime errors. – Richard Deurwaarder