2 votes

I'm using PySpark on my local machine. I have a Spark dataframe with 4.5 million rows and approximately 30,000 different stocks. I need to calculate the percentage change for each stock over time. I've already run orderBy so that all the stocks are grouped together (as shown in the example below).

A simplified example dataframe is below.

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
df.show()

**Company**     **Price**
Company_A         100
Company_A         103
Company_A         105
Company_A         107
Company_B          23
Company_B          25
Company_B          28
Company_B          30

My desired output would be something like this:

**Company**     **Price**     **%_Change**
Company_A         100              0
Company_A         103              3%
Company_A         105              2%
Company_A         107              2%
Company_B          23              0
Company_B          25              9%
Company_B          28              12%
Company_B          30              7%

The trick (in my opinion) is writing code that can do two things: 1) identify each time a new stock is listed, and 2) start calculating the percentage change on the second observation for that stock and continue through the last observation. It needs to start on the second observation since there can't be a percentage change until a previous price exists.


2 Answers

3 votes

You can achieve this with a window operation. Ideally you would have an id or timestamp column to order by; for the sake of the example, I am using company as the ordering key.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
price_window = Window.partitionBy("company").orderBy("company")
df = df.withColumn("prev_value", F.lag(df.price).over(price_window))
df = df.withColumn("diff", F.when(F.isnull(df.price - df.prev_value), 0).otherwise(df.price - df.prev_value))

+---------+-----+----------+----+
|  company|price|prev_value|diff|
+---------+-----+----------+----+
|Company_B|   23|      null|   0|
|Company_B|   25|        23|   2|
|Company_B|   28|        25|   3|
|Company_B|   30|        28|   2|
|Company_A|  100|      null|   0|
|Company_A|  103|       100|   3|
|Company_A|  105|       103|   2|
|Company_A|  107|       105|   2|
+---------+-----+----------+----+
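The snippet above stops at the absolute difference. To get the percentage change the question asks for, divide by the lagged value in the same way; a minimal sketch continuing from the code above (the pct_change column name and the rounding to two decimals are my own choices):

df = df.withColumn(
    "pct_change",
    F.when(F.isnull(df.prev_value), 0)  # first row of each stock has no previous price
     .otherwise(F.round((df.price - df.prev_value) / df.prev_value * 100, 2))
)

For the second Company_B row this gives (25 - 23) / 23 * 100 ≈ 8.7, which matches the roughly 9% in the desired output.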
1 vote

I believe window functions are the way to go here.

Some imports:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, round

partitionBy('Company') keeps our stocks together. I'm ordering by Price here, but it will more likely be whatever datetime column you have:

win = Window.partitionBy('Company').orderBy('Price')

Calculate the percentage change with the help of lag, which grabs the previous value in the window. The change is divided by the previous price (and rounded to two decimal places for readability):

prev_price = lag(df['Price']).over(win)
df.withColumn('perc_change', round((df['Price'] - prev_price) / prev_price * 100, 2)).show()

+---------+-----+-----------+
|  Company|Price|perc_change|
+---------+-----+-----------+
|Company_B|   23|       null|
|Company_B|   25|        8.7|
|Company_B|   28|       12.0|
|Company_B|   30|       7.14|
|Company_A|  100|       null|
|Company_A|  103|        3.0|
|Company_A|  105|       1.94|
|Company_A|  107|        1.9|
+---------+-----+-----------+
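If you'd rather show 0 than null for the first observation of each stock, as in the desired output, one option (a sketch of my own, continuing from the snippet above) is to wrap the expression in coalesce:

from pyspark.sql.functions import coalesce, lit

# coalesce falls back to 0.0 where lag produced null (the first row per stock)
pct = round((df['Price'] - prev_price) / prev_price * 100, 2)
df.withColumn('perc_change', coalesce(pct, lit(0.0))).show()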