
I want to create an RDD based on a subset of filtered results from another RDD, where there is a 1-to-many relationship.

In this example there is a 1-to-n relationship from RDD A to RDD B, and shared_id is the id shared between RDD A and RDD B:

tableA_data = spark.sql("""
            SELECT shared_id, dataA
            FROM TableA
            """)

tableB_data = spark.sql("""
            SELECT shared_id, dataB
            FROM TableB
            """)

combined_data = tableA_data.rdd.map(lambda x: {
            'tableB_data' : tableB_data.filter(tableB_data["shared_id"] == x['shared_id']),
            'tableA_data': x['dataA']
        })

and when I do combined_data.take(1) I get:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 205, in __repr__
    return self._jrdd.toString()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2532, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2434, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o69.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

EDIT: example input/output

Example input:

TableA:
{
    "shared_id": 1,
    "dataA": "A"
}

TableB:
{
    "shared_id": 1,
    "dataB": "B1"
}

TableB:
{
    "shared_id": 1,
    "dataB": "B2"
}

Wanted output:

result:
{
    "tableA_data": "A",
    "tableB_data": ["B1", "B2"]
}
May be an idea to show some input and expected output. You should know better. Does not look too good. – thebluephantom
Thanks for the feedback, I've added an example. – Shachaf.Gortler

1 Answer


Not really sure of your data, or why you want an RDD over a DataFrame, but here are 2 approaches that you can tailor:

from pyspark.sql import functions as F

dfa = spark.createDataFrame([(1, "A"), (2, "C")],["shared_id", "dataA"])
dfb = spark.createDataFrame([(1, "B1"), (1, "B2"), (1, "B3"), (2, "B9"), (2, "B10") ],["shared_id", "dataB"])

df = dfa.join(dfb, on=['shared_id'], how='inner')

# OPTION 1: group on shared_id only, collecting dataA into a set and dataB into a list
df.groupby('shared_id') \
  .agg(F.collect_set('dataA').alias("tableA_Data"),
       F.collect_list('dataB').alias("tableB_Data")) \
  .select("tableA_Data", "tableB_Data") \
  .show()

# OPTION 2: group on shared_id and dataA, so dataA stays a plain column
df.groupby('shared_id', 'dataA') \
  .agg(F.collect_list('dataB').alias("tableB_Data")) \
  .select("dataA", "tableB_Data") \
  .show()

returns:

+-----------+------------+
|tableA_Data| tableB_Data|
+-----------+------------+
|        [A]|[B1, B2, B3]|
|        [C]|   [B9, B10]|
+-----------+------------+

+-----+------------+
|dataA| tableB_Data|
+-----+------------+
|    A|[B1, B2, B3]|
|    C|   [B9, B10]|
+-----+------------+
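
If you really need the result as an RDD of dicts in the exact shape of your wanted output, here is a minimal sketch built on OPTION 2 above (the names grouped and result_rdd are just illustrative):

# Sketch: reshape OPTION 2's result into an RDD of dicts matching the wanted output
grouped = df.groupby('shared_id', 'dataA') \
            .agg(F.collect_list('dataB').alias('tableB_Data'))

result_rdd = grouped.rdd.map(lambda row: {
    'tableA_data': row['dataA'],
    'tableB_data': row['tableB_Data'],
})

result_rdd.take(1)
# e.g. [{'tableA_data': 'A', 'tableB_data': ['B1', 'B2', 'B3']}]

The key point is that the join and aggregation happen on the DataFrame side; only the final, already-materialized rows are mapped into dicts, so nothing driver-side (like another DataFrame) is referenced inside the lambda.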