继续上文的flume进行学习,不多说, 直接进入主题。
Flume支持根据zookeeper的agent的配置。这是个实验性的特征(我估计这么说可能说明目前还不是很稳定,猜的),配置文件需要上传到zookeeper上面,有着配置文件的后缀。配置文件被保存在zookeeper的节点数据中,以下是节点树查看agent的a1和a2:
- /flume |- /a1 [Agent config file] |- /a2 [Agent config file] 一旦配置文件上传完毕,使用如下操作启动agent:
$ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
Argument Name Default Description z – Zookeeper connection string. Comma separated list of hostname:port p /flume Base Path in Zookeeper to store Agent configurations
(中间关于插件部分不做介绍)
Flume分布式中包含的Avro客户端可以通过Avro的RPC机制发送一个文件到Flume的Avro源。
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上面的命令将会发送/usr/logs/log.10的内容到监听此接口的flume源中。
Flume支持以下的机制来从不同的log系统中读取数据:
AvroThriftSyslogNetcat
为了通过多个agent来流动数据,前一个agent的sink和后一个hop的souce必须avro的类型,并且sink要指向souce的主机的ip和端口。
在log收集中一个比较常用的方案是:大量的日志生产者发送数据到少数消费者agent,这些消费者agent带有存储的子系统。比如,来自于上百个网络服务器的日志收集器发送给十几个agent,这些agent把数据写入HDFS集群:
这可以通过配置大量的第一层的agent的sink为avro来完成,并且他们所有都指向一个avro的source,第二层的agent把接收到的事件整合到一个单独的channel中,最终被来自于目的地的sink所消费。
Flume支持把事件流发送个一个或者多个目的地中。这个可以通过定义一个流的multiplexer来完成,这个转化器可以复制或者选择指定的事件到一个或者多个channel中。
上面的例子展示了来自于代理foo的源把流展开到了三个不同的channel中。这种展开可以通过复制或者multiplexer来完成。在复制流的情况下,每个事件被分发到所有的三个channel中。在multiplexing的情况下,如果事件的属性符合事先预设置的值得话,event事件被发送到三个channel的子集中。比如,一个event的属性叫做txnType被设置成“customer”,那么这个流会走channel1和channel3。如果他是vector的话,那么他会走channel2,否则走channel3.这种映射关系可以在配置文件中指定。
正如之前所述一样,flume的agent的配置可以从java格式的文件中读取获得。
想要定义一个流到一个单独的agent中,你需要凭借channel来连接sink和souce。你需要列出所有的sources,sinks和channels(对于给定的源来说),然后把sink和souce指向一个channel。source的实例可以指向多个channels,但是一个sink只能指定一个channel,格式如下:
# list the sources, sinks and channels for the agent <Agent>.sources = <Source> <Agent>.sinks = <Sink> <Agent>.channels = <Channel1> <Channel2> # set channel for source <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ... # set channel for sink <Agent>.sinks.<Sink>.channel = <Channel1> 比如,一个名字叫做agent_foo的agent从外部的avro客户端读取数据并且通过内存channel发送给HDFS,那么配置文件weblog.config如下:
# list the sources, sinks and channels for the agent agent_foo.sources = avro-appserver-src-1 agent_foo.sinks = hdfs-sink-1 agent_foo.channels = mem-channel-1 # set channel for source agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1 # set channel for sink agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1 这将会使得事件流通过内存channel:mem-channel-1从avro-appserver-src-1到hdfs-sink-1中。当agent使用weblog.config作为他的配置文件的时候,将会初始化该流。
在定义流之后,你需要为每个source、channel、sink定义属性值。这是在相同层次的命名空间下完成的,也正是在这里我们定义控件类型和一些其他的属性值:
# properties for sources <Agent>.sources.<Source>.<someProperty> = <someValue> # properties for channels <Agent>.channel.<Channel>.<someProperty> = <someValue> # properties for sinks <Agent>.sources.<Sink>.<someProperty> = <someValue> type属性需要被设置到每个组件中,是为了让flume知道这是哪种类型的对象。每个souce、sink、channel类型有他自己函数所需的一系列属性值。所有这些所需的属性需要被设置 ,在前一个例子当中,通过内存channel:mem-channel-1我们把avro-app souce的流导入hdfs-clust sink的集群中。下面是里面所有组件的配置的例子:
agent_foo.sources = avro-AppSrv-source agent_foo.sinks = hdfs-Cluster1-sink agent_foo.channels = mem-channel-1 # set channel for sources, sinks # properties of avro-AppSrv-source agent_foo.sources.avro-AppSrv-source.type = avro agent_foo.sources.avro-AppSrv-source.bind = localhost agent_foo.sources.avro-AppSrv-source.port = 10000 # properties of mem-channel-1 agent_foo.channels.mem-channel-1.type = memory agent_foo.channels.mem-channel-1.capacity = 1000 agent_foo.channels.mem-channel-1.transactionCapacity = 100 # properties of hdfs-Cluster1-sink agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata #... 一个独立的agent可以包含多个独立的流。你可以在一个配置文件中列出多个source、sink、channel。这些组件可以被用来形成多个流:
# list the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Source2> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> 接下来,你可以把相应的souce和sink连接起来来建立连个截然不同的流。例如,如果你想要在一个agent中建立两个流,一个来自与外部的avro客户端到外部的hdfs,另外一个来自于外部的tail输出到avro sink中。配置如下:
# list the sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source1 exec-tail-source2 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 file-channel-2 # flow #1 configuration agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 # flow #2 configuration agent_foo.sources.exec-tail-source2.channels = file-channel-2 agent_foo.sinks.avro-forward-sink2.channel = file-channel-2 想要建立一个多层级的流,你的第一个hop需要一个avro/thrift sink用来指向下一个hop的avro/thrift的source。这将会导致第一个Flume的agent的事件流流入到下一个Flume的agent中。比如,你定期的使用avro客户端发送文件到本地flume 的agent中,那么这个本地的agent可以把流转向另一个agent用来存储数据。
weblog 的agent配置如下:
# list sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source agent_foo.sinks = avro-forward-sink agent_foo.channels = file-channel # define the flow agent_foo.sources.avro-AppSrv-source.channels = file-channel agent_foo.sinks.avro-forward-sink.channel = file-channel # avro sink properties agent_foo.sources.avro-forward-sink.type = avro agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100 agent_foo.sources.avro-forward-sink.port = 10000 # configure other pieces #... HDFS的agent配置如下:
# list sources, sinks and channels in the agent agent_foo.sources = avro-collection-source agent_foo.sinks = hdfs-sink agent_foo.channels = mem-channel # define the flow agent_foo.sources.avro-collection-source.channels = mem-channel agent_foo.sinks.hdfs-sink.channel = mem-channel # avro sink properties agent_foo.sources.avro-collection-source.type = avro agent_foo.sources.avro-collection-source.bind = 10.1.1.100 agent_foo.sources.avro-collection-source.port = 10000 # configure other pieces #... 这里我们把来自于weblog的agent的avro-forward-sink连接到了来自于hdfs的agent的avro-collection-source。这将会导致来自于外部appserver的事件最终被保存在hdfs中。