1
votes

I have a Dataflow pipeline (SDK 2.1.0, Apache Beam 2.2.0) which simply reads RDF (in N-Triples, so it's just text files) from GCS, transforms it somehow and writes it back to GCS, but in a different bucket. In this pipeline I employ side inputs which are three single files (one file per side input) and use them in a ParDo.

To work with RDF in Java I use Apache Jena, so each file is read into an instance of Model class. Since Dataflow doesn't have Coder for it, I developed it myself (RDFModelCoder, see below). It works fine in number of other pipelines I created.

The problem with this particular pipeline is when I add the side inputs, the execution fails with an exception indicating a corruption of the data, i.e. some garbage is added. Once I remove the side inputs, the pipeline finishes execution successfully.

The exception (it's thrown from RDFModelCoder, see below):

Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
    at org.apache.jena.atlas.io.IO.exception(IO.java:233)
    at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
    at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
    at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
    at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
    at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
    at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
    at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
    at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
    at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
    at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
    at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
    at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
    at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
    at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
    at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
    at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)

And here you can see the garbage (at the end):

<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������

The pipeline:

val one = p.apply(TextIO.read().from(config.getString("source.one")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val two = p.apply(TextIO.read().from(config.getString("source.two")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val three = p.apply(TextIO.read().from(config.getString("source.three")))
             .apply(Combine.globally(SingleValue()))
             .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val sideInput = PCollectionList.of(one).and(two).and(three)
                .apply(Flatten.pCollections())
                .apply(View.asList())

p.apply(RDFIO.Read
                  .from(options.getSource())
                  .withSuffix(RDFLanguages.strLangNTriples))
 .apply(ParDo.of(SparqlConstructETL(config, sideInput))
                        .withSideInputs(sideInput))
 .apply(RDFIO.Write
                  .to(options.getDestination())
                  .withSuffix(RDFLanguages.NTRIPLES))

And just to provide the whole picture here are implementations of SingleValue and ConvertToRDFModel ParDos:

class SingleValue : SerializableFunction<Iterable<String>, String> {
    override fun apply(input: Iterable<String>?): String {
        if (input != null) {
            return input.joinToString(separator = " ")
        }
        return ""
    }
}

class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
    private val lang: String = outputLang.name

    @ProcessElement
    fun processElement(c: ProcessContext?) {
        if (c != null) {
            val model = ModelFactory.createDefaultModel()
            model.read(StringReader(c.element()), null, lang)
            c.output(model)
        }
    }
}

The implementation of RDFModelCoder:

class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
                    private val encodeLang: String = RDFLanguages.strLangNTriples)
    : AtomicCoder<Model>() {

    private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)

    override fun decode(inStream: InputStream): Model {
        val bytes = StreamUtils.getBytes(inStream)
        val model = ModelFactory.createDefaultModel()

        model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here

        return model
    }

    override fun encode(value: Model, outStream: OutputStream?) {
        value.write(outStream, encodeLang, null)
    }

}

I checked the side input files multiple times, they're fine, they have UTF-8 encoding.

2
This not a Jena issue. The input is not valid UTF-8. PeekReader slurps quite large buffer (128K bytes) to improve byte to character conversion. Hence MalformedInputException. The length = 1 means it is an illegal one byte UTF-8 sequence in that block.AndyS

2 Answers

3
votes

Most likely the error is in the implementation of RDFModelCoder. When implementing encode/decode one has to remember that the provided InputStream and OutputStream are not exclusively owned by the current instance being encoded/decoded. E.g. there might be more data in the InputStream after the encoded form of your current Model. When using StreamUtils.getBytes(inStream) you are grabbing both data of the current encoded Model and anything else that was in the stream.

Generally when writing a new Coder it's a good idea to only combine existing Coder's rather than hand-parsing the stream: that is less error-prone. I would suggest to convert the model to/from byte[] and use ByteArrayCoder.of() to encode/decode it.

1
votes

Apache Jena provides the Elephas IO modules which have Hadoop IO support, since Beam supports Hadoop InputFormat IO you should be able to use that to read in your NTriples file.

This will likely be far more efficient since the NTriples support in Elephas is able to parallelise the IO and avoid caching the entire model into memory (in fact it won't use Model at all):

Configuration myHadoopConfiguration = new Configuration(false);

// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
                               NTriplesInputFormat.class, InputFormat.class);
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class);
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class);
// Set any other Hadoop config you might need

// Read data only with Hadoop configuration.
p.apply("read",
         HadoopInputFormatIO.<LongWritable, TripleWritable>read()
        .withConfiguration(myHadoopConfiguration);

Of course this may require you to refactor your overall pipeline somewhat.