本章讲解如何使用Flink SQL进行批处理操作。
这里我提取了2016年中超联赛射手榜的数据,通过Flink SQL进行简单的汇总。
1、源数据
这里保存为csv格式:
2、在pom中添加Table API的依赖:
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-table_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
3、案例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class TableJob {
public static void main(String[] args)
throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(
1);
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
DataSet<TopScorers> csvInput = env
.readCsvFile(
"E://flink-training-exercises//src//main//resource//2016_Chinese_Super_League_Top_Scorers.csv")
.ignoreFirstLine() .pojoType(TopScorers.class,
"rank",
"player",
"country",
"club",
"total_score",
"total_score_home",
"total_score_visit",
"point_kick");
Table topScore = tableEnv.fromDataSet(csvInput);
tableEnv.registerTable(
"topScore",topScore);
Table groupedByCountry = tableEnv.sql(
"select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc");
DataSet<Result> result = tableEnv.toDataSet(groupedByCountry,Result.class);
result.map(
new MapFunction<Result, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer>
map(Result result)
throws Exception {
String country = result.country;
int sum_total_score = result.sum_total_score;
return Tuple2.of(country,sum_total_score);
}
}).print();
}
/**
* 源数据的映射类
*/
public static class TopScorers {
/**
* 排名,球员,国籍,俱乐部,总进球,主场进球数,客场进球数,点球进球数
*/
public int rank;
public String player;
public String country;
public String club;
public int total_score;
public int total_score_home;
public int total_score_visit;
public int point_kick;
public TopScorers() {
super();
}
}
/**
* 统计结果对应的类
*/
public static class Result {
public String country;
public int sum_total_score;
public Result() {}
}
}
4、说明
这里注意一下csv的格式,由于第一行是个说明行,所以在处理时要将第一行去掉:
ignoreFirstLine()
另外,本统计是想统计在这份榜单中,以球员所在的国家进行分组,然后统计进球数。
通过sql:
"
select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc"
来实现,并将结果按照进球数倒叙排列。
5、结果
从结果中可以看出,巴西球员最受各俱乐部欢迎,而且以巴西为主的南美洲以及非洲球员,都是各队比较喜欢的。
6、结论:
首先,南美球员技术出众,单兵作战能力强,而且南美球员不觉得来中超踢球是件丢人的事,所以很适合中超;第二,非洲球员身体素质出色,对抗能力强,前场突击能力也是各俱乐部看中的地方。第三,国内球员表现疲软,排名前30名的国内球员,总共进球数才43,也能从侧面反映出国足目前锋无力的情况是多么严重了。
转载请注明原文地址: https://ju.6miu.com/read-11576.html