
I have a dataframe as follows:

+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 | 7000   | 30     |   0  |
|  2 | 0      | 9      |   0  |
|  3 | 23627  | 17     |   1  |
|  4 | 8373   | 23     |   0  |
|  5 | -0.5   | 4      |   1  |
+----+--------+--------+------+

I want to apply the following conditions:
1. If value1 is greater than 0, I want the previous row's value2.
2. If value1 is equal to 0, I want the average of the previous and next rows' value2.
3. If value1 is less than 0, I want NULL.
So I wrote the following code:

w = Window.orderBy('id')
df = df.withColumn('value2', when(col('value1') > 0, lag(col('value2')).over(w))
                   .when(col('value1') == 0, (lag(col('value2')).over(w) + lead(col('value2')).over(w)) / 2.0)
                   .otherwise(None))

What I want is for the previous and next rows' values to be the already-updated values, as shown below. It should process the rows in order: first compute the value for id 1 and update it, then for id 2 use the updated value, and so on. For example, id 3 should get 8.5 because it takes the updated value2 of id 2 (8.5), not the original 9.

+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 | 7000   | null   |   0  |
|  2 | 0      | 8.5    |   0  |
|  3 | 23627  | 8.5    |   1  |
|  4 | 8373   | 8.5    |   0  |
|  5 | -0.5   | null   |   1  |
+----+--------+--------+------+

I tried restricting the when to id==1, reassigning the dataframe, and then performing the withColumn/when operations again.

df = df.withColumn('value2', when((col('id') == 1) & (col('value1') > 0), lag(col('value2')).over(w))
                   .when((col('id') == 1) & (col('value1') == 0), (lag(col('value2')).over(w) + lead(col('value2')).over(w)) / 2.0)
                   .when((col('id') == 1) & (col('value1') < 0), None)
                   .otherwise(col('value2')))

After this I'll get the updated column value, and if I do the same operation again for id==2, I can get its updated value. But I certainly cannot do that for every id. How can I achieve this?

Where did you try adding id==1? - pvy4917
@karma4917 edited, please take a look - Visualisation App
Did you try putting that in some loop? - pvy4917
If I have a huge dataset, a loop is an inefficient way, right? - Visualisation App
Can you please add the expected output? - Ali Yesilli

2 Answers

from pyspark.sql import SparkSession    
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


spark = SparkSession \
    .builder \
    .appName('test') \
    .getOrCreate()


# sample data taken from the question's table
tab_inp = [(1, 7000.0, 30, 0), (2, 0.0, 9, 0), (3, 23627.0, 17, 1),
           (4, 8373.0, 23, 0), (5, -0.5, 4, 1)]
tab_data = spark.sparkContext.parallelize(tab_inp)
schema = StructType([StructField('id',IntegerType(),True),
                     StructField('value1',FloatType(),True),
                     StructField('value2',IntegerType(),True),
                     StructField('flag',IntegerType(),True)
                    ])

table = spark.createDataFrame(tab_data,schema)
table.createOrReplaceTempView("table")
# constant column so the whole frame falls into a single window partition
dummy_df = table.withColumn('dummy', lit('dummy'))
# previous and next value2, ordered by id
pre_value = dummy_df.withColumn('pre_value', lag(dummy_df['value2']).over(Window.partitionBy('dummy').orderBy('id')))
cmb_value = pre_value.withColumn('next_value', lead(dummy_df['value2']).over(Window.partitionBy('dummy').orderBy('id')))

new_column = when(col('value1') > 0, cmb_value.pre_value) \
            .when(col('value1') < 0, lit(None)) \
            .otherwise((cmb_value.pre_value + cmb_value.next_value) / 2)


final_table = cmb_value.withColumn('value', new_column)

Above "final_table" will have field you are expecting.


I think it will be complicated to do this entirely without looping. But you could split the data into subsets, handle each subset in pandas via a grouped-map UDF, and distribute those subsets across executors. For this to work there have to be enough break points (i.e., data points where value1 is less than 0 and you are inserting a NULL), because the rows between two break points must still be processed sequentially within one group.

Imports:

from pyspark.sql import Window
from pyspark.sql.functions import last
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, lit, when

Input data:

df = spark.createDataFrame([[1, 7000.0, 30.0], [2, 0.0, 9.0], [3, 23627.0, 17.0],
                            [4, 8373.0, 23.0], [5, -0.5, 4.0]],
                           ['id', 'value1', 'value2']).cache()

Adding the next row's value2 and setting break points wherever value1 is smaller than 0:

# self-join on id+1 to attach the next row's value2
dfwithnextvalue = df.alias("a").join(df.alias("b"), col("a.id") == col("b.id") - lit(1), 'left')\
    .select("a.*", col("b.value2").alias("nextvalue"))
# start a new group at id 1 and wherever value1 < 0
dfstartnew = dfwithnextvalue.withColumn("startnew", when(col("value1") < lit(0), col("id")).otherwise(lit(None)))\
.withColumn("startnew", when(col("id") == lit(1), lit(1)).otherwise(col("startnew")))
# forward-fill the group id down the frame
window = Window.orderBy('id')
rolled = last(col('startnew'), ignorenulls=True).over(window)
dfstartnewrolled = dfstartnew.withColumn("startnew", rolled)
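For the sample rows this should tag ids 1 through 4 with startnew = 1 and id 5 with startnew = 5, since id 5 is the only break point. A quick way to check, using the names defined above:

dfstartnewrolled.orderBy('id').show()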

Now we can group by the startnew column and handle every piece in pandas. My pandas knowledge is not great, but this seems to work:

@pandas_udf("id long, value1 double, value2 double", PandasUDFType.GROUPED_MAP)
def loopdata(df):
  df = df.set_index('id').sort_index()
  for i in range(0, len(df.index)):
    if i == 0:
      df.loc[df.index[0], 'value2'] = np.nan
    elif df.loc[df.index[i], 'value1'] < 0:
      df.loc[df.index[i], 'value2'] = np.nan
    elif df.loc[df.index[i], 'value1'] > 0:
      df.loc[df.index[i], 'value2'] = df.loc[df.index[i-1], 'value2']
    else:
      nextvalue = df.loc[df.index[i], 'nextvalue']
      if pd.isna(nextvalue):
        nextvalue = 0
      prevvalue = df.loc[df.index[i-1], 'value2']
      if pd.isna(prevvalue):
        prevvalue = 0
      df.loc[df.index[i], 'value2'] = (nextvalue + prevvalue)/2.0
  df = df.drop(columns=['nextvalue', 'startnew'])
  df = df.reset_index()
  return df

Now you can compute the result:

dfstartnewrolled.groupBy("startnew").apply(loopdata)
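For example, to compute and inspect the output (names as defined in this answer):

result = dfstartnewrolled.groupBy("startnew").apply(loopdata)
result.orderBy("id").show()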