apache-sparkAan de slag met apache-spark


Opmerkingen

Apache Spark is een open source framework voor het verwerken van big data gebaseerd op snelheid, gebruiksgemak en geavanceerde analyses. Een ontwikkelaar moet het gebruiken wanneer hij / zij een grote hoeveelheid gegevens verwerkt, wat meestal geheugenbeperkingen en / of onbetaalbare verwerkingstijd impliceert.


Het moet ook alle grote onderwerpen binnen apache-spark vermelden en een link naar de gerelateerde onderwerpen bevatten. Aangezien de documentatie voor apache-spark nieuw is, moet u mogelijk eerste versies van die gerelateerde onderwerpen maken.

versies

Versie Publicatiedatum
2.2.0 2017/07/11
2.1.1 2017/05/02
2.1.0 2016/12/28
2.0.1 2016/10/03
2.0.0 2016/07/26
1.6.0 2016/01/04
1.5.0 2015/09/09
1.4.0 2015/06/11
1.3.0 2015/03/13
1.2.0 2014/12/18
1.1.0 2014/09/11
1.0.0 2014/05/30
0.9.0 2014/02/02
0.8.0 2013/09/25
0.7.0 2013/02/27
0.6.0 2012-10-15

Controleer de Spark-versie

In spark-shell :

sc.version
 

Over het algemeen in een programma:

SparkContext.version
 

spark-submit :

 spark-submit --version
 

Invoering

Prototype :

aggregaat (zeroValue, seqOp, combOp)

Beschrijving :

aggregate() kunt u een RDD nemen en een enkele waarde genereren die van een ander type is dan wat in de oorspronkelijke RDD was opgeslagen.

Parameters :

  1. zeroValue : de initialisatiewaarde voor uw resultaat in het gewenste formaat.
  2. seqOp : de bewerking die u wilt toepassen op RDD-records. Wordt één keer uitgevoerd voor elk record in een partitie.
  3. combOp : definieert hoe de resulterende objecten (één voor elke partitie) worden gecombineerd.

Voorbeeld :

Bereken de som van een lijst en de lengte van die lijst. Retourneer het resultaat in een paar (sum, length) .

Maak in een Spark-shell een lijst met 4 elementen, met 2 partities :

listRDD = sc.parallelize([1,2,3,4], 2)
 

Definieer vervolgens seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
 

Definieer vervolgens combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
 

Vervolgens geaggregeerd:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
 

De eerste partitie heeft de sublijst [1, 2]. Dit past de seqOp toe op elk element van die lijst, wat een lokaal resultaat oplevert - Een paar (sum, length) dat het resultaat lokaal weergeeft, alleen in die eerste partitie.

local_result wordt geïnitialiseerd naar de parameter zeroValue aggregate() werd verstrekt. (0, 0) en list_element bijvoorbeeld het eerste element van de lijst:

0 + 1 = 1
0 + 1 = 1
 

Het lokale resultaat is (1, 1), wat betekent dat de som 1 is en de lengte 1 voor de 1e partitie na verwerking van alleen het eerste element. local_result wordt bijgewerkt van (0, 0) tot (1, 1).

1 + 2 = 3
1 + 1 = 2
 

Het lokale resultaat is nu (3, 2), wat het eindresultaat van de 1e partitie zal zijn, aangezien dit geen andere elementen in de sublijst van de 1e partitie zijn. Hetzelfde doen voor retouren van de 2e partitie (7, 2).

Pas combOp toe op elk lokaal resultaat om het uiteindelijke, globale resultaat te vormen:

(3,2) + (7,2) = (10, 4)
 

Voorbeeld beschreven in 'figuur':

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)
 

Transformatie versus actie

Spark gebruikt luie evaluatie ; dat betekent dat het geen werk zal doen, tenzij het echt moet. Die aanpak stelt ons in staat om onnodig geheugengebruik te voorkomen, waardoor we met big data kunnen werken.

Een transformatie wordt lui geëvalueerd en het eigenlijke werk gebeurt wanneer een actie plaatsvindt.

Voorbeeld:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example
 

Dus in [1] hebben we Spark verteld een bestand in te lezen in een RDD, met de naam lines . Spark hoorde ons en zei: "Ja, ik zal het doen", maar in feite heeft het het bestand nog niet gelezen.

In [2] filteren we de regels van het bestand, ervan uitgaande dat de inhoud ervan regels bevat met fouten die in het begin zijn gemarkeerd met een error . Dus we vertellen Spark om een nieuwe RDD te maken, genaamd errors , die de elementen van de RDD- lines zal hebben, die het woord error bij hun start.

Nu in [3] , vragen we Spark om de fouten te tellen , dat wil zeggen het aantal elementen tellen dat de RDD errors heeft genoemd. count() is een actie , die Spark geen keuze laat, maar de bewerking daadwerkelijk uitvoert, zodat het resultaat van count() kan worden gevonden, wat een geheel getal zal zijn.

Als een resultaat [3] wordt bereikt, worden [1] en [2] daadwerkelijk uitgevoerd, dat wil zeggen dat wanneer we [3] , dan en alleen dan:

  1. het bestand wordt gelezen in textFile() (vanwege [1] )

  2. lines worden filter() 'ed (vanwege [2] )

  3. count() wordt uitgevoerd vanwege [3]


Debug-tip: aangezien Spark pas echt werkt als [3] is bereikt, is het belangrijk om te begrijpen dat als er een fout bestaat in [1] en / of [2] , deze niet verschijnt totdat de actie in [3] activeert Spark om daadwerkelijk werk te doen. Als uw gegevens in het bestand bijvoorbeeld geen ondersteuning bieden voor de startsWith() ik heb gebruikt, wordt [2] door Spark goed geaccepteerd en geeft dit geen fouten, maar wanneer [3] wordt ingediend en Spark daadwerkelijk evalueert zowel [1] als [2] , en dan en alleen dan zal het begrijpen dat er iets niet klopt met [2] en een beschrijvende fout produceren.

Als gevolg hiervan kan een fout worden geactiveerd wanneer [3] wordt uitgevoerd, maar dat betekent niet dat de fout in de verklaring van [3] moet liggen!

Merk op dat noch lines noch errors na [3] in het geheugen worden opgeslagen. Ze blijven alleen bestaan als een set verwerkingsinstructies. Als er meerdere acties op een van deze RDD's worden uitgevoerd, zal Spark de gegevens meerdere keren lezen en filteren. Om dubbele bewerkingen bij het uitvoeren van meerdere acties op een enkele RDD te voorkomen, is het vaak handig om gegevens in het geheugen op te slaan met behulp van de cache .


U kunt meer transformaties / acties zien in Spark-documenten .