To calculate the moving average of employees' salaries based on their role:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

val movAvg = sampleData.withColumn("movingAverage",
  avg(sampleData("Salary")).over(Window.partitionBy("Role").rowsBetween(-1, 1)))
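The sampleData DataFrame isn't shown in the original, but it can be reconstructed from the output below. This is a minimal sketch; the SparkSession setup and the toDF call are assumptions, not part of the original example. Note that the window specification has no orderBy, so the row order within each partition is not guaranteed; the output below reflects the input order.

import org.apache.spark.sql.SparkSession

// Sketch: build a sampleData DataFrame matching the output shown below.
val spark = SparkSession.builder().appName("movingAverage").master("local[*]").getOrCreate()
import spark.implicits._

val sampleData = Seq(
  ("bob", "Developer", 125000), ("mark", "Developer", 108000),
  ("peter", "Developer", 185000), ("simon", "Developer", 98000),
  ("eric", "Developer", 144000), ("henry", "Developer", 110000),
  ("carl", "Tester", 70000), ("jon", "Tester", 65000),
  ("roman", "Tester", 82000), ("carlos", "Tester", 75000)
).toDF("Name", "Role", "Salary")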
withColumn() creates a new column named movingAverage, holding the average of the Salary column.
over() is used to define the window specification.
partitionBy() partitions the data over the column Role.
rowsBetween(start, end) defines the rows that are to be included in the window. The parameters start and end take numeric inputs: 0 represents the current row, -1 the previous row, 1 the next row, and so on. The window includes all rows from start through end, so in this example three rows (-1, 0, 1) are included.

scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
Spark automatically ignores the previous or next row when the current row is the first or last row of the partition, respectively.
In the above example, the movingAverage of the first row is the average of the current and next rows only, as the previous row doesn't exist. Similarly, the last row of the Developer partition (the 6th row) is the average of the current and previous rows, as the next row doesn't exist.
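One way to see this shrinking window directly is to count how many rows actually fall inside it for each row. This is a sketch against the same sampleData; the rowsInWindow column name is ours, introduced only for illustration.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, count}

// Same window specification as above.
val win = Window.partitionBy("Role").rowsBetween(-1, 1)

// rowsInWindow makes the boundary behaviour visible: it is 2 for the
// first and last row of each partition, and 3 everywhere else.
val withCounts = sampleData
  .withColumn("movingAverage", avg(sampleData("Salary")).over(win))
  .withColumn("rowsInWindow", count(sampleData("Salary")).over(win))

withCounts.show()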