
I have a Cassandra database split over multiple nodes. When querying it with Pig, the MapReduce job that Pig creates 'crashes' on the Hadoop nodes with the following exception:

2013-03-18 00:57:19,374 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: org.apache.thrift.TException: Message length exceeded: 674
        at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:384)
        at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:390)
        at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:313)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getProgress(ColumnFamilyRecordReader.java:103)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.getProgress(PigRecordReader.java:169)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.getProgress(MapTask.java:514)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:539)
        at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.thrift.TException: Message length exceeded: 674, readLength: 192
        at org.apache.thrift.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:393)
        at org.apache.thrift.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363)
        at org.apache.cassandra.thrift.Column.read(Column.java:535)
        at org.apache.cassandra.thrift.ColumnOrSuperColumn.read(ColumnOrSuperColumn.java:507)
        at org.apache.cassandra.thrift.KeySlice.read(KeySlice.java:408)
        at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:12905)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
        at org.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:734)
        at org.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:718)
        at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:346)
        ... 17 more

The line that stands out is org.apache.thrift.TException: Message length exceeded: 674. The message length reported in the exception varies every time a Pig query is run. From the moment the task is initialized on a Hadoop node, it takes less than a second to trigger the exception.

Cassandra is populated with around 1GB of data. The Pig query used to cause this exception is the following:

rows = LOAD 'cassandra://[keyspace here]/[cf here]' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS ([column definitions here]);
testvals = foreach rows generate mycolumn.$1;
names = limit testvals 57343;
dump names;

Why the 57343 limit, you ask? Any number under 57343 lets the Pig job finish successfully; any number >= 57343 makes it crash. The Pig example that ships with Cassandra also fails with the same exception. Likewise, using a smaller dataset in Cassandra lets Pig finish the job successfully.

I found some similar errors where Thrift complains about the message length, but those usually occur when the maximum message length specified in cassandra.yaml is exceeded. In this case the message length in cassandra.yaml was set to 64MB to see if that helped, but the same exception still occurred. Oddly, the exception claims the message is too long even though, by its own report, the message is only 674 bytes in this case!

What I tried:

  • Increased the thrift_max_message_length_in_mb and thrift_framed_transport_size_in_mb values in cassandra.yaml
  • Rebuilt the Thrift jar
  • Dropped the Cassandra keyspace and repopulated it
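For reference, the relevant part of cassandra.yaml on my nodes looked roughly like this (the 64MB figure is the test value mentioned above; treat the exact layout as illustrative for Cassandra 1.2.x):

```yaml
# Thrift transport limits in cassandra.yaml (Cassandra 1.2.x).
# Raising both of these did not make the exception go away in my case.
thrift_framed_transport_size_in_mb: 64
thrift_max_message_length_in_mb: 64
```

Note that thrift_max_message_length_in_mb is generally expected to be at least as large as thrift_framed_transport_size_in_mb, so I kept them equal while testing.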

Setup:

  • Hadoop 1.0.4
  • Cassandra 1.2.2
  • Pig 0.11.0

TL;DR Pig + Cassandra crashes on larger datasets (org.apache.thrift.TException: Message length exceeded: 674). Smaller datasets, or smaller subsets of a larger dataset, work fine.

edit: The Cassandra logs show no errors. Cassandra serves out the slices requested by the job, and the job dies while Cassandra is doing so.

Comment from user2298360: ConfigHelper.setThriftMaxMessageLengthInMb();
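The comment above points at setting the limit on the job (client) side rather than only in cassandra.yaml. A rough sketch of what that could look like in the Hadoop job driver, using the method named in the comment (from org.apache.cassandra.hadoop.ConfigHelper; the exact signature may differ between Cassandra versions, so treat this as an unverified sketch, not a confirmed fix):

```java
// Sketch: raise the Thrift message limit on the Hadoop job configuration
// via Cassandra's ConfigHelper, so the ColumnFamilyRecordReader created by
// CassandraStorage picks it up. Assumes a standard Job/Configuration setup;
// not verified against Cassandra 1.2.2.
Configuration conf = job.getConfiguration();
ConfigHelper.setThriftMaxMessageLengthInMb(conf, 64);
```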

1 Answer


If this column family uses wide rows or has lots of columns, then you might want to try passing the widerows option.

set cassandra.input.widerows true;
data = load 'cassandra://[keyspace here]/[cf here]/?widerows=true' 
            using org.apache.cassandra.hadoop.pig.CassandraStorage();