What you're trying to do seems impossible in the current Spark version. The executed query is constructed like this:
val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
(see org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#compute)
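To make the consequence concrete, here is a simplified, hypothetical re-creation of that string building in plain Scala. The object `JdbcQuerySketch`, the method `buildSql` and the sample subquery are my own illustration, not Spark's code, and it leaves out identifier quoting and partition predicates. It shows that whatever you pass as the table is pasted verbatim after FROM, while Spark decides the column list and the WHERE clause itself.

```scala
// Hypothetical, simplified re-creation of the SQL text built in JDBCRDD#compute.
// Not Spark's actual code: no identifier quoting, no partition predicates.
object JdbcQuerySketch {
  def buildSql(columns: Seq[String], table: String, filters: Seq[String]): String = {
    // With no required columns, a constant is selected instead
    val columnList = if (columns.isEmpty) "1" else columns.mkString(",")
    // Pushed-down filters are ANDed into a single WHERE clause
    val whereClause = if (filters.isEmpty) "" else filters.mkString("WHERE ", " AND ", "")
    // The table text (possibly a subquery) is inserted verbatim after FROM
    s"SELECT $columnList FROM $table $whereClause".trim
  }

  def main(args: Array[String]): Unit = {
    val table = "(SELECT content->>'first_name' AS first_name FROM jsonb_test) AS t"
    println(buildSql(Seq("first_name"), table, Seq("first_name IS NOT NULL")))
  }
}
```

Running `main` prints a single statement in which the subquery simply sits in the FROM position of the outer SELECT that Spark generates.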
Here options.table corresponds to the (SELECT ...) statement you pass as the table (the dbtable option).
Could you explain why you need to load the columns from a subquery? If you perform joins or other SQL operations inside this subquery, you can always work around that by doing them in Spark SQL instead (joins, filters, aggregations and so on).
Edit:
As you explained, the reason you use a subquery is JSONB extraction. Obviously it will perform better as a native SQL operation, but if you want Spark to parallelize your processing, IMO you'll need to declare the JSON processing at the Spark level, as here:
CREATE TABLE jsonb_test (
content jsonb
);
INSERT INTO jsonb_test (content) VALUES
('{"first_name": "X", "last_name": "Y"}');
And the accompanying code:
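import org.apache.spark.sql.functions
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// JDBC options: "dbtable" points at the plain table, not at a subquery
val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
  "dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
  "driver" -> "org.postgresql.Driver")
// Schema of the JSON document stored in the jsonb column
val schema = StructType(Seq(
  StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
// The jsonb column arrives as a string; from_json parses it into a struct on the Spark side
val personDataFrame = sparkSession.read
  .format("jdbc")
  .options(opts)
  .load()
  .withColumn("person", functions.from_json($"content", schema))
// Column 0 is the raw "content" string, column 1 is the parsed "person" struct
val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)
// ScalaTest Matchers syntax
extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"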
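The expected "[X,Y]" value comes from how Spark prints a Row: as far as I know, Row.toString joins the field values with commas inside square brackets. The plain-Scala stand-in below (`RowRenderingSketch` and `render` are hypothetical names, not Spark API) reproduces that rendering for the parsed person struct.

```scala
// Hypothetical stand-in for Spark's Row rendering (not Spark code):
// field values joined by commas inside square brackets.
object RowRenderingSketch {
  def render(fields: Seq[Any]): String = fields.mkString("[", ",", "]")

  def main(args: Array[String]): Unit = {
    // The parsed "person" struct holds first_name = X and last_name = Y
    val person = Seq("X", "Y")
    println(render(person))
  }
}
```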
Spark supports JSONB fields through PostgresDialect, whose method converting DB types to Catalyst types treats JSONB as a StringType:
  private def toCatalystType(
      typeName: String,
      precision: Int,
      scale: Int): Option[DataType] = typeName match {
    case "bool" => Some(BooleanType)
    case "bit" => Some(BinaryType)
    case "int2" => Some(ShortType)
    case "int4" => Some(IntegerType)
    case "int8" | "oid" => Some(LongType)
    case "float4" => Some(FloatType)
    case "money" | "float8" => Some(DoubleType)
    case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
      Some(StringType)
    // ... (remaining cases omitted)
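The effect of that mapping can be sketched with a simplified, hypothetical model in plain Scala. The type names `BooleanT`, `IntegerT`, `LongT`, `StringT` and the object `DialectMappingSketch` are invented for illustration and only loosely follow the excerpt above. As far as I know, when a dialect returns None Spark falls back to its generic JDBC type mapping, which the `case _` branch only mimics by returning None.

```scala
// Simplified, hypothetical model of a JDBC dialect's type mapping,
// loosely following the PostgresDialect excerpt (not Spark's code).
object DialectMappingSketch {
  sealed trait CatalystLikeType
  case object BooleanT extends CatalystLikeType
  case object IntegerT extends CatalystLikeType
  case object LongT extends CatalystLikeType
  case object StringT extends CatalystLikeType

  def toCatalystLike(typeName: String): Option[CatalystLikeType] = typeName match {
    case "bool" => Some(BooleanT)
    case "int4" => Some(IntegerT)
    case "int8" | "oid" => Some(LongT)
    // JSON types are not given any structure: they become plain strings
    case "text" | "varchar" | "json" | "jsonb" | "uuid" => Some(StringT)
    // None means "no dialect-specific mapping"; Spark would use its generic one
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // A jsonb column therefore reaches Spark as an ordinary string,
    // which is why from_json is needed to turn it into a struct
    println(toCatalystLike("jsonb"))
  }
}
```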