storm是一款开源的、分布式的、低延迟的、可扩展的、容错的实时计算框架,采用clojure和java的混合编程,总体两者的代码总量是55开的,但clojure语言具有很强的表现力,所以storm的核心基本都是使用clojure语言实现的。jstorm是阿里对storm的java改写版本,阿里团队也对其做了一些优化,使得jstorm更加强大,而且jstorm是完全对storm兼容的,只是有一些细微的差别,但是基本都是内部的差别,对外的API不会有什么区别,所以学习的成本也不会增加太多。 我们对开源的、分布式的、低延迟的、可扩展的、容错的做一下解释:
开源的:感觉这个应当是不用讲的,开源不光意味着代码的开放,同样意味着其具有强大的生命力,因为jstorm不是属于一个团队的,而是大家都可以对其进行改造,生命力源自于此。分布式的:一台物理机器的CPU、内存、磁盘终究是有限度的,当数据量巨大的情况下,利用多台廉价的机器来协同完成计算成为了上上策,而jstorm也就是用于协同多台机器完成计算的框架,当然这里的计算是指实时计算。其实分布式也是容错性的一部分。低延迟的:低延迟一方面得益于他是分布式的,计算能力可以通过机器数量的扩展得到提升。传统的计算框架中也有很多分布式的,比如hadoop,但是那是批量处理的模式,对数据进行缓存后再进行一个批处理,比如计算网站的访问量,批处理的方式是一个小时内的数据进行计算,得到访问量。而实时计算的做法有两种,一种是一条一条记录的计算,这样讲处理的延迟就会降低,这也是jstorm采用的方案,另一种便是将批处理的“批”的时间间隔大小减少,比如一个batch时间定为200ms,这样他的处理延迟也会比传统的离线批处理快很多,这是另一款实时处理框架spark streaming的处理方法。可扩展:通过源代码的阅读,发现jstorm采用的是thrift的server/client的模式,rpc通过thrift定义接口,代码的各个大板块之前都可以使用不同的语言开发,但是提供给我们使用的API的语言貌似只有java,也可以有一些扩展,但是支持的版本比较低。容错:不同的应用程序对容错性的要求不同,如银行的交易,对容错性较高,网站访问数量的计算,其容错性要求就比较低。总的来数,核心业务的容错性要求较高。jstorm的容错性体现在两方面,一是其为集群的、分布式的系统,而且jstorm是一个无状态的模型,其所有的状态都保存在一个集群的zookeeper中,在机器、进程死掉后,可以通过zookeeper中记录的信息进行重启。二是其具有ack机制,对每一条信息,都可以进行ack告知上游处理完成,或者fail,告诉上游处理不成功,没能手动的ack,便会触发timeout,上游也会fail,根据我们容错性要求的不同,其处理办法也不同。有一些比较两者的文章已经很多了,我就讲一讲,我自己体会到的差别有一些什么就好了
spark概述:spark streaming是spark上的一个扩展组件,spark是一个批处理的模型用来实现实时计算,其核心抽象的数据类型是rdd,rdd是一个可恢复的分布式的数据集,spark streaming是对一个信息流(dstream),也就是源源不断到来的新的数据进行一个时间段的批处理,比如每200ms的数据当成一个批处理,交给spark处理。其他的细节,大家可以自行去查阅一下。
整体架构:spark streaming 整体的架构是driver,master,worker,zookeeper并不是必须的。jstorm整体的架构是client,nimbus,zookeeper,supervisor,worker组成的,其实这样说可能大家也不会太清楚,但是这一点展开说确实太多了,在这里展开说就不太好,jstorm的整体架构会在后面谈到,而spark的整体架构,大家可以自行去查阅,也没有太难。延迟:不考虑什么网络延迟的差异,那么jstorm的实时性比spark会好一些,原因便是spark streaming其实是一个批处理的模型,其会等待一个时间窗口进行计算。基本每一个spark streaming的程序在创建流的时候都需要制定一个duration,这个duration便是批处理的时间大小,而jstorm的进行一条一条的计算,其时间延迟当然低。但是相应的,在面对长时间的计算的时候,jstorm的吞吐量是不如spark streaming的。编程API:经过自己写过一点点两者的代码,笔者自己感觉,spark streaming的API接口较为简单,提供的操作种类也比较多,因为spark是使用函数式编程的scala编写的,所以其API接口给人的感觉更加直观。stream:其实两者对数据流的抽象都是stream。spark由于是批处理模型,所以下一步抽象便是把stream按时间段截断为一个一个的小batch,称为RDD。jstorm提供的抽象具体到单条记录,每一条记录称为tuple,tuple就是一个list(列表)。批处理与实时计算:jstorm在以前的基础上,还封装了一层,叫trident,这是针对批处理的一个抽象,上面提供了一些对于批处理的常用操作,比如groupby(按键值进行分组)等,这和spark streaming是一样的。另外,相对的,spark也可以把自己批处理切分得小一些,比如50ms为一个batch,这样也就达到了实时计算的要求。个人感觉,在批处理和实时计算上面两者其实是算打成平手的。容错性对于spark,如果所有的输入数据都存在于一个容错的文件系统如HDFS,Spark Streaming总可以从任何错误中恢复并且执行所有数据。这给出了一个恰好一次(exactly-once)语义,即无论发生什么故障,所有的数据都将会恰好处理一次。细讲一下这个地方:每个后续的RDD或者是更上一层的抽象DStream,都不是立刻计算出来的,而是在最后触发action操作的时候(也就是有输出 )才开始计算,这样,只要我们数据源存储在一个容错的文件系统中,当我们中间计算结果丢失后,我们通过最原始的数据和中间的转化操作,重新计算出我们需要的中间结果,从而恢复计算。对于jstorm的容错性,其提供ack/fail/timeout机制来保证消息被正确的处理,如果我们再采用其事务机制,或者模糊型事务机制,那么可以得到有且只有一次处理的语义(具体的事务机制我们后面会细讲),如果是由于机器垮掉,由于jstorm本身是无状态的,信息记录在zookeeper中,所以在挂掉的机子上的任务可以在其他机子上重启并恢复。基本概念:
Topologies(拓扑)Streams(流)Spouts(数据源)Bolts(数据处理组件)Stream groupings(数据流分组)Reliability(可靠性)Tasks(任务)Workers(工作进程) Topology是一个高层的抽象,是对计算逻辑的封装,比如应用中包含了什么spout和bolt以及如何进行数据的分组等信息Stream是对数据的抽象,实时计算中数据源源不断的到来,就如“流”一般,stream其实就是一个接一个的单元,每一个单元称作Tuple,那么流就是tuple1、tuple2、tuple3、…….。每一个tuple中都有一个id用来标识这一个tuple,而且保证每一个相同的tuple只有一个相同的id,其实在内部实现的时候采用的是java UUID.randomUUID().toString()实现这一唯一ID的。UUID根据网口,时间等生成一个号称全球唯一的ID。tuple中元素可以是String,int,long等基本类型,也可以是自己定义的类型,但是需要序列化的才可以哦。Spout标识Stream的源头,一个Stream总是需要有源头的,Soput中有一个函数叫nextTuple(),而Tuple就是Stream的一个小单元,系统不断的调用nextTuple(),于是源源不断的Stream就产生了。在nextTuple中调用collector的emi()将tuple发射出去于是产生了Stream。Bolt用于处理Stream,比如Spout中产生的每一个Tuple都是Integer=2,那么Bolt可以做+1操作,通过实现Bolt中的excute(Tupele t)方法即可,excute()不断的处理Stream中的每一个Tuple,当然Bolt也可以像Spout一样,发射他没处理完的tuple给下一级的bolt继续处理。在使用bolt的时候需要申明bolt的excute方法中参数tuple是来自哪一个上一级组件发射出来的tuple。Streaming groupings,为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数据流分组方式,而且你还可以通过CustomStreamGrouping 接口实现自定义的数据流分组模型。这八种分组分时分别为:
随机分组(Shuffle grouping):这种方式下元组会被尽可能随机地分配到 Bolt 的不同任务(tasks)中,使得每个任务所处理元组数量能够能够保持基本一致,以确保集群的负载均衡。域分组(Fields grouping):这种方式下数据流根据定义的“域”来进行分组。例如,如果某个数据流是基于一个名为“user-id”的域进行分组的,那么所有包含相同的“user-id”的元组都会被分配到同一个任务中,这样就可以确保消息处理的一致性。部分关键字分组(Partial Key grouping):这种方式与域分组很相似,根据定义的域来对数据流进行分组,不同的是,这种方式会考虑下游 Bolt 数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能1。感兴趣的读者可以参考这篇论文,其中详细解释了这种分组方式的工作原理以及它的优点。完全分组(All grouping):这种方式下数据流会被同时发送到 Bolt 的所有任务中(也就是说同一个元组会被复制多份然后被所有的任务处理),使用这种分组方式要特别小心。全局分组(Global grouping):这种方式下所有的数据流都会被发送到 Bolt 的同一个任务中,也就是 id 最小的那个任务。非分组(None grouping):使用这种方式说明你不关心数据流如何分组。目前这种方式的结果与随机分组完全等效,不过未来 Storm 社区可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。直接分组(Direct grouping):这是一种特殊的分组方式。使用这种方式意味着元组的发送者可以指定下游的哪个任务可以接收这个元组。只有在数据流被声明为直接数据流时才能够使用直接分组方式。使用直接数据流发送元组需要使用 OutputCollector 的其中一个 emitDirect 方法。Bolt 可以通过 TopologyContext 来获取它的下游消费者的任务 id,也可以通过跟踪 OutputCollector 的 emit 方法(该方法会返回它所发送元组的目标任务的 id)的数据来获取任务 id。本地或随机分组(Local or shuffle grouping):如果在源组件的 worker 进程里目标 Bolt 有一个或更多的任务线程,元组会被随机分配到那些同进程的任务中。换句话说,这与随机分组的方式具有相似的效果。ReliabilityStorm 可以通过拓扑来确保每个发送的元组都能得到正确处理。通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。为了充分利用 Storm 的可靠性机制,你必须在元组树创建新结点的时候以及元组处理完成的时候通知 Storm。这个过程可以在 Bolt 发送元组时通过 OutputCollector 实现:在 emit 方法中实现元组的锚定(Anchoring),同时使用 ack 方法表明你已经完成了元组的处理。
task是Spout和Bolt的组成单元,spout和bolt都是由多个task组成的,一个task对应一个线程,但是一个进程中的多个线程可能是不属于同一个Spout和Bolt的。这样做的好处(自己想的,也许不对呢):一是如果一个进程挂掉了,那么Spout和Bolt不会整体的死掉,比如Spout由4个task组成,而就算一个进程挂掉,那么里面最多运行了一个task是属于这个spout的,还有3个task在运行,不会影响整个topology的完整性,二是,spout中的一个task产生数据,可以在进程内传输给一个bolt,而不用跨进程传输甚至是跨机器传输。woker便是对应的工作者,其实对应的就是进程,Jstorm 会在所有的 worker 中分散任务,以便实现集群的负载均衡,我们也可以指定topology的worker的并行度。