Apache Flink SQL示例

    xiaoxiao2021-03-25  105

    本章讲解如何使用Flink SQL进行批处理操作。

    这里我提取了2016年中超联赛射手榜的数据,通过Flink SQL进行简单的汇总。

    1、源数据

    这里保存为csv格式:

    2、在pom中添加Table API的依赖:

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>${flink.version}</version> </dependency>

    3、案例

    import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; public class TableJob { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); //source,这里读取CSV文件,并转换为对应的Class DataSet<TopScorers> csvInput = env .readCsvFile("E://flink-training-exercises//src//main//resource//2016_Chinese_Super_League_Top_Scorers.csv") .ignoreFirstLine() .pojoType(TopScorers.class,"rank","player","country","club","total_score","total_score_home","total_score_visit","point_kick"); //将DataSet转换为Table Table topScore = tableEnv.fromDataSet(csvInput); //将topScore注册为一个表 tableEnv.registerTable("topScore",topScore); //查询球员所在的国家,以及这些国家的球员(内援和外援)的总进球数 Table groupedByCountry = tableEnv.sql("select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc"); //转换回dataset DataSet<Result> result = tableEnv.toDataSet(groupedByCountry,Result.class); //将dataset map成tuple输出 result.map(new MapFunction<Result, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(Result result) throws Exception { String country = result.country; int sum_total_score = result.sum_total_score; return Tuple2.of(country,sum_total_score); } }).print(); } /** * 源数据的映射类 */ public static class TopScorers { /** * 排名,球员,国籍,俱乐部,总进球,主场进球数,客场进球数,点球进球数 */ public int rank; public String player; public String country; public String club; public int total_score; public int total_score_home; public int total_score_visit; public int point_kick; public TopScorers() { super(); } } /** * 统计结果对应的类 */ public static class Result { public String country; public int sum_total_score; public Result() {} } }

    4、说明

    这里注意一下csv的格式,由于第一行是个说明行,所以在处理时要将第一行去掉:

    ignoreFirstLine()

    另外,本统计是想统计在这份榜单中,以球员所在的国家进行分组,然后统计进球数。

    通过sql:

    "select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc"

    来实现,并将结果按照进球数倒叙排列。

    5、结果

    从结果中可以看出,巴西球员最受各俱乐部欢迎,而且以巴西为主的南美洲以及非洲球员,都是各队比较喜欢的。

    6、结论:

    首先,南美球员技术出众,单兵作战能力强,而且南美球员不觉得来中超踢球是件丢人的事,所以很适合中超;第二,非洲球员身体素质出色,对抗能力强,前场突击能力也是各俱乐部看中的地方。第三,国内球员表现疲软,排名前30名的国内球员,总共进球数才43,也能从侧面反映出国足目前锋无力的情况是多么严重了。

    转载请注明原文地址: https://ju.6miu.com/read-11576.html

    最新回复(0)