apache-spark开始使用apache-spark


备注

Apache Spark是一个开源的大数据处理框架,围绕速度,易用性和复杂的分析而构建。开发人员在处理大量数据时应使用它,这通常意味着内存限制和/或处理时间过长。


它还应该提到apache-spark中的任何大型主题,并链接到相关主题。由于apache-spark的文档是新的,您可能需要创建这些相关主题的初始版本。

版本

发布日期
2.2.0 2017年7月11日
2.1.1 2017年5月2日
2.1.0 2016年12月28日
2.0.1 2016年10月3日
2.0.0 2016年7月26日
1.6.0 2016年1月4日
1.5.0 2015年9月9日
1.4.0 2015年6月11日
1.3.0 2015年3月13日
1.2.0 情节中字
1.1.0 2014年9月11日
1.0.0 二零一四年五月三十日
0.9.0 2014年2月2日
0.8.0 2013年9月25日
0.7.0 2013年2月27日
0.6.0 2012年10月15日

检查Spark版本

spark-shell

sc.version
 

通常在一个程序中:

SparkContext.version
 

使用spark-submit

 spark-submit --version
 

介绍

原型

aggregate(zeroValue,seqOp,combOp)

说明

aggregate() 允许您获取RDD并生成与原始RDD中存储的类型不同的单个值。

参数

  1. zeroValue :结果的初始化值,采用所需格式。
  2. seqOp :要应用于RDD记录的操作。对分区中的每个记录运行一次。
  3. combOp :定义结果对象(每个分区一个)的组合方式。

示例

计算列表的总和和该列表的长度。将结果返回一对(sum, length)

在Spark shell中,创建一个包含4个元素的列表,其中包含2个分区

listRDD = sc.parallelize([1,2,3,4], 2)
 

然后定义seqOp

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
 

然后定义combOp

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
 

然后汇总:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
 

第一个分区有子列表[1,2]。这将seqOp应用于该列表的每个元素,这会产生一个本地结果 - 一对(sum, length) 将在本地反映结果,仅在第一个分区中。

local_result 被初始化为zeroValue 参数,其中提供了aggregate() 。例如,(0,0)和list_element 是列表的第一个元素:

0 + 1 = 1
0 + 1 = 1
 

局部结果是(1,1),这意味着在处理第一个元素之后,和为1,第一个分区的长度为1。 local_result 从(0,0)更新为(1,1)。

1 + 2 = 3
1 + 1 = 2
 

本地结果现在是(3,2),这将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。对第二个分区执行相同操作返回(7,2)。

将combOp应用于每个本地结果以形成最终的全局结果:

(3,2) + (7,2) = (10, 4)
 

'figure'中描述的示例:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)
 

转型与行动

Spark使用懒惰评估 ;这意味着它不会做任何工作,除非它真的必须这样做。这种方法允许我们避免不必要的内存使用,从而使我们能够处理大数据。

转换是惰性评估的,并且当动作发生时实际工作发生。

例:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example
 

因此,在[1] 我们告诉Spark将文件读入RDD,命名lines 。星火听到我们,对我们说:“是的,我做”,但实际上它并没有读取文件。

在[2]中,我们过滤文件的行,假设其内容包含错误的行,这些行在开始时标记为error 。因此,我们告诉Spark创建一个名为errors 的新RDD,它将包含RDD lines 的元素,这些元素在开头就有error

现在在[3] ,我们要求Spark 计算 错误 ,即计算RDD调用errors 具有的元素数量。 count() 是一个动作 ,它不选择Spark,而是实际进行操作,以便它可以找到count() 的结果,这将是一个整数。

结果,当达到[3] 时, [1][2] 实际上将被执行,即当我们达到[3] ,那么只有这样:

  1. 该文件将在textFile() 读取(因为[1]

  2. linesfilter() “ED(因为[2]

  3. count() 将执行,因为[3]


调试提示:由于Spark在达到[3] 之前不会进行任何实际工作,因此了解如果[1] 和/或[2] 存在错误,则不会出现,直到该操作进入[3] 触发Spark做实际工作。例如,如果文件中的数据不支持我使用的startsWith() ,那么[2] 将被Spark正确接受,它不会引发任何错误,但是当提交[3] 时,实际上是Spark评估[1][2] ,然后才会理解[2] 中的某些内容不正确并产生描述性错误。

因此,执行[3] 时可能会触发错误,但这并不意味着错误必须位于[3] 的语句中!

注意,在[3] 之后, lineserrors 都不会存储在存储器中。它们将继续仅作为一组处理指令存在。如果对这些RDD中的任何一个执行多个操作,spark将多次读取并过滤数据。为避免在单个RDD上执行多个操作时重复操作,使用cache 将数据存储到内存中通常很有用。


您可以在Spark文档中看到更多转换/操作。