spark、scala基础

    xiaoxiao2021-04-11  39

    Spark组件

    spark core:包含Spark的基本功能,包含任务调度,内存管理,容错机制等;定义了RDDs(弹性分布式数据集);提供了很多API来创建和操作这些RRDsSpark SQL:是Spark处理结构化数据的库Spark Streaming:实时数据流处理组件,提供API来操作实时数据流Mlib:包含通用机器学习功能的包Graphx:处理图的库,进行图的并行计算Cluster Managers:Spark自带的一个集群管理调度器

    开发环境

    scala下载安装: http://www.scala-lang.org/download/

    cmd-> scala -version报异常 java.lang.UnsupportedClassVersionError: scala/tools/nsc/MainGenericRunner : Unsupported major.minor version 52.0 检查java -version版本为1.8 javac -version版本为1.7 重新配置环境变量,成功

    Intellij Idea Intellij Idea下载安装: http://www.jetbrains.com/idea/ 破解: http://idea.lanyus.com/ Idea安装scala插件 File->settings->plugins->搜索scala 安装eclipse http://scala-ide.org/

    scala基础语法

    变量声明:

    val 变量值不可修改,一旦分配不能重新指向别的值var 分配后可以重新指向类型相同的值

    函数:

    // def funcName(param:paramType):ReturnTpye={ // function body // } object func_example{ def hello(name:String):String={ s"Hello, ${name}" } hello("Jack") }

    运行结果:

    if表达式:

    //if(logical_exp) valA else valB object func_example{ var a=1 if(a!=1) "not one" if(a!=1) "not one" else a a=3 if(a!=1) "not one" else a }

    运行结果: for表达式

    //for{ // x <- xs // y = x+1 // if (y > 0) //}yield y object func_example{ val l =List("alice","bob","cathy") for{ s <- l s1 = s.toUpperCase() if(s1 != "") }yield(s1) }

    运行结果:

    for推导式生成的集合与它的第一个生成器的类型兼容,例如:

    try-catch-finally

    object func_example{ val result = try { Integer.parseInt("dog") } catch { case _ => 0 } finally { println("always be printed") } }

    运行结果:

    match

    object func_example{ val a = 1 val result = a match { case 1 => "one" case 2 => "two" case _ => "others" } }

    运行结果:

    柯里化 柯里化函数把具有多个参数的函数转换为一条函数链,每个节点上是单一的参数 例如以下两个函数是等价的

    def add(x:Int,y:Int)=x+y def add(x:Int)(y:Int)=x+y//柯里化语法

    柯里化应用举例:

    object func_example{ def add(a:Int)(b:Int)=a+b val addOne=add(1)_ addOne(2) }

    运行结果:

    尾递归 尾递归中,所有递归形式的调用都出现在函数末尾,当编译器检测到一个函数调用的是尾递归的时候,它就覆盖当前的活动记录而不是在栈中创建一个新的。

    object func_example{ //常规递归 def factorial1(n:Int):Int= if(n<=0) 1 else n*factorial1(n-1) //尾递归 @annotation.tailrec def factorial2(n:Int,m:Int):Int= if(n<=0) m else factorial2(n-1,m*n) }

    尾递归与柯里化应用 求和 baf(x)

    object func_example{ def sum (f:Int => Int)(a:Int)(b:Int):Int ={ @annotation.tailrec def loop (n:Int)(acc:Int):Int={ if(n>b) acc else loop(n+1)(acc+f(n)) } loop(a)(0) } sum(x=>x)(1)(5) sum(x=>x*x)(1)(5) val sum2=sum(x=>x*x)_ sum2(1)(5) }

    运行结果:

    匿名函数和类型推断

    lines.filter(line => line.contains("world"))

    在Scala里,匿名函数的定义格式为(形参列表)=> {函数体} 定义了一个匿名函数line => line.contains(“world”),接收一个参数line,使用line这个String类型变量上的contains方法,并返回结果。line的类型不需要指定,能够推断出来。

    spark基础

    RDD的基本操作:Transformation

    map map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD //第一步 scala> val lines=sc.paralelize(Array("hello","spark","hello","world","!")) scala> lines.foreach(println) //第二步 scala> val lines2=lines.map(word=>(word,1)) scala> lines2.foreach(println)

    第一步输出结果为

    hello spark hello world !

    第二步输出结果为

    (hello,1) (spark,1) (hello,1) (world,1) (!,1)

    filter filter()接收函数,返回只包含满足filter()函数的元素的新RDD scala> val lines3=lines.filter(word => word.contains("hello")) scala> lines3.foreach(println)

    输出结果为

    hello hello

    flatMap 对每个输入元素,输出多个输出元素

    scala 快速排序

    object hello { def quickSort(in:List[Int]):List[Int]={ if(in.length<2) in else quickSort(in.filter(_<in.head))++ in.filter(_ ==in.head) ++ quickSort(in.filter(_>in.head)) } val m=List(1,6,3,7,2,3,4,9,5) quickSort(m) }

    运行结果

    转载请注明原文地址: https://ju.6miu.com/read-666577.html

    最新回复(0)