This example is the same as WordCount, but uses the Table API. See WordCount for details about execution and results.
To use the Table API, add flink-table as a Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.1.4</version>
</dependency>
/**
 * Word count implemented with the Flink Table API.
 *
 * <p>A handful of sentences is split into lower-cased words, the resulting
 * data set is registered as the table {@code words}, and an SQL query counts
 * the occurrences of each word. The result is printed to standard output.
 */
public class WordCountTable{

    /**
     * Builds and runs the word count job.
     *
     * @param args command line arguments (not used)
     * @throws Exception if the job cannot be set up or executed
     */
    public static void main( String[] args ) throws Exception{
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
            .flatMap( ( String value, Collector<String> out ) -> {
                // Lower-case with Locale.ROOT so the result does not depend on the
                // JVM default locale: with e.g. a Turkish locale "I" becomes the
                // dotless "ı", which "\\W+" treats as a separator and would split words.
                for( String token : value.toLowerCase( java.util.Locale.ROOT ).split( "\\W+" ) ){
                    // split() yields an empty first token when the line starts with a separator
                    if( token.length() > 0 ){
                        out.collect( token );
                    }
                }
            } )
            // with lambdas, we need to tell flink what type to expect
            .returns( String.class );

        // create a table named "words" with the single column "word" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );

        // convert the result table back into a data set of rows and print it
        tableEnv.toDataSet( results, Row.class ).print();
    }
}
Note: For a version using Java < 8, replace the lambda with an anonymous class (the explicit .returns( String.class ) type hint is then not needed):
// split the sentences into words using an anonymous FlatMapFunction instead of a lambda
FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
    @Override
    public void flatMap( String value, Collector<String> out ) throws Exception{
        // Lower-case with Locale.ROOT so the result does not depend on the
        // JVM default locale: with e.g. a Turkish locale "I" becomes the
        // dotless "ı", which "\\W+" treats as a separator and would split words.
        for( String token : value.toLowerCase( java.util.Locale.ROOT ).split( "\\W+" ) ){
            // split() yields an empty first token when the line starts with a separator
            if( token.length() > 0 ){
                out.collect( token );
            }
        }
    }
} );