I have a Cassandra database split over multiple nodes. When I query it with Pig, the MapReduce job that Pig creates 'crashes' on the Hadoop nodes with the following exception:
2013-03-18 00:57:19,374 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: org.apache.thrift.TException: Message length exceeded: 674
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:384)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:390)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:313)
    at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
    at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getProgress(ColumnFamilyRecordReader.java:103)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.getProgress(PigRecordReader.java:169)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.getProgress(MapTask.java:514)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:539)
    at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.thrift.TException: Message length exceeded: 674, readLength: 192
    at org.apache.thrift.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:393)
    at org.apache.thrift.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363)
    at org.apache.cassandra.thrift.Column.read(Column.java:535)
    at org.apache.cassandra.thrift.ColumnOrSuperColumn.read(ColumnOrSuperColumn.java:507)
    at org.apache.cassandra.thrift.KeySlice.read(KeySlice.java:408)
    at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:12905)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:734)
    at org.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:718)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:346)
    ... 17 more
The line that stands out is org.apache.thrift.TException: Message length exceeded: 674. The message length reported in the exception varies every time a Pig query is fired up. From the moment the task is initialized on a Hadoop node, it takes less than a second to trigger the exception.
Cassandra is populated with around 1GB of data. The Pig query that triggers this exception is the following:
rows = LOAD 'cassandra://[keyspace here]/[cf here]' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS ([column definitions here]);
testvals = foreach rows generate mycolumn.$1;
names = limit testvals 57343;
dump names;
Why the 57343 limit, you ask? Any number under 57343 lets the Pig job finish successfully; any number >= 57343 makes it crash. The Pig example that ships with Cassandra also dies with the same exception. Also, using a smaller dataset in Cassandra lets Pig finish the job successfully.
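For clarity, with testvals defined as in the script above, these two lines bracket the threshold (57342 being the largest value that still works):

names_ok = limit testvals 57342;  -- job finishes successfully
names_bad = limit testvals 57343; -- job dies with the TException above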
I found some similar errors where Thrift complains about the message length, but usually that happens when the maximum message length specified in cassandra.yaml is exceeded. Here, the maximum message length in cassandra.yaml was raised to 64MB to see if it helped, but the same exception still occurred. Oddly, Thrift claims the message is too long even though the exception itself puts the message at only 674 bytes in this case!
What I tried:
- Increase the thrift_max_message_length_in_mb and thrift_framed_transport_size_in_mb values in cassandra.yaml (see the excerpt below)
- Rebuild the Thrift jar
- Drop the Cassandra keyspace and repopulate it
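For reference, a minimal sketch of the relevant cassandra.yaml settings during testing; the 64MB maximum message length is the value mentioned above, while the framed transport size shown here is an assumed matching value:

# cassandra.yaml -- Thrift limits raised for testing
thrift_max_message_length_in_mb: 64    # the 64MB test value mentioned above
thrift_framed_transport_size_in_mb: 64 # assumed to match; not a recommendation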
Setup:
- Hadoop 1.0.4
- Cassandra 1.2.2
- Pig 0.11.0
TL;DR
Pig + Cassandra crashes on larger datasets (org.apache.thrift.TException: Message length exceeded: 674). Smaller datasets, or smaller subsets of a larger dataset, work fine.
edit: The Cassandra logs show no errors. Cassandra serves out the slices as requested by the job, and the job dies while Cassandra is doing so.