
The window function first_value (equivalently, first) seems to exist, as the code snippet below demonstrates. The function is not listed in

https://spark.apache.org/docs/3.1.2/sql-ref-functions-builtin.html#window-functions

however it is listed in

https://spark.apache.org/docs/latest/api/sql/#first_value

In any case, it seems to work as a window function:

# first_value window function
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import random

# get a spark session
spark = SparkSession.builder.appName('learn').getOrCreate()

# create dataset
schema = StructType([
    StructField('c1', StringType(), nullable=True),
    StructField('c2', StringType(), nullable=True),
    StructField('value', DoubleType(), nullable=True),
])
data = [(random.choice(list('ABC')), random.choice(list('abc')), random.random()) for _ in range(100)]
df = spark.createDataFrame(data, schema=schema).drop_duplicates()
df.createOrReplaceTempView('tmp_view')

# execute window function (using first() instead of first_value() gives the same result)
query = """SELECT c1, first_value(value) OVER (PARTITION BY c1) AS f FROM tmp_view"""
res = spark.sql(query)
res.drop_duplicates().show()

So the question is: is this simply a matter of the documentation omitting first_value/first from the window function table?

Looking at the DataFrame API, it seems first_value does not exist, while first is documented not as a window function but as an aggregate function:

import pyspark.sql.functions as f
f.first?
Signature: f.first(col, ignorenulls=False)
Docstring:
Aggregate function: returns the first value in a group.
The function by default returns the first values it sees. It will return the first non-null
value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
.. versionadded:: 1.3.0

However, it can be executed as a window function using the dataframe API:

from pyspark.sql.window import Window
w = Window.partitionBy("c1")
res = df.withColumn('f', f.first('value').over(w))
res.select('c1', 'f').drop_duplicates().show()

It can also be used as an aggregation function:

data = [('a', 3),
        ('a', 30),
        ('b', 7),
        ('b', 70)
        ]
df = spark.createDataFrame(data, ['nam', 'value'])
res = df.groupby('nam').agg(f.first(f.col('value')))
res.show()

What is going on? Is the documentation confusing, or is my understanding somehow wrong?


1 Answer


Check the official API and its examples; you can define a windowing column from a DataFrame column with xxx.over(w).