I am trying to read the documents of a bucket using Spark SQL through spark-shell.

spark-shell --packages com.couchbase.client:spark-connector_2.11:2.2.0

import org.apache.spark.sql.SparkSession
import com.couchbase.spark.sql._
import com.couchbase.client.java.document.JsonDocument 
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.client.java.view.ViewQuery
import org.apache.spark.sql.sources.EqualTo 

// Configure Spark
val sparkConf = SparkSession.
      builder().
      appName("KeyValueExample").
      master("local[*]").
      config("spark.couchbase.nodes", "135.x.x.x").
      config("spark.couchbase.username", "Administrator").
      config("spark.couchbase.password", "password").
      config("spark.couchbase.bucket.transaction-datastore", "transaction-datastore").
      getOrCreate()

import sparkConf.implicits._
val sc = sparkConf.sparkContext
val sql = sparkConf.sqlContext

// Create a DataFrame with Schema Inference
val cc = sql.read.couchbase(EqualTo("type", "Credit Card"))

The call to sql.read.couchbase() throws the following error:

19/03/13 13:04:19 WARN Endpoint: [null][KeyValueEndpoint]: Authentication Failure.
19/03/13 13:04:19 WARN Endpoint: [null][KeyValueEndpoint]: Authentication Failure.
19/03/13 13:04:19 WARN Endpoint: Error during reconnect: com.couchbase.client.core.endpoint.kv.AuthenticationException: Authentication Failure
    at com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.checkIsAuthed(KeyValueAuthHandler.java:288)
    at com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.channelRead0(KeyValueAuthHandler.java:173)
    at com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.channelRead0(KeyValueAuthHandler.java:52)
    at com.couchbase.client.deps.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
    at com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
    at com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
    at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
    at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
    at com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
    at com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
    at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1302)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
    at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
    at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
    at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
    at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
    at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
    at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

Any help would be appreciated. Thanks in advance. :)

The name of your bucket is "transaction-datastore", right? Have you also tried config("spark.couchbase.bucket.transaction-datastore", "")? - deniswsrosa
Right, the name of my bucket is transaction-datastore. I have 5 buckets in total. When I use config("spark.couchbase.bucket.transaction-datastore", ""), I get the following error: 19/03/13 15:02:12 WARN Endpoint: [null][KeyValueEndpoint]: Authentication Failure. 19/03/13 15:02:12 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) com.couchbase.client.java.error.InvalidPasswordException: Passwords for bucket "default" do not match. - Sarfaraz Hussain

1 Answer

1- Create a new user/password via Security -> Add User

2- Add permissions for this user to access the bucket. (If you are in a test environment, just grant "Full Admin" rights.)

3- In your code, do the following:

val sparkConf = SparkSession.
  builder().
  appName("KeyValueExample").
  master("local[*]").
  config("spark.couchbase.nodes", "135.x.x.x").
  config("spark.couchbase.username", "myUser").
  config("spark.couchbase.password", "myPassword").
  config("spark.couchbase.bucket.myBucketName", ""). // the bucket password must be an empty string (kept for backward compatibility)
  getOrCreate()
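
With the session configured as above, the read from the question can be retried. The following is only a sketch. It assumes the session is still bound to the name sparkConf, that "myBucketName" is the only bucket opened, and that the documents carry a "type" field with the value "Credit Card" as in the question. As far as I know the DataFrame integration goes through N1QL, so the bucket also needs an index (for example a primary index) and the user needs query permissions. It requires a reachable cluster, so treat it as untested.

```scala
import org.apache.spark.sql.sources.EqualTo
import com.couchbase.spark.sql._   // adds couchbase(...) to DataFrameReader

// `sparkConf` is the SparkSession built in step 3.
// The schema is inferred from documents whose "type" field
// equals "Credit Card" (the filter value is taken from the question).
val cc = sparkConf.read.couchbase(EqualTo("type", "Credit Card"))

// Inspect what was inferred and look at a few rows.
cc.printSchema()
cc.show(5)
```

If the authentication failure persists at this point, it is worth re-checking the user name, the password, and that user's roles on the bucket before looking elsewhere.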