0
votes

In my work there are different environments (Development, Preproduction and Production) and in each environment we have certain tables in its Hive metastore. My user has permissions to access and query all these metastores via beeline, but I want to access to these metastores in a spark-shell session using sqlContext (or HiveContext).

For example, when I access to Preproduction environment using ssh and if I start a spark-shell session, it automatically creates a sqlContext variable with which I can perform queries to Preproduction metastore.

I also can perform queries to Production metastore from Preproduction metastore using beeline, so I tried to change some of the configuration in Hive ( How to connect to a Hive metastore programmatically in SparkSQL?). I changed following properties:

hive.metastore.uris and hive.server2.authentication.kerberos.principal to the correspondent properties in Production environment.

My code in a spark-shell:

   System.setProperty("hive.server2.authentication.kerberos.principal","hive/URL@URL2")
    System.setProperty("hive.metastore.uris","thrift://URLTOPRODUCTION:9083")
    import org.apache.spark.sql.hive.HiveContext
    val hive=new HiveContext(sc)
    val df=hive.sql("SELECT * FROM DB.Table limit 10")

But when I execute the last sentence of the previous code block, I'm getting following error.

java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1

    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:406)

    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)

    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)

    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:762)

    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:693)

    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)

    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)

    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)

    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)

    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)

    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)

    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:449)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:447)

    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)

    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)

    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)

    at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)

    at scala.Option.getOrElse(Option.scala:120)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)

    at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)

    at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)

    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:504)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:503)

    at scala.Option.getOrElse(Option.scala:120)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToParquetRelation(HiveMetastoreCatalog.scala:503)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:565)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:545)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

    at scala.collection.AbstractIterator.to(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

    at scala.collection.AbstractIterator.to(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:545)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:539)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)

    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)

    at scala.collection.immutable.List.foldLeft(List.scala:84)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)

    at scala.collection.immutable.List.foreach(List.scala:318)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)

    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)

    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)

    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)

    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)

    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)

    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:829)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)

    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)

    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41)

    at $iwC$$iwC$$iwC.<init>(<console>:43)

    at $iwC$$iwC.<init>(<console>:45)

    at $iwC.<init>(<console>:47)

    at <init>(<console>:49)

    at .<init>(<console>:53)

    at .<clinit>(<console>)

    at .<init>(<console>:7)

    at .<clinit>(<console>)

    at $print(<console>)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)

    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)

    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)

    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)

    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)

    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)

    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)

    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)

    at org.apache.spark.repl.Main$.main(Main.scala:35)

    at org.apache.spark.repl.Main.main(Main.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

   Caused by: java.net.UnknownHostException: nameservice1

    ... 141 more

I'm using a Cloudera distribution with Spark 1.6.0 and Scala 2.10.5.

Any idea to solve it? Thanks in advance

1
BTW a property starting with hive.server2. has no effect whatsoever on the Metastore. I.e. that's not what you need.Samson Scharfrichter
Did you add the relevant Hadoop conf files in a directory in the CLASSPATH (or in a dir matching $HADOOP_CONF_DIR) starting with core-site.xml and hdfs-site.xml ?Samson Scharfrichter
Yes, my spark-shell session is taking these files because when I checked the configuration of the HiveContext that I have created, it has all the properties defined in these files. LIke I said, this is working fine for preproduction environment, but I get this error when I try to link to the production metastore.AngryCoder

1 Answers

0
votes

Finally, after I was reviewing the configuration of the sqlContext variable that spark-shell creates automatically in the server, I have seen that there a lot of url's and configuration variables and that I don't have permissions in HDFS or in other servers that I need to perform queries on PROD metastore.

As I know that querying PROD metastore with beeline works, I know that I can query this metastore via JDBC so I took the JDBC URL of the call to beeline.

I then use this JDBC URL and start using native Java (from Scala) methods and utilities to connect with a DB vía JDBC:

/*We will need hive-jdbc-0.14.0.jar to connect to a Hive metastore via JDBC */
import java.sql.ResultSetMetaData
import java.sql.{DriverManager, Connection, Statement, ResultSet}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
/* In the following lines I connect to Prod Metastore via JDBC and I execute the query as if I am connecting to a simple DB. Notice that, using this method, you are not using distributed computing */
val url="jdbc:hive2://URL_TO_PROD_METASTORE/BD;CREDENTIALS OR URL TO KERBEROS"
val query="SELECT * FROM BD.TABLE LIMIT 100"
val driver="org.apache.hive.jdbc.HiveDriver"
Class.forName(driver).newInstance
val conn: Connection = DriverManager.getConnection(url)
val r: ResultSet = conn.createStatement.executeQuery(query)
val list =scala.collection.mutable.MutableList[Row]()
/* Now we want to get all the values from all the columns. Notice that I creat a ROW object for each row of the results. Then I add each Row to a MutableList*/
while(r.next()){
  var value : Array[String] = new Array[String](r.getMetaData.getColumnCount())
  for(i<-1 to r.getMetaData.getColumnCount()){
  value(i-1) = r.getString(i)}
  list+=Row.fromSeq(value)}

/* Now we have the results of the query to PROD metastore and we want to transform this data to a Dataframe so we have to create a StructType with the names of the columns and we also need a list of rows with previous results */
var array : Array[StructField] = new Array[StructField] (r.getMetaData.getColumnCount())
for(i<- 1 to r.getMetaData.getColumnCount){
 array(i-1) =StructField(r.getMetaData.getColumnName(i),StringType)}
val struct=StructType(array)
val rdd=sc.parallelize(list)
val df=sqlContext.createDataFrame(rdd,struct)
r.close
conn.close

Notice that this question is related to one of my other answers. Because the best way of export the results of a Hive query to a CSV is using Spark (How to export a Hive table into a CSV file?). Is for that I want to query Prod metastore from a Spark session in PRE server.