I want to create an RDD in which each element is built from a filtered subset of another RDD, where the two have a one-to-many relationship.
In this example there is a 1-to-n relationship between RDD A and RDD B, and `shared_id` is the key shared by both.
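```python
# tableA / tableB are placeholders for the real table names
tableA_data = spark.sql("""
SELECT shared_id, dataA
FROM tableA
""")
tableB_data = spark.sql("""
SELECT shared_id, dataB
FROM tableB
""")

# For each row of A, try to attach the rows of B that have the same shared_id
combined_data = tableA_data.rdd.map(lambda x: {
    'tableB_data': tableB_data.filter(tableB_data["shared_id"] == x['shared_id']),
    'tableA_data': x['dataA']
})
```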
When I call `combined_data.take(1)`, I get the following error:
```
Traceback (most recent call last):
  File "", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 205, in __repr__
    return self._jrdd.toString()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2532, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2434, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o69.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```
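Reading the traceback, the failure happens while PySpark serializes the function passed to `map` (`ser.dumps(command)`), before any rows are processed. The lambda refers to `tableB_data`, a driver-side DataFrame backed by a py4j handle to a JVM object (presumably the `o69` in the message), and asking that handle for `__getstate__` fails on the Java side. The stdlib-only sketch below is just a model of that failure mode, not PySpark code: `JvmHandle` and `DataFrameLike` are hypothetical stand-ins, and plain `pickle` is used in place of the cloudpickle-based serializer PySpark actually uses.

```python
import pickle


class JvmHandle:
    """Hypothetical stand-in for a py4j proxy to a JVM object (e.g. o69).

    The real proxy forwards the __getstate__ lookup to the JVM, where no such
    method exists; here we simply raise to model that outcome.
    """

    def __init__(self, object_id):
        self.object_id = object_id

    def __getstate__(self):
        raise pickle.PicklingError(
            f"Method __getstate__([]) does not exist on {self.object_id}"
        )


class DataFrameLike:
    """Hypothetical driver-side wrapper that holds a JVM handle."""

    def __init__(self, handle):
        self._jdf = handle


table_b = DataFrameLike(JvmHandle("o69"))

# Model of the state the map closure captures and must ship to executors.
closure_state = {"tableB_data": table_b}

try:
    pickle.dumps(closure_state)
    shipped = True
except pickle.PicklingError as err:
    # Pickling recurses into table_b._jdf and fails there.
    shipped = False
    print(f"Could not serialize object: {err}")
```

Plain values such as the `shared_id` and `dataA` of a row pickle without trouble; it is only the captured handle that cannot leave the driver.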
EDIT: example input/output

Example input:

TableA:
```
{"shared_id": 1, "dataA": "A"}
```

TableB (two rows):
```
{"shared_id": 1, "dataB": "B1"}
{"shared_id": 1, "dataB": "B2"}
```

Wanted output:
```
{
  "tableA_data": "A",
  "tableB_data": ["B1", "B2"]
}
```
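The wanted output amounts to grouping TableB's `dataB` values by `shared_id` once and attaching each list to the matching TableA row. A minimal plain-Python sketch of that transformation on the sample rows, assuming lists of dicts as stand-ins for Spark rows (the `combine` helper is hypothetical, not a Spark API):

```python
from collections import defaultdict

# Sample rows from the example input (dicts stand in for Spark Rows).
table_a = [{"shared_id": 1, "dataA": "A"}]
table_b = [
    {"shared_id": 1, "dataB": "B1"},
    {"shared_id": 1, "dataB": "B2"},
]


def combine(a_rows, b_rows):
    """Group B values by shared_id once, then attach the list to each A row."""
    b_by_id = defaultdict(list)
    for row in b_rows:
        b_by_id[row["shared_id"]].append(row["dataB"])

    return [
        {
            "tableA_data": row["dataA"],
            # .get avoids inserting keys; A rows with no match get an empty list.
            "tableB_data": b_by_id.get(row["shared_id"], []),
        }
        for row in a_rows
    ]


result = combine(table_a, table_b)
print(result)
```

In Spark itself this shape is usually built with DataFrame operations rather than by referencing one DataFrame inside another's `map`, for example aggregating B with `groupBy("shared_id").agg(F.collect_list("dataB"))` (where `F` is `pyspark.sql.functions`) and joining the result to A on `shared_id`. Note that `collect_list` does not guarantee element order, and with a left join an A row without matches gets `null` instead of an empty list.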