Building a flume + kafka + spark streaming log stream processing system: a hands-on walkthrough


    Two or three years ago, streaming systems built on the Flume, Kafka, and Storm architecture had become the de facto industry standard. Today this stack still sees wide use in stream processing.

    With the rapid rise of Spark, its built-in Spark Streaming has gradually become stable and easy to use as Spark iterates quickly through versions. Unlike Storm, which processes streams at the level of individual events, Spark Streaming only approximates stream processing with mini-batches, and its minimum batch interval still hovers around the sub-second level; these are inherent limitations. Even so, as an important member of the Spark ecosystem it is widely seen as having a bright future and has attracted a large number of contributors. This walkthrough uses Spark Streaming as the underlying real-time computation framework.

    Overall architecture (originally shown as a diagram): log file → Flume collector agent (exec source) → Avro → Flume aggregator agent → Kafka → Spark Streaming.

    The software stack versions used for the experiment:

    flume: 1.7.0
    kafka: kafka_2.11-0.9.0.1
    spark streaming: spark-1.6.0-bin-hadoop2.6

    I. Flume setup

    Flume can collect data from sources such as console, RPC (Thrift-RPC), text (files), tail (UNIX tail), syslog (the syslog logging system, supporting both TCP and UDP), and exec (command execution). In our system we currently use the exec source for log collection.

    Flume can deliver data to sinks such as console, text (files), dfs (HDFS files), RPC (Thrift-RPC), and syslogTCP (TCP syslog). In this experiment, Kafka receives the data.

    Since this is only an experiment, a single machine runs Flume for both log collection and relaying. The Flume sources, channels, and sinks are configured as follows:

    1. Collector agent: flume-kafka-spark-collect.conf

    # Name the components on this agent
    a1.sources = tailsource-1
    a1.sinks = remotesink
    a1.channels = memoryChannel-1

    # Describe/configure the source
    a1.sources.tailsource-1.type = exec
    a1.sources.tailsource-1.command = tail -F /tmp/test/raw_data.txt
    a1.sources.tailsource-1.channels = memoryChannel-1

    # Use a channel which buffers events in memory
    a1.channels.memoryChannel-1.type = memory
    a1.channels.memoryChannel-1.keep-alive = 10
    a1.channels.memoryChannel-1.capacity = 100000
    a1.channels.memoryChannel-1.transactionCapacity = 100000

    # Describe the sink: forward events to the aggregator agent over Avro
    a1.sinks.remotesink.type = avro
    a1.sinks.remotesink.hostname = localhost
    a1.sinks.remotesink.port = 9999
    a1.sinks.remotesink.channel = memoryChannel-1

    2. Aggregator/receiver agent: flume-kafka-spark-receiver.conf

    The receiver agent's source uses the Avro serialization framework and is bound to port 9999 on the local machine. Note: do not pick a port below 1024. Those are well-known ports, and binding to one as a non-root user fails with a permission error when the agent starts (a small sketch after the port list below makes this concrete).

    As a refresher, some TCP/IP port basics:

    Ports fall into three ranges:

    (1) Well-known ports: 0 to 1023. These are tightly bound to specific services, and traffic on one of these ports usually identifies the protocol of that service. For example, port 80 is essentially always HTTP.

    (2) Registered ports: 1024 to 49151. These are loosely bound to services; many services use them, and they also serve many other purposes. For example, many systems start allocating dynamic ports from around 1024.

    (3) Dynamic and/or private ports: 49152 to 65535. In theory no service should be assigned one of these ports. In practice, machines usually allocate dynamic ports starting from 1024, with exceptions: Sun's RPC ports start at 32768.
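    To make the permission point concrete, here is a minimal Scala sketch (not part of the original setup) showing why the Avro source is bound to a registered port like 9999 rather than a well-known one; the port numbers used here are illustrative only:

    import java.net.{ServerSocket, SocketException}

    object PortBindCheck {
      def main(args: Array[String]): Unit = {
        // A registered port (1024-49151) can be bound by an ordinary user,
        // assuming nothing else already holds it.
        val ok = new ServerSocket(9998)
        println(s"bound registered port ${ok.getLocalPort}")
        ok.close()

        // A well-known port (< 1024) such as 80 cannot be bound without root
        // privileges on Linux; the JVM reports this as a BindException
        // ("Permission denied").
        try {
          new ServerSocket(80).close()
        } catch {
          case e: SocketException => println(s"could not bind port 80: ${e.getMessage}")
        }
      }
    }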

    # agent section
    producer.sources = s
    producer.channels = c
    producer.sinks = r

    # source section: Avro source bound to localhost:9999
    producer.sources.s.type = avro
    producer.sources.s.bind = localhost
    producer.sources.s.port = 9999
    producer.sources.s.channels = c

    # Each sink's type must be defined
    producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
    producer.sinks.r.topic = mytopic
    producer.sinks.r.brokerList = localhost:9092,localhost:9093,localhost:9094
    producer.sinks.r.requiredAcks = 1
    producer.sinks.r.batchSize = 20
    # Specify the channel the sink should use
    producer.sinks.r.channel = c

    # Each channel's type is defined.
    producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
    producer.channels.c.capacity = 10000
    producer.channels.c.transactionCapacity = 1000
    producer.channels.c.brokerList = localhost:9092,localhost:9093,localhost:9094
    producer.channels.c.topic = channel1
    producer.channels.c.zookeeperConnect = localhost:2181,localhost:2182,localhost:2183

    II. Kafka setup

    Kafka is a high-throughput, highly concurrent distributed message queue as well as a publish/subscribe system. Every node in a Kafka cluster runs an instance called a broker, which stores the data. Kafka has two kinds of clients: producers (message writers) and consumers (message readers). Messages from different business systems are separated by topic; each topic is split into partitions to spread the read/write load, and each partition can have multiple replicas to guard against data loss. When a consumer reads a topic, it specifies a starting offset. Techniques such as zero-copy transfer and well-defined delivery semantics keep message transport fast, efficient, reliable, and fault-tolerant.
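    To illustrate the producer/topic/partition concepts above, here is a minimal hedged Scala sketch that publishes one test message to the mytopic topic with the Kafka 0.9 producer client. The broker list matches the pseudo-distributed cluster built below; the object name and message text are illustrative:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object MyTopicProducerSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // Brokers of the pseudo-distributed cluster described in this section.
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        // With no explicit key, the producer picks a partition for the record.
        producer.send(new ProducerRecord[String, String]("mytopic", "flume kafka spark streaming"))
        producer.close()
      }
    }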

    Kafka stores its state (such as consumer offsets) in ZooKeeper, so a ZK cluster must be set up before the Kafka cluster. ZK cluster setup is covered in other blog posts and is not repeated in detail here; a rough sketch of one node's configuration follows.
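    Since section 4.2 starts three ZK nodes from server001/002/003 directories, here is a hedged sketch of what the first node's zoo.cfg might look like for a three-node ensemble on one machine. The client ports 2181-2183 match the zookeeper.connect strings used elsewhere in this post; the dataDir path and the peer/election ports are assumptions:

    # Hypothetical zoo.cfg for the first node (server001); server002/003 differ
    # only in dataDir and clientPort (2182, 2183). Each dataDir also needs a
    # myid file containing 1, 2, or 3 respectively.
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/home/david/bigdata/zk-cluster/data001
    clientPort=2181
    server.1=localhost:2888:3888
    server.2=localhost:2889:3889
    server.3=localhost:2890:3890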

    Only one machine was available for the experiment, so a pseudo-distributed Kafka cluster was set up.

    The configuration file for the first broker, server-1.properties, is as follows:

    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0

    ############################# Socket Server Settings #############################

    listeners=PLAINTEXT://:9092

    # The port the socket server listens on
    port=9092

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=127.0.0.1

    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured. Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    #advertised.host.name=<hostname routable by clients>

    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>

    # The number of threads handling network requests
    num.network.threads=3

    # The number of threads doing disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600

    ############################# Log Basics #############################

    # A comma separated list of directories under which to store log files
    log.dirs=/home/david/bigdata/kafka-cluster/kafkalogs-1

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168
    message.max.bytes=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880

    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    server-2 and server-3 use the same configuration; only the following basic settings change:

    For server-2:

    # Unique id of this broker within the cluster, analogous to ZooKeeper's myid
    broker.id=1
    listeners=PLAINTEXT://:9093
    # Port this Kafka node serves on
    port=9093
    # Directory where this node stores its message logs
    log.dirs=/home/david/bigdata/kafka-cluster/kafkalogs-2

    For server-3:

    # Unique id of this broker within the cluster, analogous to ZooKeeper's myid
    broker.id=2
    listeners=PLAINTEXT://:9094
    # Port this Kafka node serves on
    port=9094
    # Directory where this node stores its message logs
    log.dirs=/home/david/bigdata/kafka-cluster/kafkalogs-3

    III. The Spark Streaming side

    Spark was developed at UC Berkeley's AMP Lab (Algorithms, Machines, and People Lab) and is used to build large-scale, low-latency data analysis applications. It unifies batch processing, stream processing, and ad-hoc queries. The Spark community is very active, and a release roughly every three months reflects its standing in big-data processing. Unlike Storm's event-level stream processing, Spark Streaming approximates streaming with mini-batches.

    Spark Streaming offers two ways to consume messages from Kafka:

    ① The receiver-based approach uses the Kafka high-level consumer API to create consumer threads on Spark worker nodes that subscribe to Kafka topics; the data is delivered to executors on the workers. With the default configuration, this approach can lose data when a Spark job fails. To guarantee reliability, Write Ahead Logs (WAL) must be enabled in Spark Streaming, following the same write-ahead-log idea Kafka itself uses to keep its data reliable and consistent; the WAL can be stored on a distributed file system such as HDFS. (A hedged sketch of this variant appears right after this list.)

    ② The direct approach creates no consumer threads (receivers). Spark Streaming itself tracks the consumed offsets instead of recording them in ZooKeeper, which simplifies the interaction and improves efficiency. The createDirectStream API reads straight from Kafka's own log and maps Kafka partitions one-to-one onto RDD partitions, so unlike the first approach no separate WAL copy of the data has to be maintained, which improves performance. Offsets are handled by the Spark Streaming program through its checkpoint mechanism, which avoids the duplicate reads the first approach can suffer after a failure and removes the risk of Spark Streaming and ZooKeeper/Kafka disagreeing about state, so each message is processed by Spark Streaming exactly once. The code further below reads from Kafka using this second approach.
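    For contrast with the direct approach used in the rest of this post, here is a minimal hedged sketch of approach ① using the receiver-based createStream API with the WAL enabled. The checkpoint directory, consumer group name, and object name are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReceiverBasedSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("ReceiverBasedSketch")
          .setMaster("local[4]")
          // Persist received blocks to a write ahead log before acknowledging them.
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")

        val ssc = new StreamingContext(conf, Seconds(1))
        // The WAL is written under the checkpoint directory (path is illustrative).
        ssc.checkpoint("hdfs:///tmp/spark-wal-checkpoint")

        // High-level consumer API: ZooKeeper quorum, consumer group, topic -> receiver thread count.
        val stream = KafkaUtils.createStream(
          ssc,
          "localhost:2181,localhost:2182,localhost:2183",
          "flume-kafka-spark-group",
          Map("mytopic" -> 1),
          StorageLevel.MEMORY_AND_DISK_SER)

        // Each element is (key, message); print the message text.
        stream.map(_._2).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }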

    Setting up a Spark/Spark Streaming cluster is not detailed here; see other posts. The Scala program below performs a real-time word count over the text stream, counting word frequencies within a sliding window of a specified size, re-evaluated every slideDuration. The Spark-side test code:

    package com.david.spark.toturial.sparkstreaming.flume_kafka_spark

    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Author: david
     * Date : 3/7/17
     */
    object StreamingDataTest {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

        val conf = new SparkConf().setAppName("StreamingDataTest").setMaster("local[4]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(1))

        // Kafka topic(s) to read from
        val topics = Set("mytopic")

        // Kafka broker list
        val brokers = "localhost:9092,localhost:9093,localhost:9094"

        // Kafka consumer parameters
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "serializer.class" -> "kafka.serializer.StringEncoder")

        // Create the direct stream (approach 2 above)
        val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        // The second element of each tuple is the message text from the Kafka topic;
        // split it into words and pair each word with a count of 1.
        val rawDStream = kafkaStream.flatMap(_._2.split("\\s+")).map((_, 1))

        // Word counts over an 8-second window, recomputed every 4 seconds.
        val resDStream = rawDStream.reduceByKeyAndWindow(
          (v1: Int, v2: Int) => v1 + v2,
          Seconds(8), Seconds(4))

        resDStream.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
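    The checkpoint mechanism mentioned in ② is not strictly needed in the test code above, because it uses the non-inverse form of reduceByKeyAndWindow. A hedged variant that enables checkpointing (a prerequisite for recovering the direct stream's offsets via StreamingContext.getOrCreate after a restart, and for the incremental window form) would replace the windowing lines with something like the following; the checkpoint directory is illustrative:

    // Checkpointing is required for the inverse-function window form below.
    ssc.checkpoint("/tmp/spark-streaming-checkpoint")

    // Add values entering the window and subtract values leaving it, instead of
    // re-reducing the whole 8-second window every 4 seconds.
    val resDStream = rawDStream.reduceByKeyAndWindow(
      _ + _,        // reduce: values entering the window
      _ - _,        // inverse reduce: values leaving the window
      Seconds(8),   // window length
      Seconds(4))   // slide interval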

    With all the components configured, it is time to test the streaming computation:

    IV. Putting it together: starting each module

    4.1 flume

    a). Start the Flume collector agent

    bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-spark-collect.conf --name a1 -Dflume.root.logger=INFO,console

    b). Start the Flume aggregator/forwarder agent

    bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-spark-receiver.conf --name producer -Dflume.root.logger=INFO,console

    4.2 zookeeper

    a). Start the first node of the pseudo-distributed ZK cluster

    ./server001/zookeeper-3.4.9/bin/zkServer.sh start &

    b). Start the second node of the pseudo-distributed ZK cluster

    ./server002/zookeeper-3.4.9/bin/zkServer.sh start &

    c). Start the third node of the pseudo-distributed ZK cluster

    ./server003/zookeeper-3.4.9/bin/zkServer.sh start &

    4.3 kafka

    ① Start the Kafka cluster

    a). Start the first node of the pseudo-distributed Kafka cluster

    kafka_2.11-0.9.0.1/bin/kafka-server-start.sh kafka_2.11-0.9.0.1/config/server-1.properties &

    b). Start the second node of the pseudo-distributed Kafka cluster

    kafka_2.11-0.9.0.1/bin/kafka-server-start.sh kafka_2.11-0.9.0.1/config/server-2.properties &

    c). Start the third node of the pseudo-distributed Kafka cluster

    kafka_2.11-0.9.0.1/bin/kafka-server-start.sh kafka_2.11-0.9.0.1/config/server-3.properties &

    ② Create the topic used for the experiment, named mytopic

    kafka_2.11-0.9.0.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop1:2182,hadoop1:2183 --replication-factor 3 --partitions 8 --topic mytopic

    4.4 Start the Spark program above

    4.5 Simulate new data being appended to the log file

    echo "flume kafka spark streaming is a magic streaming programming" >> /tmp/test/raw_data.txt


