

Example

Prototype:

aggregate(zeroValue, seqOp, combOp)

Description:

aggregate() lets you take an RDD and reduce it to a single value whose type can differ from the type of the records stored in the original RDD.

Parameters:

  1. zeroValue: The initial value of the result, in the desired format.
  2. seqOp: The operation applied to the RDD records. It runs once for every record in a partition.
  3. combOp: Defines how the resulting objects (one per partition) are combined.
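
To make the roles of the three parameters concrete, here is a minimal pure-Python sketch of the semantics, runnable without Spark. The helper name simulate_aggregate and the explicit list of partitions are assumptions made for illustration only; a real cluster runs the per-partition step on separate executors.

```python
from functools import reduce

def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical helper: mimics the semantics of RDD.aggregate() on a list of lists."""
    # seq_op folds the records of each partition, starting from zero_value.
    local_results = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition results, again starting from zero_value.
    return reduce(comb_op, local_results, zero_value)

# Count the records and track the largest one, as a (count, maximum) pair.
result = simulate_aggregate(
    [[5, 1], [9, 3]],                             # two made-up partitions
    (0, float("-inf")),                           # zeroValue
    lambda acc, x: (acc[0] + 1, max(acc[1], x)),  # seqOp
    lambda a, b: (a[0] + b[0], max(a[1], b[1])),  # combOp
)
print(result)  # (4, 9)
```

Note that the records are plain integers while the result is a pair, which is the "different type" that aggregate() allows.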

Example:

Compute the sum of a list and the length of that list. Return the result in a pair of (sum, length).

In a Spark shell, create a list with 4 elements, with 2 partitions:

listRDD = sc.parallelize([1,2,3,4], 2)

Then define seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

Then define combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

Then aggregate:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

The first partition has the sublist [1, 2]. seqOp is applied to each element of that list, which produces a local result: a pair of (sum, length) that reflects the result locally, only in that first partition.

local_result is initialized to the zeroValue that aggregate() was given, here (0, 0), and list_element is the first element of the list, 1. The first line below updates the sum, the second the length:

0 + 1 = 1
0 + 1 = 1

The local result is (1, 1), which means the sum is 1 and the length is 1 for the 1st partition after processing only the first element. local_result gets updated from (0, 0) to (1, 1). The same step is then applied to the second element, 2:

1 + 2 = 3
1 + 1 = 2

The local result is now (3, 2), which is the final result from the 1st partition, since there are no other elements in the sublist of the 1st partition. Doing the same for the 2nd partition returns (7, 2).
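
The per-partition walk-through above can be reproduced in plain Python. This is only a sketch: trace_partition is a hypothetical helper, and it relies on itertools.accumulate with the initial keyword, which assumes Python 3.8 or later.

```python
from itertools import accumulate

# Same seqOp as in the example: add the element to the sum, add 1 to the length.
seq_op = lambda local_result, list_element: (local_result[0] + list_element,
                                             local_result[1] + 1)

def trace_partition(records, zero_value=(0, 0)):
    """Hypothetical helper: every value local_result takes, in order."""
    return list(accumulate(records, seq_op, initial=zero_value))

first = trace_partition([1, 2])
second = trace_partition([3, 4])
print(first)   # [(0, 0), (1, 1), (3, 2)]
print(second)  # [(0, 0), (3, 1), (7, 2)]
```

The last entry of each trace is the local result that the partition hands over to combOp.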

Finally, combOp merges the local results of the partitions, adding them element by element, to form the final, global result:

(3,2) + (7,2) = (10, 4)
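
The combine step can also be checked without Spark. In this sketch the list local_results is written out by hand from the walk-through, and the final division is just one typical use of a (sum, length) pair, not something aggregate() does for you.

```python
from functools import reduce

# Same combOp as in the example: add the sums and add the lengths.
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

local_results = [(3, 2), (7, 2)]  # one (sum, length) pair per partition
total, length = reduce(comb_op, local_results, (0, 0))

print((total, length))  # (10, 4)
print(total / length)   # 2.5, the mean of [1, 2, 3, 4]
```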

The same example as a figure:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)