apache-flinkAan de slag met apache-flink


Opmerkingen

Deze sectie geeft een overzicht van wat apache-flink is en waarom een ontwikkelaar het misschien wil gebruiken.

Het moet ook alle grote onderwerpen binnen apache-flink vermelden en naar de gerelateerde onderwerpen linken. Aangezien de documentatie voor apache-flink nieuw is, moet u mogelijk eerste versies van die gerelateerde onderwerpen maken.

Om een groot programma van uw IDE uit te voeren (we kunnen Eclipse of Intellij IDEA gebruiken (voorkeur)), hebt u twee afhankelijkheden nodig: flink-java / flink-scala en flink-clients (vanaf februari 2016). Deze JARS kunnen worden toegevoegd met Maven en SBT (als u scala gebruikt).

  • Maven
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
 
  • SBT- naam: = ""

     version := "1.0"
     
     scalaVersion := "2.11.8"
     
     libraryDependencies ++= Seq(
       "org.apache.flink" %% "flink-scala" % "1.2.0",
       "org.apache.flink" %% "flink-clients" % "1.2.0"
     )
     

belangrijk : de 2.11 in de artefactnaam is de scala-versie , zorg ervoor dat deze overeenkomt met de versie die u op uw systeem hebt.

Lokale runtime-instelling

  1. zorg ervoor dat u Java 6 of hoger hebt en dat de omgevingsvariabele JAVA_HOME is ingesteld.

  2. download de nieuwste Flink binaire hier :

    wget flink-XXXX.tar.gz
     

    Als je niet van plan bent om met Hadoop te werken, kies dan de versie Hadoop 1. Let ook op de scala-versie die u downloadt, zodat u de juiste maven-afhankelijkheden in uw programma's kunt toevoegen.

  3. begin flink:

    tar xzvf flink-XXXX.tar.gz
    ./flink/bin/start-local.sh
     

    Flink is al geconfigureerd om lokaal te worden uitgevoerd. Om er zeker van te zijn dat flink draait, kunt u de logs in flink/log/ inspecteren of de interface van de flinke jobManager op http://localhost:8081 .

  4. stop flink:

    ./flink/bin/stop-local.sh
     

Overzicht en vereisten

Wat is Flink?

Net als Apache Hadoop en Apache Spark is Apache Flink een community-driven open source framework voor gedistribueerde Big Data Analytics. Flink is geschreven in Java en heeft API's voor Scala, Java en Python, waardoor analyse van batch- en realtime streaming mogelijk is.

Voorwaarden

  • een UNIX-achtige omgeving, zoals Linux, Mac OS X of Cygwin;
  • Java 6.X of hoger;
  • [optioneel] Maven 3.0.4 of hoger.

stack

voer hier de afbeeldingsbeschrijving in

Uitvoeringsomgevingen

Apache Flink is een gegevensverwerkingssysteem en een alternatief voor de MapReduce-component van Hadoop . Het heeft zijn eigen looptijd in plaats van voort te bouwen op MapReduce. Als zodanig kan het volledig onafhankelijk van het Hadoop-ecosysteem werken.

De ExecutionEnvironment is de context waarin een programma wordt uitgevoerd. Er zijn verschillende omgevingen die u kunt gebruiken, afhankelijk van uw behoeften.

  1. JVM-omgeving : Flink kan op een enkele Java Virtual Machine worden uitgevoerd, waardoor gebruikers Flink-programma's rechtstreeks vanuit hun IDE kunnen testen en debuggen. Wanneer u deze omgeving gebruikt, hebt u alleen de juiste afhankelijkheden van de maven nodig.

  2. Lokale omgeving : om een programma op een actieve Flink-instantie te kunnen uitvoeren (niet vanuit uw IDE), moet u Flink op uw computer installeren. Zie lokale configuratie .

  3. Clusteromgeving : Flink volledig gedistribueerd uitvoeren vereist een zelfstandige of een garencluster. Zie de clusterconfiguratiepagina of deze slideshare voor meer informatie. mportant__: de 2.11 in de artefactnaam is de scala-versie , zorg ervoor dat deze overeenkomt met de versie die u op uw systeem hebt.

APIs

Flink kan worden gebruikt voor stream- of batchverwerking. Ze bieden drie API's:

  • DataStream API : stroomverwerking, dwz transformaties (filters, tijdvensters, aggregaties) op onbeperkte gegevensstromen.
  • DataSet API : batchverwerking, dwz transformaties op gegevenssets.
  • Table API : een SQL-achtige expressietaal (zoals dataframes in Spark) die kan worden ingebed in zowel batch- als streaming-applicaties.

Bouw blokken

Op het meest basale niveau bestaat Flink uit bron (nen), transformaties (s) en sink (s).

voer hier de afbeeldingsbeschrijving in

Op het meest basale niveau bestaat een Flink-programma uit:

  • Gegevensbron : inkomende gegevens die Flink verwerkt
  • Transformaties : de verwerkingsstap wanneer Flink inkomende gegevens wijzigt
  • Gegevensverzameling : waar Flink gegevens verzendt na verwerking

Bronnen en putten kunnen lokale / HDFS-bestanden, databases, berichtenwachtrijen, etc. zijn. Er zijn al veel connectoren van derden beschikbaar, of u kunt eenvoudig uw eigen connectoren maken.

Aantal woorden

Maven

Voeg de afhankelijkheden flink-java en flink-client (zoals uitgelegd in het instellingsvoorbeeld van de JVM-omgeving ).

De code

public class WordCount{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy( 0 )
                        .aggregate( Aggregations.SUM, 1 );

        // emit result
        counts.print();
    }   
}
 

LineSplitter.java :

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{

    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}
 

Als u Java 8 gebruikt, kunt u .flatmap(new LineSplitter()) door een lambda-expressie:

DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<>( token, 1 ) );
            }
        }
    } )
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy( 0 )
    .aggregate( Aggregations.SUM, 1 );
 

Executie

Van de IDE : druk gewoon op uitvoeren in uw IDE. Flink creëert een omgeving binnen de JVM.

Vanaf de grote opdrachtregel : om het programma uit te voeren met een zelfstandige lokale omgeving, doet u het volgende:

  1. zorg ervoor dat flink draait ( flink/bin/start-local.sh );

  2. maak een jar-bestand ( maven package );

  3. gebruik het flink opdrachtregelprogramma (in de map bin van uw grote installatie) om het programma te starten:

    flink run -c your.package.WordCount target/your-jar.jar
     

    Met de optie -c kunt u de uit te voeren klasse opgeven. Het is niet nodig als de pot uitvoerbaar is / een hoofdklasse definieert.

Resultaat

(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
 

WordCount - Streaming API

Dit voorbeeld is hetzelfde als WordCount , maar gebruikt de Table API. Zie WordCount voor details over uitvoering en resultaten.

Maven

Om de Streaming API te gebruiken, voeg flink-streaming als een maven-afhankelijkheid:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
 

De code

public class WordCountStreaming{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // get input data
        DataStreamSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );
        
        source
                // split up the lines in pairs (2-tuples) containing: (word,1)
                .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                    // emit the pairs
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( new Tuple2<>( token, 1 ) );
                        }
                    }
                } )
                // due to type erasure, we need to specify the return type
                .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
                // group by the tuple field "0"
                .keyBy( 0 )
                // sum up tuple on field "1"
                .sum( 1 )
                // print the result
                .print();

        // start the job
        env.execute();
    }
}
 

WordCount - Tabel-API

Dit voorbeeld is hetzelfde als WordCount , maar gebruikt de Table API. Zie WordCount voor details over uitvoering en resultaten.

Maven

Om de Table API te gebruiken, voeg flink-table als een maven-afhankelijkheid:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
 

De code

public class WordCountTable{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
                .flatMap( ( String value, Collector<String> out ) -> {
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( token );
                        }
                    }
                } )
                // with lambdas, we need to tell flink what type to expect
                .returns( String.class );

        // create a table named "words" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}
 

Opmerking : vervang de lambda voor een versie met Java <8 door een anonieme klasse:

FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
        @Override
        public void flatMap( String value, Collector<String> out ) throws Exception{
            for( String token : value.toLowerCase().split( "\\W+" ) ){
                if( token.length() > 0 ){
                    out.collect( token );
                }
            }
        }
    } );