Spark学习笔记(七)

    xiaoxiao2021-03-25  114

    本文目的:

    正式化RDD概念Spark应用程序的生命过程性能调试

    正式化RDD概念 科学定义RDD 是一个接口 1.分区的集合 2.对父RDD的依赖性列表 3.计算分区的函数(作为迭代器) 示例:HadoopRDD and Filtered RDD RDD Graph (DAG of tasks) 同时,每一RDD包括5个部分(1.分区2.依赖3.计算4.(可选)分割器5.首选位置)

    Spark应用程序的生命过程 (这个省略了吧)

    性能调试 1.分布式性能:由于调度,协调或数据分布导致程序缓慢) 2.本地性能:程序慢,因为我运行的任何节点在单个节点上都很慢 两个有用的工具:

    应用程序Web UI(默认端口4040) 执行器日志(火花/工作)

    GC垃圾回收 查看Web用户界面中的“GC时间”列


    Spark SQL组件

    Catalyst优化 关系代数+表达式查询优化Spark SQL核心 执行查询作为RDD阅读Parquet,JSONHive支持 HQL,MetaStore,SerDes,UDF

    Spark SQL和Shark 关系

    Shark: Shark修改了Hive以后端运行Spark,但有两个挑战:

    与Spark程序的有限集成Hive优化器不是为Spark设计的

    Spark SQL: Spark SQL重用了Shark的最好的部分:

    借鉴

    Hive数据加载内存中列存储

    添加

    RDD感知优化器丰富的语言接口

    将Schema添加到RDD

    Spark + RDDs 对不透明对象的分区集合的功能变换。 SQL + SchemaRDDs 对元组的分区集合的声明性变换。

    使用Spark SQL

    SQLContext 所有SQL功能的入口点 包装/扩展现有的spark上下文 from pyspark.sql import SQLContext sqlCtx = SQLContext(sc)

    数据集举例: 包含用户名称和年龄的文本文件:

    Python):

    # Load a text file and convert each line to a dictionary. lines = sc.textFile("examples/.../people.txt") parts = lines.map(lambda l:l.split(",")) people = parts.map(lambda p:{"name": p[0],"age": int(p[1])}) # Infer the schema, and register the SchemaRDD as a table peopleTable = qlCtx.inferSchema(people) peopleTable.registerAsTable("people")

    Scala):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ // Define the schema using a case class. case class Person(name: String, age: Int) //Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(p =>Person(p(0),p(1).trim.toInt)) people.registerAsTable("people")

    使用SQL查询

    #SQL can be run over SchemaRDDs that have been registered #as a table. teenagers = sqlCtx.sql(""" SELECT name FROM people WHERE age >= 13 AND age <= 19""") # The results of SQL queries are RDDs and support all the normal # RDD operations. teenNames = teenagers.map(lambda p: Name: "+p.name)

    SQL and Machine Learning

    training_data_table = sql(""" SELECT e.action,u.age,u.latitude,u.logitude FROM Users u JOIN Events e ON u.userId =e.userId""") def featurize(u): LabeledPoint(u.action,[u.age,u.latitude,u.longitude]) //SQL results are RDDs so can be used directly in Mllib. training_data = training_data_table.map(featurize) model = new LogisticRegressionWithSGD.train(training_data)

    Hive兼容性 在Hive生态系统中访问数据和代码的接口: o支持在HQL中编写查询 o目录信息从Hive MetaStore o使用Hive SerDes的Tablescan运算符 o Hive UDF,UDAF,UDTF的包装器

    Parquet兼容性 在Parquet中读取数据的本机支持: •列式存储避免读取不需要的数据。 •RDD可以写入parquet文件,保留模式。

    转载请注明原文地址: https://ju.6miu.com/read-12147.html

    最新回复(0)