pysparkAan de slag met pyspark


Opmerkingen

Deze sectie geeft een overzicht van wat pyspark is en waarom een ontwikkelaar het misschien wil gebruiken.

Het moet ook alle grote onderwerpen binnen het pyspark vermelden en een link naar de gerelateerde onderwerpen bevatten. Aangezien de Documentatie voor pyspark nieuw is, moet u mogelijk eerste versies van die gerelateerde onderwerpen maken.

Gegevens van S3 gebruiken met PySpark

Er zijn twee methoden waarmee u gegevens uit de AWS S3-bucket kunt gebruiken.

  1. API met sc.textFile (of sc.wholeTextFiles) gebruiken: deze API kan ook worden gebruikt voor HDFS en het lokale bestandssysteem.
aws_config = {}  # set your aws credential here
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
data_rdd = sc.wholeTextFiles(s3_keys)
 
  1. Lezen met behulp van aangepaste API (Say a boto downloader):
def download_data_from_custom_api(key):
    # implement this function as per your understanding (if you're new, use [boto][1] api)
    # don't worry about multi-threading as each worker will have single thread executing your job
    return ''

s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
# numSlices is the number of partitions. You'll have to set it according to your cluster configuration and performance requirement
key_rdd = sc.parallelize(s3_keys, numSlices=16) 

data_rdd = key_rdd.map(lambda key: (key, download_data_from_custom_api(key))
 

Ik raad aan benadering 2 te gebruiken, omdat tijdens het werken met benadering 1 de bestuurder alle gegevens downloadt en de werknemers deze gewoon verwerken. Dit heeft de volgende nadelen:

  1. Je geheugen raakt op als de gegevens groter worden.
  2. Uw werknemers blijven inactief totdat de gegevens zijn gedownload

Installatie of instellingen

Gedetailleerde instructies voor het instellen of installeren van pyspark.

Voorbeeld Word Count in Pyspark

Het onderliggende voorbeeld is precies datgene dat in de officiële pyspark-documentatie is gegeven. Klik hier om naar dit voorbeeld te gaan.

# the first step involves reading the source text file from HDFS 
text_file = sc.textFile("hdfs://...")

# this step involves the actual computation for reading the number of words in the file
# flatmap, map and reduceByKey are all spark RDD functions
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

# the final step is just saving the result.
counts.saveAsTextFile("hdfs://...")