pyspark开始使用pyspark


备注

本节概述了pyspark是什么,以及开发人员可能想要使用它的原因。

它还应该提到pyspark中的任何大型主题,并链接到相关主题。由于pyspark的文档是新的,您可能需要创建这些相关主题的初始版本。

使用PySpark从S3消耗数据

您可以使用两种方法来使用AWS S3存储桶中的数据。

  1. 使用sc.textFile(或sc.wholeTextFiles)API:此api也可用于HDFS和本地文件系统。
aws_config = {}  # set your aws credential here
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
data_rdd = sc.wholeTextFiles(s3_keys)
 
  1. 使用自定义API读取它(说一个boto下载器):
def download_data_from_custom_api(key):
    # implement this function as per your understanding (if you're new, use [boto][1] api)
    # don't worry about multi-threading as each worker will have single thread executing your job
    return ''

s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
# numSlices is the number of partitions. You'll have to set it according to your cluster configuration and performance requirement
key_rdd = sc.parallelize(s3_keys, numSlices=16) 

data_rdd = key_rdd.map(lambda key: (key, download_data_from_custom_api(key))
 

我建议使用方法2,因为在使用方法1时,驱动程序会下载所有数据,而工作人员只需处理它。这有以下缺点:

  1. 随着数据大小的增加,内存不足。
  2. 您的工作人员将闲置,直到数据下载完毕

安装或设置

有关获取pyspark设置或安装的详细说明。

Pyspark中的示例字数

基础示例只是官方pyspark文档中给出的示例。请点击此处访问此示例。

# the first step involves reading the source text file from HDFS 
text_file = sc.textFile("hdfs://...")

# this step involves the actual computation for reading the number of words in the file
# flatmap, map and reduceByKey are all spark RDD functions
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

# the final step is just saving the result.
counts.saveAsTextFile("hdfs://...")