I have the following code, which tries to join two Cassandra tables in Spark.

 val imageKeywords = sc.cassandraTable[ImageMetadata]("images", "metadata")
 val imageAndPageKeywords = imageKeywords
  .joinWithCassandraTable[PagesMetadata]("pages2", "metadata")
  .on(SomeColumns("tid", "url" as "pu"))

The case classes I use to map the data are:

case class ImageMetadata(tid: String, iu: String, pu: Option[String],
mk: List[String], fk: List[String], ak: List[String], ipk: List[String], pk: List[String], ik: List[String], ck: List[String])

case class PagesMetadata(tid: String, url: String, pk: List[String], uk: List[String], hk: List[String], ok: List[String], tc: List[String])

I get an error when I run an operation on the result, such as:

imageAndPageKeywords.collect.toList.sortBy(_._1.tid).take(10).foreach(println)

Error stack trace:

Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part url
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    ... 3 more

1 Answer

The exception tells you that the join cannot be performed because, for some rows, the value used to join ImageMetadata with PagesMetadata is null.

In your case, some url (pu) values in ImageMetadata are null.

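Note that ImageMetadata declares pu as nullable (Option[String]), while the error message shows that url, the column it is joined against, is part of the partition key of the pages2.metadata table, and a partition key component can never be null.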

One solution to make it work would be:

val imageAndPageKeywords = imageKeywords
  .filter(im => im.pu.isDefined) // drop rows whose join key pu is null
  .joinWithCassandraTable[PagesMetadata]("pages2", "metadata")
  .on(SomeColumns("tid", "url" as "pu"))
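
To see the effect of that filter in isolation, here is a minimal sketch that runs on plain Scala collections rather than a Spark RDD. ImageKey and FilterNullJoinKeys are hypothetical names introduced only for this illustration, and the sample rows are made up; only the predicate (pu.isDefined) comes from the solution above.

```scala
// Hypothetical, simplified stand-in for ImageMetadata: only the join columns.
case class ImageKey(tid: String, pu: Option[String])

object FilterNullJoinKeys {
  // Keep only rows whose join key is present, using the same predicate
  // as the filter applied before joinWithCassandraTable.
  def withDefinedKey(rows: Seq[ImageKey]): Seq[ImageKey] =
    rows.filter(_.pu.isDefined)

  def main(args: Array[String]): Unit = {
    // Made-up sample data: the second row has no page URL.
    val rows = Seq(
      ImageKey("t1", Some("http://example.com/a")),
      ImageKey("t2", None),
      ImageKey("t3", Some("http://example.com/b"))
    )
    val kept = withDefinedKey(rows)
    println(s"kept ${kept.size} of ${rows.size} rows")
    kept.foreach(println)
  }
}
```

Counting how many rows the filter drops is also a quick way to check how much of the images table has a null pu before deciding whether silently skipping those rows is acceptable.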