
I have a Kafka cluster receiving messages. Each message is a byte array of a zip file, and the zip file contains binary protobuf data files as entries. I am reading the zip file and trying to deserialize the protobuf entries; that's where my code hits "protocol message has invalid UTF-8" / "invalid tag" exceptions.

I am able to deserialize the binary protobuf file before sending it as a zipped byte array to the broker.

But when I zip these binary protobuf files, produce the messages to Kafka, consume them, and then try to deserialize the entries in the zip stream, I am facing issues.

I am not sure which step is the culprit here.

Since these binary protocol buffers are already GZipped, is zipping them again messing things up?

Can someone shed some light?

Thanks

**************edit**************

Producer Side:

    public byte[] getZipfileBytes() {
        byte[] zipBytes = null;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ZipOutputStream zipOut = new ZipOutputStream(baos);
        CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());

        try {
            ZipEntry zipEntry = new ZipEntry(testFile.getName());
            byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
            System.out.println("bytes length:\t" + protoBytes.length);
            zipEntry.setSize(protoBytes.length);
            zipOut.putNextEntry(zipEntry);
            zipOut.write(protoBytes);
            zipOut.close();
            System.out.println("checksum:" + checkSum.getChecksum().getValue());
            zipBytes = baos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return zipBytes;
    }
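One thing worth noting about the producer code above: the `CheckedOutputStream` wraps `zipOut`, but the entry bytes are written to `zipOut` directly, so the checksum never accumulates over any data (the Adler-32 value over zero bytes is 1, which matches the value reported later in this question). A minimal sketch, assuming a standalone payload rather than `testFile`, of writing through the checked stream so the checksum is actually computed:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ChecksumDemo {
    // Zip a single payload and return the Adler-32 checksum of the
    // uncompressed bytes, computed by writing THROUGH the checked stream.
    static long zipWithChecksum(byte[] payload, String entryName,
                                ByteArrayOutputStream baos) throws IOException {
        ZipOutputStream zipOut = new ZipOutputStream(baos);
        CheckedOutputStream checked = new CheckedOutputStream(zipOut, new Adler32());
        zipOut.putNextEntry(new ZipEntry(entryName));
        checked.write(payload);   // not zipOut.write(payload): the checksum only
                                  // sees bytes written via the checked stream
        zipOut.close();
        return checked.getChecksum().getValue();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "pretend protobuf payload".getBytes("UTF-8");
        long sum = zipWithChecksum(data, "test.dat", new ByteArrayOutputStream());
        System.out.println(sum != 1);   // a non-empty payload gives a checksum != 1
    }
}
```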



Consumer Side (the binary protobuf bytes are gzipped, so I need to gunzip; but if I do the gunzip it throws "not in gzip format", and if I skip the gunzip and do the parseFrom I get invalid tag exceptions):

    void processConsumerRecord(ConsumerRecord<String, byte[]> record) {
        String key = record.key();
        byte[] dataPacket = record.value();

        ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));
        CheckedInputStream checkSum = new CheckedInputStream(zipIn, new Adler32());
        ZipEntry zipEntry;
        try {
            zipEntry = zipIn.getNextEntry();
            while (zipEntry != null) {
                String name = zipEntry.getName();
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                try {
                    IOUtils.copy(zipIn, baos);
                    byte[] protoBytes = baos.toByteArray();

                    // this throws "not in gzip format":
                    GZIPInputStream gzip = new GZIPInputStream(
                            new ByteArrayInputStream(baos.toByteArray()));

                    // skipping the gunzip and parsing directly gives
                    // invalid tag exceptions:
                    MyProtoObject mpo = null;
                    try {
                        mpo = MyProtoObject.parseFrom(protoBytes);
                    } catch (InvalidProtocolBufferException e1) {
                        e1.printStackTrace();
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                zipEntry = zipIn.getNextEntry();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
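Since it is unclear at this point whether the entry bytes are actually gzipped, one way to settle it is to look at the first two bytes: every GZIP stream starts with the magic bytes 0x1f 0x8b. A small standalone sketch (the helper name is illustrative, not from the code above):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class GzipCheck {
    // GZIP streams always begin with the two magic bytes 0x1f, 0x8b.
    static boolean looksGzipped(byte[] bytes) {
        return bytes.length >= 2
                && (bytes[0] & 0xff) == 0x1f
                && (bytes[1] & 0xff) == 0x8b;
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = "raw protobuf-ish bytes".getBytes("UTF-8");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(baos)) {
            gz.write(plain);   // gzip the payload in memory
        }
        System.out.println(looksGzipped(baos.toByteArray()));   // true
        System.out.println(looksGzipped(plain));                // false
    }
}
```

If the check returns false for the entry bytes, the "not in gzip format" exception is expected, and `parseFrom` should be called on the bytes directly.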

checkSum.getChecksum().getValue() returns 1 on both the producing and the consuming side of the zip byte array.

The following are the values of the zipEntry variable during debugging:

    producer 
        zipEntry    ZipEntry  (id=44)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    2056    
        method  8   
        name    "test.dat" (id=49)  
        size    92931   
        time    1214084891  


    consumer
        zipEntry    ZipEntry  (id=34)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    0   
        method  8   
        name    "test.dat" (id=39)  
        size    92931   
        time    1214084891  

I even tested it the other way: instead of processing the proto bytes in memory, I wrote the zip file to disk, extracted it manually through WinZip, and then deserialized the extracted binary proto file, and it worked!

Am I doing the zip/unzip the wrong way? Let me know.


1 Answer


There are two different things in play here: zip/unzip, and dealing with protobuf. It sounds like the problem is the first one, and that it is corrupting the protobuf data. So, for now: forget about protobuf and concentrate on the zip/unzip. Record what the original message was before you zipped it (perhaps as a binary file, or a base-64 chunk). Then, at the receiving end, record what you get after you have unzipped it (again, a binary file or base-64 chunk). If the two are not absolutely 100% identical, then all other bets are off: until you can successfully reproduce the original raw binary, protobuf doesn't stand a chance.
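A sketch of the comparison being suggested, using base-64 so both sides can be logged and diffed as plain text (the class and method names are illustrative, not from the question):

```java
import java.util.Arrays;
import java.util.Base64;

public class ByteCompare {
    // Encode a payload as base-64 so it can be logged and diffed as text.
    static String dump(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    // True only if the received bytes are 100% identical to what was sent.
    static boolean identical(byte[] sent, byte[] received) {
        return Arrays.equals(sent, received);
    }

    public static void main(String[] args) {
        byte[] sent = {0x0a, 0x03, 0x66, 0x6f, 0x6f};   // some raw bytes
        byte[] received = sent.clone();
        System.out.println(identical(sent, received));          // true
        System.out.println(dump(sent).equals(dump(received)));  // base-64 matches too
    }
}
```

Log `dump(...)` on the producer just before zipping, and on the consumer just after unzipping; any difference between the two strings pinpoints the corruption before protobuf ever gets involved.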

If this is the problem: it would be good to show your zip/unzip code, so we can see it.

If you are correctly compressing / decompressing the binary, then the problem is going to be in your protobuf code.

If this is the problem: it would be good to show your serialize/deserialize code, so we can see it.
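For reference, a self-contained in-memory zip round-trip in Java (names are illustrative): if the unzip side reproduces the input bytes exactly, as below, then protobuf parsing on the consumer sees precisely the producer's bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipRoundTrip {
    // Wrap a single payload in a zip archive, entirely in memory.
    static byte[] zip(byte[] payload, String entryName) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ZipOutputStream zipOut = new ZipOutputStream(baos)) {
            zipOut.putNextEntry(new ZipEntry(entryName));
            zipOut.write(payload);
            zipOut.closeEntry();
        }
        return baos.toByteArray();
    }

    // Extract the first entry's bytes from an in-memory zip archive.
    static byte[] unzipFirstEntry(byte[] zipped) throws IOException {
        try (ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(zipped))) {
            ZipEntry entry = zipIn.getNextEntry();
            if (entry == null) throw new IOException("empty zip");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = zipIn.read(buf)) != -1) {
                out.write(buf, 0, n);   // read stops at the end of this entry
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "pretend these are binary protobuf bytes".getBytes("UTF-8");
        byte[] back = unzipFirstEntry(zip(original, "test.dat"));
        System.out.println(java.util.Arrays.equals(original, back));   // true
    }
}
```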