storm、jstorm调研系列 (三)-----jstorm 代码例子和解读(自己写的代码,可以运行)

    xiaoxiao2025-12-03  6

    storm、jstorm调研系列 (一) storm、jstorm调研系列 (二)


    在前面两个系列中我们讲到了jstorm是怎么一回事,现在,我们开始写自己的jstorm代码(storm代码可以完全的移植到jstorm上)


    如果以后有时间,就把jstorm的DRPC和Trident原语以及事务机制还有hook机制,外加自己的源代码的阅读知道的东西写下来,这里在这里做一个记号。其实jstorm还有好多东西自己都没写下来,看以后是否有个空来写一写吧


    jstorm的代码结构(针对单条数据处理topology)

    public static void main(String[] args) throws Exception { String zks = "ip:port";//这个是kafka的地址,ip和port请执行填写 //消息的topic String topic = "user_action"; //订阅的kafka的topic //strom在zookeeper上的根 String zkRoot = "/jstorm"; //你再配置jstorm的时候指定的jstorm在zookeeper中的根目录 String id = "test_yc28"; //我们的user_id这个随便写 BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.forceFromStart = true; spoutConf.zkServers = Arrays.asList(new String[] {"kafka-ip"});//kafka地址,kafka-ip请自行填写 spoutConf.zkPort = 2181;//kafka端口 Config conf = new Config(); conf.setNumWorkers(2); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),3); builder.setBolt("print-bolt", new KafkaBolt(),3).shuffleGrouping("kafka-reader"); StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());//topologyName在提交topology时给其指定的名字,kill的时候指定这个名字即可 } public class KafkaBolt implements IBasicBolt{ private static final Logger LOG = LoggerFactory.getLogger(SimpleBolt.class); @Override public void prepare(Map stormConf, TopologyContext context) { LOG.info("Successfully do prepare"); } @Override public void execute(Tuple input, BasicOutputCollector collector) { LOG.info(input.getString(0)+"..........................................................................."); LOG.info(input.getFields().toString()); } @Override public void cleanup() { LOG.info("Successfully do cleanup"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // declarer.declare(new Fields("BatchId", "counters")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }

    这一个例子就是从kafka中读取日志,里面的kafka地址请读者自己填写自己公司的kafka地址,当然也可以换成一个普通的spout,自己定义nexttuple()函数,不断产生一个随机数即可。

    main函数中主要就是下面这一段:

    TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),3); builder.setBolt("print-bolt", new KafkaBolt(),3).shuffleGrouping("kafka-reader"); StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());

    builder是一个topology的生成器,builder.createTopology()会生成一个StormTopology的对象,由StormSubmit提交。 这段代码前面的部分不管,只是kafka的一些设置问题,大家可以自己写一个简单的spout放在这里,不一定需要kafka产生数据。

    TopologyBuilder builder = new TopologyBuilder();

    生成一个topology的生成器,我们对其添加一些spout或者bolt的组件:

    builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),3); builder.setBolt("print-bolt", new KafkaBolt(),3).shuffleGrouping("kafka-reader");

    这里给topology添加了两个组件,一个spout,这个spout利用前面设置的kafka属性,从kafka那里接受信息。其中的 “kafka-reader” 便是这个组件的名字,3表示这个spout由3个task(线程)执行。然后再set了一个Bolt, print-bolt 是这个bolt的名字,3同样表示这个bolt的并行度。

    setBolt后面跟了一个shuffleGrouping(“kafka-reader”)表示这个bolt接收的数据来源是一个叫做kafka-reader的组件,也就是上面申明的Spout。shuffleGrouping表示的是分组方式,在storm、jstorm调研系列 (一) 已经有讲到的shuffe表示随机的分组,因为bolt中有3个task,一个tuple来了交给谁处理,就由这个方法指定,可以是shuffe的roundRobin的方法,也可以是GroupBy一个key,那么key相同的数据,分到同一个task中处理。

    KafkaSpout是Jstorm自带的Spout组件,我们不在这里多讲,后面的章节中会再把具体的提到,我们来讲一讲KafkaBolt:

    我们自己定义的bolt最好继承自IBasicBolt,因为他会自动的帮我们ACK信息,会比较方便 Bolt是一个接口,我们去实现它的每一个方法就好了

    interface IBasicBolt{ public void prepare(Map stormConf, TopologyContext context); public void execute(Tuple input, BasicOutputCollector collector); public void cleanup() ; public void declareOutputFields(OutputFieldsDeclarer declarer); public Map<String, Object> getComponentConfiguration() ; } prepare:在bolt初始化的时候会被调用一次,可以做一些bolt的初始化工作,比如bolt技术设0execute:是在每一次处理上游传过来额一个tuple的时候调用,可以对tuple进行计算,存储到外部存储介质,如数据库中cleanup:在bolt被销毁的时候调用declareOutPutFields:定义我们输出的字段,当我们bolt需要继续向他的下游发送数据的时候,需要定义自己发送出去的字段。getComponentConfiguration:得到当前Conponent的配置信息

    当然我们可以选择去实现系统提供的Spout接口中的任意一个,比如ISpout ,IBaseSpout,IRichSpout。Bolt也是一样的,只要我们去实现这些接口对应方法就可以了。但是会有一些系统自带的Spout实现,比如这里我们用到的KafkaSpout。


    jstorm代码结构(Trident,用于jstorm批处理的抽象)

    public class SimpleBatchTopology { private static String topologyName = "kafka_trident2"; private static Config conf; public static void main(String[] args) throws Exception { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true); conf = new Config(); TridentTopology topology = new TridentTopology(); topology.newStream("kafka_batch_reader",spout) .parallelismHint(6) .each(new Fields("sentence"),new Split(),new Fields("word")) .groupBy(new Fields("word")) .aggregate(new Fields("word"),new WordCountAggregator(),new Fields("count")).each(new Fields("word","count"),new Print(),new Fields("hehe")) .parallelismHint(6); //StormSubmitter.submitTopology(topologyName, conf,topology.build()); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName,conf,topology.build()); System.out.println("input yes"); } } class WordCountAggregator implements ReducerAggregator<Long> { public Long init() { return 1L; } public Long reduce(Long l, TridentTuple tuple) { return l + 1L; } } class Split extends BaseFunction{ private static final Logger LOG = LoggerFactory.getLogger(Split.class); public void execute(TridentTuple tuple, TridentCollector collector) { String[] strs = tuple.getString(0).split(" "); System.out.println("........................................."); for(String str: strs) { System.out.println(str); collector.emit(new Values(str)); } System.out.println("......................................."); } } class Print extends BaseFunction{ private static final Logger LOG = LoggerFactory.getLogger(Print.class); public void execute(TridentTuple t, TridentCollector collector ) { String s = t.getString(0); Long lt = t.getLong(1); System.out.println(s+":"+lt); collector.emit(new Values("hehe")); } }

    上面是全部的代码,我们来分步看:

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true);

    FixedBatchSpout是一个特别简单的内置Spout,用来做测试用的数据来源特别的棒,定义输出的字段名为“sentence” 数据就是后面那些Values,每一个Values对应一个Tuple,这里一共4个tuple,最后 Spout.setCycle(true) 让这些数据循环产生,而不是产生一次就没了。

    TridentTopology topology = new TridentTopology(); topology.newStream("kafka_batch_reader",spout) .parallelismHint(6) .each(new Fields("sentence"),new Split(),new Fields("word")) .groupBy(new Fields("word")) .aggregate(new Fields("word"),new WordCountAggregator(),new Fields("count")).each(new Fields("word","count"),new Print(),new Fields("hehe")) .parallelismHint(6);

    TridentTopology就是一个topology,但是这是对应的批处理的Topology。

    topology.newStream(“kafka_batch_reader”,spout) 是指创建了一个Stream,这个Stream是一个Batch的Stream,其源头是spout,名字是“kafka_batch_reader”newStream中的不光可以传入一个代表batch的Spout作为输入源,同样的的可以输入一个非batch的输入源,在newStream中,会利用 new RichSpoutBatchExecutor(spout) 将这个spout转化成batch的Spout,然后再构建streamparallelismHint(6) 是指定其并行度
    each 是对Stream的操作。其需要三个参数,第一个字段和第三个字段分别表示接受字段和输出字段(在有一些操作,输出字段是在接收子弹后面追加的,这种情况都是在本机上操作,不涉及传输的时候回这样,在shuffle的时候输出字段是单独的,不会追加在输入字段后面),第2个参数就是我们对tuple进行计算的操作,是一个Operation(Function继承自Operation)。 class Print extends BaseFunction{ private static final Logger LOG LoggerFactory.getLogger(Print.class); public void execute(TridentTuple t, TridentCollector collector ) { String s = t.getString(0); Long lt = t.getLong(1); System.out.println(s+":"+lt); collector.emit(new Values("hehe")); } }

    BaseFunction是Function是一个默认实现,实现自己Function,那么继承自BaseFunction,然后重写他的execute方法即可。


    GroupBy 是一个分组操作,这个操作后得到的是一个GruopingStream,我们将Stream中不同的Tuple按照Word不同分成不同的组,每一个组中word都不同。Aggrate 便是将分组后的数据进行聚合,是不是和map-reduce感觉很像,其实就是一样的,前面的每一个each都是相当于就是在不断的map,然后aggrate就是在reduce,得到聚合的结果。 Aggrator分组有几个,可以根据需要选择一个来实现一般情况下,int是在聚合前初始化,RecudceAggrator是对每一个数据进行转化一个特定的类型T,而Aggrator是将一个Batch转化为一个状态T,两者的初始化时不同的。 public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); } public interface ReducerAggregator<T> extends Serializable { T init(); T reduce(T curr, TridentTuple tuple); } public interface CombinerAggregator<T> extends Serializable { T init(TridentTuple tuple); T combine(T val1, T val2); T zero(); }
    转载请注明原文地址: https://ju.6miu.com/read-1304558.html
    最新回复(0)