Looking for apache-spark Keywords? Try Ask4Keywords

apache-sparkDémarrer avec apache-spark


Remarques

Apache Spark est une infrastructure de traitement de données volumineuses open source construite autour de la vitesse, de la facilité d’utilisation et d’analyses sophistiquées. Un développeur doit l'utiliser lorsqu'il traite une grande quantité de données, ce qui implique généralement des limitations de mémoire et / ou un temps de traitement prohibitif.


Il devrait également mentionner tous les grands sujets dans apache-spark, et établir un lien avec les sujets connexes. La documentation de apache-spark étant nouvelle, vous devrez peut-être créer des versions initiales de ces rubriques connexes.

Versions

Version Date de sortie
2.2.0 2017-07-11
2.1.1 2017-05-02
2.1.0 2016-12-28
2.0.1 2016-10-03
2.0.0 2016-07-26
1.6.0 2016-01-04
1.5.0 2015-09-09
1.4.0 2015-06-11
1.3.0 2015-03-13
1.2.0 2014-12-18
1.1.0 2014-09-11
1.0.0 2014-05-30
0.9.0 2014-02-02
0.8.0 2013-09-25
0.7.0 2013-02-27
0.6.0 2012-10-15

Vérifier la version Spark

En spark-shell :

sc.version
 

Généralement dans un programme:

SparkContext.version
 

En utilisant spark-submit :

 spark-submit --version
 

introduction

Prototype :

agrégat (valeur zéro, seqOp, combOp)

Description :

aggregate() vous permet de prendre un RDD et de générer une valeur unique d'un type différent de ce qui était stocké dans le RDD d'origine.

Paramètres :

  1. zeroValue : La valeur d'initialisation, pour votre résultat, au format souhaité.
  2. seqOp : opération que vous souhaitez appliquer aux enregistrements RDD. Fonctionne une fois pour chaque enregistrement dans une partition.
  3. combOp : définit comment les objets résultants (un pour chaque partition) sont combinés.

Exemple :

Calculez la somme d'une liste et la longueur de cette liste. Renvoie le résultat dans une paire de (sum, length) .

Dans un shell Spark, créez une liste de 4 éléments, avec 2 partitions :

listRDD = sc.parallelize([1,2,3,4], 2)
 

Ensuite, définissez seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
 

Ensuite, définissez combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
 

Puis agrégé:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
 

La première partition a la sous-liste [1, 2]. Ceci applique le seqOp à chaque élément de cette liste, ce qui produit un résultat local - Une paire de (sum, length) qui reflétera le résultat localement, uniquement dans cette première partition.

local_result obtient initialisé à la zeroValue paramètre aggregate() a été fourni avec. Par exemple, (0, 0) et list_element sont le premier élément de la liste:

0 + 1 = 1
0 + 1 = 1
 

Le résultat local est (1, 1), ce qui signifie que la somme est 1 et la longueur 1 pour la 1ère partition après avoir traité uniquement le premier élément. local_result est mis à jour de (0, 0) à (1, 1).

1 + 2 = 3
1 + 1 = 2
 

Le résultat local est maintenant (3, 2), qui sera le résultat final de la 1ère partition, car ils ne sont pas d’autres éléments dans la sous-liste de la 1ère partition. Faire la même chose pour les retours de 2ème partition (7, 2).

Appliquez combOp à chaque résultat local pour former le résultat final global:

(3,2) + (7,2) = (10, 4)
 

Exemple décrit dans 'figure':

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)
 

Transformation vs Action

Spark utilise une évaluation paresseuse ; cela signifie qu’il ne fera aucun travail, à moins que ce soit vraiment nécessaire. Cette approche nous permet d'éviter l'utilisation inutile de la mémoire, ce qui nous permet de travailler avec des données volumineuses.

Une transformation est évaluée paresseuse et le travail réel se produit lorsqu'une action se produit.

Exemple:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example
 

Donc, dans [1] nous avons dit à Spark de lire un fichier dans un RDD, nommé lines . Spark nous a entendus et nous a dit: "Oui, je le ferai", mais en fait, il n'a pas encore lu le fichier.

Dans [2], nous filtrons les lignes du fichier, en supposant que son contenu contient des lignes avec des erreurs marquées d'une error au début. Nous disons donc à Spark de créer un nouveau RDD, appelé des errors , qui aura les éléments des lines RDD, qui comportaient le mot error au début.

Maintenant, dans [3] , nous demandons à Spark de compter les erreurs , c'est-à-dire de compter le nombre d'éléments que le RDD appelle des errors . count() est une action qui ne laisse pas le choix à Spark, mais à effectuer l'opération, afin qu'elle puisse trouver le résultat de count() , qui sera un entier.

Par conséquent, lorsque [3] est atteint, [1] et [2] seront effectivement exécutés, c’est-à-dire que lorsque nous atteignons [3] , alors et seulement alors:

  1. le fichier va être lu dans textFile() (à cause de [1] )

  2. lines seront filter() 'ed (à cause de [2] )

  3. count() s'exécutera, à cause de [3]


Astuce de débogage: Puisque Spark ne fera pas de travail réel avant d'avoir atteint [3] , il est important de comprendre que si une erreur existe dans [1] et / ou [2] , elle n'apparaîtra pas tant que l'action dans [3] déclenche le travail de Spark. Par exemple, si vos données dans le fichier ne supportent pas le startsWith() j'ai utilisé, alors [2] sera correctement accepté par Spark et ne provoquera aucune erreur, mais quand [3] sera soumis et que Spark sera effectivement évalue à la fois [1] et [2] , alors et seulement alors il comprendra que quelque chose n'est pas correct avec [2] et produit une erreur descriptive.

En conséquence, une erreur peut être déclenchée lorsque [3] est exécuté, mais cela ne signifie pas que l'erreur doit se trouver dans l'instruction de [3] !

Notez que ni les lines ni les errors ne seront stockées en mémoire après [3] . Ils continueront à exister uniquement comme un ensemble d'instructions de traitement. Si plusieurs actions sont effectuées sur l'un de ces RDD, spark lit et filtre les données plusieurs fois. Pour éviter les opérations de duplication lors de l'exécution de plusieurs actions sur un seul RDD, il est souvent utile de stocker des données en mémoire à l'aide du cache .


Vous pouvez voir plus de transformations / actions dans les documents Spark .