apache-flinkapache-flink入门


备注

本节概述了apache-flink是什么,以及开发人员为什么要使用它。

它还应该提到apache-flink中的任何大型主题,并链接到相关主题。由于apache-flink的文档是新的,您可能需要创建这些相关主题的初始版本。

要从IDE运行flink程序(我们可以使用Eclipse或Intellij IDEA(preffered)),您需要两个依赖项: flink-java / flink-scalaflink-clients (截至2016年2月)。可以使用Maven和SBT添加这些JARS(如果使用scala)。

  • Maven的
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
 
  • SBT名称:=“”

     version := "1.0"
     
     scalaVersion := "2.11.8"
     
     libraryDependencies ++= Seq(
       "org.apache.flink" %% "flink-scala" % "1.2.0",
       "org.apache.flink" %% "flink-clients" % "1.2.0"
     )
     

重要 :工件名称中的2.11scala版本 ,请确保与您系统上的版本匹配。

本地运行时设置

  1. 确保您拥有java 6或更高版本并且设置了JAVA_HOME 环境变量。

  2. 这里下载最新的flink二进制文件:

    wget flink-XXXX.tar.gz
     

    如果您不打算使用Hadoop,请选择hadoop 1版本。另外,请注意您下载的scala版本,以便在程序中添加正确的maven依赖项。

  3. 开始flink:

    tar xzvf flink-XXXX.tar.gz
    ./flink/bin/start-local.sh
     

    Flink已配置为在本地运行。要确保flink正在运行,您可以在flink/log/ 检查日志,或者打开在http://localhost:8081 上运行的flink jobManager接口。

  4. 停止flink:

    ./flink/bin/stop-local.sh
     

概述和要求

什么是Flink

Apache HadoopApache Spark一样 ,Apache Flink是一个社区驱动的开源框架,用于分布式大数据分析。 Flink使用Java编写,具有Scala,Java和Python API,允许批量和实时流分析。

要求

  • 类似UNIX的环境,例如Linux,Mac OS X或Cygwin;
  • Java 6.X或更高版本;
  • [可选] Maven 3.0.4或更高版本。

在此处输入图像描述

执行环境

Apache Flink是一个数据处理系统, 是Hadoop MapReduce组件的替代品 。它具有自己的运行时而不是构建在MapReduce之上。因此,它可以完全独立于Hadoop生态系统工作。

ExecutionEnvironmentExecutionEnvironment 程序的上下文。根据您的需要,您可以使用不同的环境。

  1. JVM环境 :Flink可以在单个Java虚拟机上运行,​​允许用户直接从IDE测试和调试Flink程序。使用此环境时,您只需要正确的maven依赖项。

  2. 本地环境 :为了能够在正在运行的Flink实例上运行程序(而不是在IDE中),您需要在计算机上安装Flink。请参阅本地设置

  3. 集群环境 :以完全分布式方式运行Flink需要独立或纱线集群。有关详细信息,请参阅群集设置页面此幻灯片共享 。 mportant__:工件名称中的2.11scala版本 ,请确保与您系统上的版本匹配。

蜜蜂

Flink可用于流或批处理。他们提供三种API:

  • DataStream API :流处理,即无界数据流上的转换(过滤器,时间窗,聚合)。
  • DataSet API :批处理,即数据集的转换。
  • 表API :类似于SQL的表达式语言(如Spark中的数据帧),可以嵌入批处理和流应用程序中。

建筑模块

在最基本的层面上,Flink由源(s),转换和接收器组成。

在此处输入图像描述

在最基本的层面上,Flink计划由以下部分组成:

  • 数据源 :Flink处理的传入数据
  • 转换 :Flink修改传入数据时的处理步骤
  • 数据接收 :Flink在处理后发送数据的位置

源和接收器可以是本地/ HDFS文件,数据库,消息队列等。已有许多第三方连接器可用,或者您可以轻松创建自己的连接器。

字数

Maven的

添加依赖项flink-javaflink-client (如JVM环境设置示例中所述)。

代码

public class WordCount{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy( 0 )
                        .aggregate( Aggregations.SUM, 1 );

        // emit result
        counts.print();
    }   
}
 

LineSplitter.java

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{

    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}
 

如果使用Java 8,则可以用lambda表达式替换.flatmap(new LineSplitter())

DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<>( token, 1 ) );
            }
        }
    } )
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy( 0 )
    .aggregate( Aggregations.SUM, 1 );
 

执行

在IDE中 :只需在IDE中运行即可。 Flink将在JVM中创建一个环境。

从flink命令行 :使用独立的本地环境运行程序,执行以下操作:

  1. 确保flink正在运行( flink/bin/start-local.sh );

  2. 创建一个jar文件( maven package );

  3. 使用flink 命令行工具(在flink安装的bin 文件夹中)启动程序:

    flink run -c your.package.WordCount target/your-jar.jar
     

    -c 选项允许您指定要运行的类。如果jar是可执行的/定义主类是没有必要的。

结果

(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
 

WordCount - 流式API

此示例与WordCount相同,但使用Table API。有关执行和结果的详细信息,请参阅WordCount

Maven的

要使用Streaming API,请将flink-streaming 添加为maven依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
 

代码

public class WordCountStreaming{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // get input data
        DataStreamSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );
        
        source
                // split up the lines in pairs (2-tuples) containing: (word,1)
                .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                    // emit the pairs
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( new Tuple2<>( token, 1 ) );
                        }
                    }
                } )
                // due to type erasure, we need to specify the return type
                .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
                // group by the tuple field "0"
                .keyBy( 0 )
                // sum up tuple on field "1"
                .sum( 1 )
                // print the result
                .print();

        // start the job
        env.execute();
    }
}
 

WordCount - 表API

此示例与WordCount相同,但使用Table API。有关执行和结果的详细信息,请参阅WordCount

Maven的

要使用Table API,请将flink-table 添加为maven依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
 

代码

public class WordCountTable{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
                .flatMap( ( String value, Collector<String> out ) -> {
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( token );
                        }
                    }
                } )
                // with lambdas, we need to tell flink what type to expect
                .returns( String.class );

        // create a table named "words" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}
 

注意 :对于使用Java <8的版本,请使用匿名类替换lambda:

FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
        @Override
        public void flatMap( String value, Collector<String> out ) throws Exception{
            for( String token : value.toLowerCase().split( "\\W+" ) ){
                if( token.length() > 0 ){
                    out.collect( token );
                }
            }
        }
    } );