PySpark is the Python API for Apache Spark. It exposes the Spark programming model, the SparkContext, RDDs, and the DataFrame/Spark SQL layer, to Python, so you can distribute Python computations across a cluster without leaving the language.
This topic gives an overview of pyspark and mentions the larger subjects within it, such as RDD operations, DataFrames and Spark SQL, which should link out to related topics. Since the documentation for pyspark is new, you may need to create initial versions of those related topics.
There are two approaches you can use to consume data from an AWS S3 bucket.

Approach 1: have Spark read the S3 objects directly through the s3n filesystem connector.
aws_config = {}  # set your AWS credentials here

# register the credentials with Hadoop's s3n filesystem connector
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_config['aws.access.key.id'])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])

# s3n paths have the form s3n://<bucket>/<key>
s3_keys = ['s3n://{bucket}/{key1}', 's3n://{bucket}/{key2}']

# wholeTextFiles takes a comma-separated list of paths and returns
# an RDD of (path, file_content) pairs
data_rdd = sc.wholeTextFiles(','.join(s3_keys))
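To sanity-check the result (a minimal sketch; the bucket and key names above are placeholders), you can pull a single pair back to the driver:

# each element is a (path, file_content) pair; take() avoids collecting everything
for path, content in data_rdd.take(1):
    print(path, len(content))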
Approach 2: download the objects yourself inside the workers, using the list of S3 keys only to drive the parallelism.

def download_data_from_custom_api(key):
    # implement this function yourself (if you're new to AWS, the boto API
    # is a good starting point; see the sketch below)
    # don't worry about multi-threading: each worker executes your function
    # in a single thread per task
    return ''

s3_keys = ['s3n://{bucket}/{key1}', 's3n://{bucket}/{key2}']

# numSlices is the number of partitions; set it according to your cluster
# configuration and performance requirements
key_rdd = sc.parallelize(s3_keys, numSlices=16)

data_rdd = key_rdd.map(lambda key: (key, download_data_from_custom_api(key)))
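As a minimal sketch of the downloader, assuming the boto 2 library and the s3n://<bucket>/<key> key format used above (both are illustrative choices, not requirements):

import boto

def download_data_from_custom_api(key):
    # strip the scheme and split the bucket name from the object key
    bucket_name, key_name = key.split('://', 1)[-1].split('/', 1)
    conn = boto.connect_s3()  # reads credentials from the environment / boto config
    bucket = conn.get_bucket(bucket_name)
    return bucket.get_key(key_name).get_contents_as_string()

Opening a connection per key keeps the sketch short; with many small objects you would typically switch to mapPartitions so that each partition reuses a single connection.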
I recommend approach 2, because with approach 1 the driver downloads all the data and the workers just process it. This has the following drawbacks:

- the driver's memory and network bandwidth become the bottleneck as the data size grows;
- the workers sit idle until the download finishes, instead of fetching their share of the data in parallel.
This section gives detailed instructions on getting pyspark set up or installed.
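A minimal sketch, assuming Spark 2.2 or later (the first release published to PyPI) and a local master; the application name is arbitrary:

# pip install pyspark   (Spark 2.2+; earlier releases must be downloaded from spark.apache.org)
from pyspark import SparkContext

# `sc` is the SparkContext entry point used by the examples in this topic
sc = SparkContext(master="local[*]", appName="pyspark-examples")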
The following example is the word count example given in the official PySpark documentation (see https://spark.apache.org/examples.html).
# the first step reads the source text file from HDFS
text_file = sc.textFile("hdfs://...")

# this step counts the occurrences of each word in the file;
# flatMap, map and reduceByKey are all Spark RDD transformations
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)

# the final step saves the result back to HDFS
counts.saveAsTextFile("hdfs://...")
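To see what the pipeline computes without an HDFS cluster, you can run the same chain on an in-memory RDD (the sample lines are made up):

lines = sc.parallelize(["the quick fox", "the lazy dog"])
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
# sorted() makes the output deterministic; RDD ordering is not guaranteed
print(sorted(counts.collect()))
# [('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)]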