I am unable to connect to Phoenix (4.10) from Spark (2.1.0) using the "Load as a DataFrame using the Data Source API" example on the Phoenix website. I am using the latest Phoenix (4.10) and HBase 1.2.5. I can create a table in HBase via Phoenix (sqlline client). The error returned within Spark is as follows:
scala> val df = sqlContext.load("org.apache.phoenix.spark",Map("table" -> "test", "zkUrl" -> "localhost:2181"))
warning: there was one deprecation warning; re-run with -deprecation for details
java.sql.SQLException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.TableExistsException): SYSTEM.MUTEX
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2465)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:2382)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2382)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:149)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:98)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:57)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:45)
at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:292)
at org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:118)
at org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:60)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:965)
... 50 elided
Caused by: org.apache.hadoop.ipc.RemoteException: SYSTEM.MUTEX
at org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.prepareCreate(CreateTableProcedure.java:285)
at org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.executeFromState(CreateTableProcedure.java:106)
at org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.executeFromState(CreateTableProcedure.java:58)
at org.apache.hadoop.hbase.procedure2.StateMachineProcedure.execute(StateMachineProcedure.java:119)
at org.apache.hadoop.hbase.procedure2.Procedure.doExecute(Procedure.java:498)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execProcedure(ProcedureExecutor.java:1147)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execLoop(ProcedureExecutor.java:942)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execLoop(ProcedureExecutor.java:895)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.access$400(ProcedureExecutor.java:77)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor$2.run(ProcedureExecutor.java:497)
UPDATE 1: It works fine if the SYSTEM.MUTEX table is dropped via HBase.
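For reference, dropping it from the HBase shell (bin/hbase shell) works along these lines; the table has to be disabled before it can be dropped:
disable 'SYSTEM.MUTEX'
drop 'SYSTEM.MUTEX'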
UPDATE 2: After dropping SYSTEM.MUTEX, the table is recreated whenever a connection is made to Phoenix via sqlContext.load(). This means that the moment another table is loaded, or even the same table is reloaded, the same exception is thrown again as Phoenix tries to recreate SYSTEM.MUTEX.
UPDATE 3: It seems that if you start without the SYSTEM.MUTEX table in HBase, everything works within that Spark session, i.e. you can load as many tables as you want. However, if another Spark session is initialised, the same exception is thrown from the second Spark context.
Following the suggestion on https://issues.apache.org/jira/browse/PHOENIX-3814 (include the hbase-client jar in the Spark classpath), the same exception still occurs.
UPDATE 4: I ended up doing a custom build of the Phoenix project. The fix is to change line no. 2427 of the class org.apache.phoenix.query.ConnectionQueryServicesImpl (phoenix-core) to if (!admin.tableExists(SYSTEM_MUTEX_NAME_BYTES)) createSysMutexTable(admin); so that the mutex table is only created when it does not already exist.
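For context, the change simply wraps the existing call in an existence check; roughly (a sketch of the guard only, not the exact surrounding Phoenix source):
// before: SYSTEM.MUTEX is created unconditionally during connection initialisation
createSysMutexTable(admin);
// after: only create SYSTEM.MUTEX when it is not already present in HBase
if (!admin.tableExists(SYSTEM_MUTEX_NAME_BYTES)) createSysMutexTable(admin);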
Also, the DataFrame save example given at https://phoenix.apache.org/phoenix_spark.html is not correct, as it is based on the deprecated/removed save method of the DataFrame class; the write method needs to be used instead. See the example below:
./bin/spark-shell --master local[4] --deploy-mode client --jars path_to_to/phoenix-4.10.1-HBase-1.2-SNAPSHOT-client.jar
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.sql.SaveMode
val sqlContext = new SQLContext(sc)
val df = sqlContext.load("org.apache.phoenix.spark",Map("table" -> "name_of_input_table_in_phoenix", "zkUrl" -> "localhost:2181"))
df.write.format("org.apache.phoenix.spark").mode(SaveMode.Overwrite).options(Map("table" -> "name_of_output_table_in_phoenix","zkUrl" -> "localhost:2181")).save()
Note that the output table should already exist in Phoenix with the correct schema (see the DDL sketch below). Also note that I am using a custom build, hence the SNAPSHOT in the client jar name.
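For illustration, the output table used above could be created up front from sqlline with DDL along these lines (the table name and columns here are placeholders, not an actual schema):
CREATE TABLE IF NOT EXISTS name_of_output_table_in_phoenix (
    id BIGINT NOT NULL PRIMARY KEY,
    col1 VARCHAR,
    col2 INTEGER
);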