Add the dependencies flink-java
and flink-client
(as explained in the JVM environment setup example).
public class WordCount{
public static void main( String[] args ) throws Exception{
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// input data
// you can also use env.readTextFile(...) to get words
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"
);
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap( new LineSplitter() )
// group by the tuple field "0" and sum up tuple field "1"
.groupBy( 0 )
.aggregate( Aggregations.SUM, 1 );
// emit result
counts.print();
}
}
LineSplitter.java
:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{
public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
// normalize and split the line into words
String[] tokens = value.toLowerCase().split( "\\W+" );
// emit the pairs
for( String token : tokens ){
if( token.length() > 0 ){
out.collect( new Tuple2<String, Integer>( token, 1 ) );
}
}
}
}
If you use Java 8, you can replace .flatmap(new LineSplitter())
by a lambda expression:
DataSet<Tuple2<String, Integer>> counts = text
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
// normalize and split the line into words
String[] tokens = value.toLowerCase().split( "\\W+" );
// emit the pairs
for( String token : tokens ){
if( token.length() > 0 ){
out.collect( new Tuple2<>( token, 1 ) );
}
}
} )
// group by the tuple field "0" and sum up tuple field "1"
.groupBy( 0 )
.aggregate( Aggregations.SUM, 1 );
From the IDE: simply hit run in your IDE. Flink will create an environment inside the JVM.
From the flink command line: to run the program using a standalone local environment, do the following:
ensure flink is running (flink/bin/start-local.sh
);
create a jar file (maven package
);
use the flink
command-line tool (in the bin
folder of your flink installation) to launch the program:
flink run -c your.package.WordCount target/your-jar.jar
The -c
option allows you to specify the class to run. It is not necessary if the jar is executable/defines a main class.
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)