I have a Kafka cluster receiving messages. Each message is the byte array of a zip file, and the zip file contains binary protobuf data files as entries. I read the zip file and try to deserialize the protobuf entries, and that's where my code hits "protocol message has invalid UTF-8" and "invalid tag" exceptions.
I can deserialize a binary protobuf file without problems before sending it to the broker as a zipped byte array. But when I zip these binary protobuf files, produce the messages to Kafka, consume them, and then try to deserialize the entries from the zip stream, I get these exceptions.
I'm not sure which step is the culprit. Since these binary protocol buffers are already gzipped, could zipping them again be messing things up?

Can someone shed some light?
Thanks
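To test the "zipping already-gzipped data" hypothesis on its own, here is a minimal, self-contained sketch (plain JDK, no Kafka or protobuf; the class and method names are made up for illustration). It gzips some bytes, stores them as an entry in an in-memory zip, reads the entry back and compares. Zip and gzip are both lossless, so if both comparisons print `true`, double compression by itself is unlikely to be the problem:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class ZipRoundTrip {

    // Drains a stream into a byte array (Java 8 friendly, no readAllBytes()).
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    // Gzips bytes in memory (stands in for the already-gzipped .dat file).
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        }
        return out.toByteArray();
    }

    static byte[] gunzip(byte[] data) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return readAll(gz);
        }
    }

    // Builds a zip with a single entry; toByteArray() is called only after close().
    static byte[] zip(String name, byte[] content) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ZipOutputStream zipOut = new ZipOutputStream(out)) {
            zipOut.putNextEntry(new ZipEntry(name));
            zipOut.write(content);
            zipOut.closeEntry();
        }
        return out.toByteArray();
    }

    static byte[] unzipFirstEntry(byte[] zipBytes) throws IOException {
        try (ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            if (zipIn.getNextEntry() == null) {
                throw new IOException("zip contains no entries");
            }
            // read() returns -1 at the end of the current entry
            return readAll(zipIn);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "pretend this is a serialized protobuf".getBytes(StandardCharsets.UTF_8);
        byte[] gzipped = gzip(original);
        byte[] roundTripped = unzipFirstEntry(zip("test.dat", gzipped));
        System.out.println("entry bytes identical: " + Arrays.equals(gzipped, roundTripped));
        System.out.println("gunzipped identical:   " + Arrays.equals(original, gunzip(roundTripped)));
    }
}
```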
**EDIT**
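Producer side:

```java
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.IOUtils;

// testFile (java.io.File) and zipBytes (byte[]) are fields of the enclosing class.
public byte[] getZipfileBytes() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ZipOutputStream zipOut = new ZipOutputStream(baos);
    // Wraps zipOut, but the data below is written to zipOut directly, not through checkSum.
    CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());
    try {
        ZipEntry zipEntry = new ZipEntry(testFile.getName());
        byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
        System.out.println("bytes length:\t" + protoBytes.length);
        zipEntry.setSize(protoBytes.length);
        zipOut.putNextEntry(zipEntry);
        zipOut.write(protoBytes);
        // close() finishes the entry and writes the central directory into baos.
        zipOut.close();
        System.out.println("checksum:" + checkSum.getChecksum().getValue());
        zipBytes = baos.toByteArray();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return zipBytes;
}
```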
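Consumer side:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;

void processConsumerRecord(ConsumerRecord<String, byte[]> record) {
    String key = record.key();
    byte[] dataPacket = record.value();
    ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));
    // Wraps zipIn, but the entries below are read from zipIn directly, not through checkSum.
    CheckedInputStream checkSum = new CheckedInputStream(zipIn, new Adler32());
    ZipEntry zipEntry;
    try {
        zipEntry = zipIn.getNextEntry();
        while (zipEntry != null) {
            String name = zipEntry.getName();
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                // Copies only the current entry; zipIn signals EOF at the end of each entry.
                IOUtils.copy(zipIn, baos);
                byte[] protoBytes = baos.toByteArray();
                // The binary protobuf bytes are gzipped, so I need to gunzip them.
                // If I gunzip, it throws "Not in GZIP format".
                // If I skip the gunzip and call parseFrom, I get "invalid tag" exceptions.
                GZIPInputStream gzip = new GZIPInputStream(
                        new ByteArrayInputStream(protoBytes));
                MyProtoObject mpo = null;
                try {
                    mpo = MyProtoObject.parseFrom(protoBytes);
                } catch (InvalidProtocolBufferException e1) {
                    e1.printStackTrace();
                }
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            zipEntry = zipIn.getNextEntry(); // advance, otherwise the loop never ends
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```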
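`GZIPInputStream` reports "Not in GZIP format" when the data does not start with the gzip magic bytes `0x1f 0x8b` (RFC 1952). One quick diagnostic is to look at the first bytes of each entry before deciding whether to gunzip it, and to run the same check on the original `.dat` file on the producer side to see whether it was ever gzipped in the first place. A minimal sketch; `EntryBytes` and its methods are hypothetical helpers, not part of any library:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

final class EntryBytes {

    // gzip streams always begin with the two magic bytes 0x1f 0x8b.
    static boolean looksGzipped(byte[] data) {
        return data.length >= 2
                && (data[0] & 0xff) == 0x1f
                && (data[1] & 0xff) == 0x8b;
    }

    // Gunzips only if the magic bytes are present; otherwise returns the bytes unchanged.
    static byte[] maybeGunzip(byte[] data) throws IOException {
        if (!looksGzipped(data)) {
            return data;
        }
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }

    // Lowercase hex of the first few bytes, handy for logging what an entry starts with.
    static String head(byte[] data, int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(count, data.length); i++) {
            sb.append(String.format("%02x ", data[i] & 0xff));
        }
        return sb.toString().trim();
    }
}
```

In the consumer loop, logging `EntryBytes.head(protoBytes, 4)` and then calling `MyProtoObject.parseFrom(EntryBytes.maybeGunzip(protoBytes))` is one option, assuming the (decompressed) bytes hold exactly one serialized `MyProtoObject`.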
`checkSum.getChecksum().getValue()` returns 1 on both the producer side and the consumer side.
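On the checksum of 1: a fresh `Adler32` has the value 1, and in both snippets the data goes to `zipOut` / comes from `zipIn` directly, never through `checkSum`, so it never leaves its initial state. A minimal sketch (the `ChecksumDemo` class is hypothetical) of getting a meaningful Adler32 of the raw bytes, either directly or by writing through a `CheckedOutputStream`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;

final class ChecksumDemo {

    // Adler32 over a byte array, computed directly.
    static long adler32(byte[] data) {
        Adler32 adler = new Adler32();
        adler.update(data, 0, data.length);
        return adler.getValue();
    }

    // Same value, obtained by writing *through* the CheckedOutputStream.
    static long adler32ViaStream(byte[] data) throws IOException {
        CheckedOutputStream checked =
                new CheckedOutputStream(new ByteArrayOutputStream(), new Adler32());
        checked.write(data);
        checked.flush();
        return checked.getChecksum().getValue();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "some proto bytes".getBytes(StandardCharsets.UTF_8);
        System.out.println("untouched Adler32: " + new Adler32().getValue()); // 1
        System.out.println("direct:            " + adler32(payload));
        System.out.println("via stream:        " + adler32ViaStream(payload));
    }
}
```

In the producer, the equivalent would be `checkSum.write(protoBytes)` instead of `zipOut.write(protoBytes)`. Each zip entry already carries its own CRC-32 (the `crc` value below), so this extra checksum is mainly a debugging aid.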
Here are the values of the `zipEntry` variable while debugging:

| Field | Producer (`ZipEntry` id=44) | Consumer (`ZipEntry` id=34) |
|---|---|---|
| comment | null | null |
| crc | 2147247736 | 2147247736 |
| csize | 86794 | 86794 |
| extra | null | null |
| flag | 2056 | 0 |
| method | 8 | 8 |
| name | "test.dat" (id=49) | "test.dat" (id=39) |
| size | 92931 | 92931 |
| time | 1214084891 | 1214084891 |
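For reading the `flag` value: it is the zip general-purpose bit flag defined in the PKWARE .ZIP specification (APPNOTE). 2056 is `0x0808`, i.e. bit 3 (CRC and sizes are stored in a data descriptor after the entry data, which `ZipOutputStream` uses for a DEFLATED entry whose CRC and compressed size are not set up front) and bit 11 (UTF-8 entry name). The fields that describe the data itself (`crc`, `csize`, `size`) match on both sides. A tiny decoder sketch; `ZipFlags` is a hypothetical helper that only names a few of the bits:

```java
final class ZipFlags {

    // Names a few general-purpose bit flag bits relevant here.
    static String describe(int flag) {
        StringBuilder sb = new StringBuilder(String.format("0x%04x:", flag));
        if ((flag & 0x0001) != 0) sb.append(" encrypted");
        if ((flag & 0x0008) != 0) sb.append(" data-descriptor");
        if ((flag & 0x0800) != 0) sb.append(" utf8-names");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(2056)); // 0x0808: data-descriptor utf8-names
        System.out.println(describe(0));    // 0x0000:
    }
}
```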
I even tested it the other way: instead of processing the proto bytes in memory, I wrote the zip file to disk, extracted it manually with WinZip, and then deserialized the extracted binary proto file. It worked!

Am I doing the zip/unzip the wrong way? Please let me know.
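Since extracting with WinZip and then parsing works, the same comparison can be done in memory. A sketch (class and method names are hypothetical) that reads every entry of the consumed zip into a map and checks an entry against the original file byte for byte:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

final class ZipEntries {

    // Reads every entry of an in-memory zip into a name -> bytes map, in archive order.
    // ZipInputStream verifies each entry's CRC-32 as it reads and throws ZipException on a mismatch.
    static Map<String, byte[]> readAll(byte[] zipBytes) throws IOException {
        Map<String, byte[]> entries = new LinkedHashMap<>();
        try (ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            byte[] buf = new byte[8192];
            while ((entry = zipIn.getNextEntry()) != null) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                int n;
                // read() returns -1 at the end of the current entry, not of the whole archive
                while ((n = zipIn.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                entries.put(entry.getName(), out.toByteArray());
                zipIn.closeEntry();
            }
        }
        return entries;
    }

    // True if an extracted entry is byte-for-byte identical to the original file on disk.
    static boolean matchesFile(byte[] entryBytes, Path originalFile) throws IOException {
        return Arrays.equals(entryBytes, Files.readAllBytes(originalFile));
    }
}
```

For example, on a machine that has the original `test.dat`, `ZipEntries.matchesFile(ZipEntries.readAll(dataPacket).get("test.dat"), Paths.get("test.dat"))` returning `true` would suggest the zip and Kafka steps leave the data intact, which would point at the parsing step (gunzip or not, `parseFrom` or some other read) instead.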