This example is the same as WordCount, but uses the Streaming API. See WordCount for details about execution and results.
To use the Streaming API, add flink-streaming-java_2.11
as a Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
</dependency>
/**
 * Word count over a small, fixed set of text lines, expressed as a Flink
 * streaming job: each line is split into lower-cased words, every word is
 * paired with the count 1, the pairs are keyed by word and summed, and the
 * result is printed.
 */
public class WordCountStreaming {

    /**
     * Builds the word-count pipeline and runs it.
     *
     * @param args command-line arguments; not read by this example
     * @throws Exception declared so that failures from the calls made here propagate to the caller
     */
    public static void main(String[] args) throws Exception {
        // Obtain the environment the job is built on and executed with.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Type information for the (word, count) pairs; the lambda's generic
        // output type is erased, so it is handed to the operator explicitly below.
        TupleTypeInfo<Tuple2<String, Integer>> pairType =
                TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);

        // The input: four hard-coded lines of text.
        DataStreamSource<String> lines = environment.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        lines
                // Turn every line into (word, 1) pairs.
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        // Splitting on non-word characters can yield an empty
                        // leading token; skip it.
                        if (word.isEmpty()) {
                            continue;
                        }
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                // Supply the output type that erasure removed from the lambda.
                .returns(pairType)
                // Key by tuple field 0 (the word) ...
                .keyBy(0)
                // ... and sum tuple field 1 (the count).
                .sum(1)
                // Print the result.
                .print();

        // Start the job.
        environment.execute();
    }
}