In addition to peoples.csv (see simple aggregation from a CSV), we have two more CSV files representing products and sales.
sales.csv (people_id, product_id):
19,5
6,4
10,4
2,4
8,1
19,2
8,4
5,5
13,5
4,4
6,1
3,3
8,3
17,2
6,2
1,2
3,5
15,5
3,3
6,3
13,2
20,4
20,2
products.csv (id, name, price):
1,Loperamide,47.29
2,pain relief pm,61.01
3,Citalopram,48.13
4,CTx4 Gel 5000,12.65
5,Namenda,27.67
We want to get the customer's last name and the product name for each sale of a product priced below $40:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

public class SimpleJoinExample{

    public static void main( String[] args ) throws Exception{
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // locate the three CSV files on the classpath
        ClassLoader loader = SimpleJoinExample.class.getClassLoader();
        String peoplesPath = loader.getResource( "peoples.csv" ).getPath();
        String productsPath = loader.getResource( "products.csv" ).getPath();
        String salesPath = loader.getResource( "sales.csv" ).getPath();

        // register each CSV as a table; column names are unique across all three tables
        Table peoples = csvTable(
                tableEnv,
                "peoples",
                peoplesPath,
                "pe_id,last_name,country,gender",
                new TypeInformation[]{ Types.INT(), Types.STRING(), Types.STRING(), Types.STRING() } );

        Table products = csvTable(
                tableEnv,
                "products",
                productsPath,
                "prod_id,product_name,price",
                new TypeInformation[]{ Types.INT(), Types.STRING(), Types.FLOAT() } );

        Table sales = csvTable(
                tableEnv,
                "sales",
                salesPath,
                "people_id,product_id",
                new TypeInformation[]{ Types.INT(), Types.INT() } );

        // here is the interesting part:
        // join people to sales, then sales to products, and keep products priced below 40
        Table join = peoples
                .join( sales ).where( "pe_id = people_id" )
                .join( products ).where( "product_id = prod_id" )
                .select( "last_name, product_name, price" )
                .where( "price < 40" );

        DataSet<Row> result = tableEnv.toDataSet( join, Row.class );
        result.print();
    }//end main

    // registers a CSV file as a table source under the given name and returns it as a Table
    public static Table csvTable( BatchTableEnvironment tableEnv, String name, String path, String header,
                                  TypeInformation[] typeInfo ){
        CsvTableSource tableSource = new CsvTableSource( path, header.split( "," ), typeInfo );
        tableEnv.registerTableSource( name, tableSource );
        return tableEnv.scan( name );
    }
}//end class
Note that every column must have a distinct name across the joined tables (hence prod_id and product_name instead of id and name), otherwise Flink will complain about "ambiguous names in join".
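One way to catch such a clash before running the job is to compare the header strings passed to csvTable. The following is a minimal plain-Java sketch, not part of Flink: the class ColumnNameCheck and the method collidingColumns are hypothetical names, and the unrenamed people header "id,last_name,country,gender" is an assumption used only for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper (not a Flink API): finds column names shared by two headers.
class ColumnNameCheck {

    // Returns the column names that appear in both comma-separated headers,
    // in the order they occur in the right-hand header.
    static Set<String> collidingColumns(String leftHeader, String rightHeader) {
        Set<String> left = new LinkedHashSet<>();
        for (String column : leftHeader.split(",")) {
            left.add(column.trim());
        }
        Set<String> collisions = new LinkedHashSet<>();
        for (String column : rightHeader.split(",")) {
            String name = column.trim();
            if (left.contains(name)) {
                collisions.add(name);
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        // Products header as in the CSV description; the people header is assumed.
        System.out.println(collidingColumns("id,name,price", "id,last_name,country,gender"));
        // Headers as renamed in the example above: nothing overlaps.
        System.out.println(collidingColumns("prod_id,product_name,price", "pe_id,last_name,country,gender"));
    }
}
```

Running main prints [id] for the first pair and [] for the second, which is why the example renames id to pe_id and prod_id.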
Result:
Burton,Namenda,27.67
Marshall,Namenda,27.67
Burke,Namenda,27.67
Adams,Namenda,27.67
Evans,Namenda,27.67
Garza,CTx4 Gel 5000,12.65
Fox,CTx4 Gel 5000,12.65
Nichols,CTx4 Gel 5000,12.65
Stephens,CTx4 Gel 5000,12.65
Bradley,CTx4 Gel 5000,12.65
Lane,CTx4 Gel 5000,12.65
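The row count can be cross-checked without Flink. The sketch below reproduces only the sales-to-products half of the join in plain Java, using the data listed above; it prints people_id instead of last_name because the contents of peoples.csv are not shown here. The class SalesJoinSketch and the method cheapSales are hypothetical names, and the rows come out in sales.csv order, which need not match the order Flink prints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java cross-check of the join; it does not use Flink.
class SalesJoinSketch {

    // products.csv rows: id, name, price
    static final String[][] PRODUCTS = {
        { "1", "Loperamide", "47.29" },
        { "2", "pain relief pm", "61.01" },
        { "3", "Citalopram", "48.13" },
        { "4", "CTx4 Gel 5000", "12.65" },
        { "5", "Namenda", "27.67" }
    };

    // sales.csv rows: people_id, product_id
    static final int[][] SALES = {
        { 19, 5 }, { 6, 4 }, { 10, 4 }, { 2, 4 }, { 8, 1 }, { 19, 2 }, { 8, 4 }, { 5, 5 },
        { 13, 5 }, { 4, 4 }, { 6, 1 }, { 3, 3 }, { 8, 3 }, { 17, 2 }, { 6, 2 }, { 1, 2 },
        { 3, 5 }, { 15, 5 }, { 3, 3 }, { 6, 3 }, { 13, 2 }, { 20, 4 }, { 20, 2 }
    };

    // Joins each sale to its product on product_id = id and keeps
    // the rows whose price is strictly below maxPrice.
    static List<String> cheapSales(double maxPrice) {
        Map<Integer, String[]> productsById = new HashMap<>();
        for (String[] product : PRODUCTS) {
            productsById.put(Integer.parseInt(product[0]), product);
        }
        List<String> rows = new ArrayList<>();
        for (int[] sale : SALES) {
            String[] product = productsById.get(sale[1]);
            if (product != null && Double.parseDouble(product[2]) < maxPrice) {
                rows.add(sale[0] + "," + product[1] + "," + product[2]);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> rows = cheapSales(40.0);
        rows.forEach(System.out::println);
        System.out.println(rows.size() + " rows");
    }
}
```

The sketch yields 11 rows (5 for Namenda and 6 for CTx4 Gel 5000), the same counts as in the Flink result above.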