A Log System Based on Heka + Flume + Kafka + ELK


    Preliminaries

    ELK official site: https://www.elastic.co/, with package downloads and comprehensive documentation.

    Zookeeper official site: https://zookeeper.apache.org/

    Kafka official site: http://kafka.apache.org/documentation.html, with package downloads and comprehensive documentation.

    Flume official site: https://flume.apache.org/

    Heka official site: https://hekad.readthedocs.io/en/v0.10.0/

     

    The machines run 64-bit CentOS 6.6.

    Software versions used:

    Logstash: 2.3.3

    JDK: 1.8.0_25

    Elasticsearch: 2.3.4

    Kibana: 4.5.2

    Heka: 0.10.0

    Flume: 1.7.0

    Zookeeper: 3.4.8

    Kafka: 0.8.2.2

    Nginx: 1.7.6

    Kafka-manager: 1.3.1.6

     

    Overall architecture

    [The architecture diagram is not reproduced here. In outline: Heka on the business machines ships logs to Flume, Flume writes them to Kafka, Logstash consumes from Kafka into the Elasticsearch cluster, and Kibana, behind an Nginx reverse proxy, is used to query and visualize them.]

    Installation Steps

    0. Install the JDK

    1. Install the Zookeeper cluster

    2. Install the Kafka cluster

    3. Install the Elasticsearch cluster

    4. Install Flume

    5. Install Heka on the business machines

    6. Use Logstash to connect Kafka to the Elasticsearch cluster

    7. Install Kibana

    8. Install the Nginx reverse proxy

    9. Install kafka-manager

     

    0. Install the JDK

    Install the JDK on all machines involved and configure PATH, CLASSPATH, and JAVA_HOME, as sketched below.
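    A minimal sketch, assuming the JDK is unpacked to /usr/local/jdk1.8.0_25 (this path is an assumption; adjust it to your install):

    # append to /etc/profile (or a file under /etc/profile.d/), then re-login or source it
    export JAVA_HOME=/usr/local/jdk1.8.0_25    # assumed install path
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

    # verify
    java -version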

     

    1. Install the Zookeeper Cluster

    Preparation

    Hostname-to-IP mapping:

    192.168.0.51  data1

    192.168.0.49  data2

    192.168.0.72  data3

    The /etc/hosts file on each machine also needs these records added, as in the sketch below.
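    One way to append the records on every machine (run as root; the entries simply restate the mapping above):

    cat >> /etc/hosts <<'EOF'
    192.168.0.51  data1
    192.168.0.49  data2
    192.168.0.72  data3
    EOF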

     

    Installation

    Download the package from the official site, extract it to /usr/local/public/, configure, and start. (The other services are installed the same way; from here on only configuration and startup are described.)

    wget http://apache.fayea.com/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz

    cd /usr/local/public/

    tar -zxvf zookeeper-3.4.8.tar.gz

    ln -s zookeeper-3.4.8 zookeeper

     

    Configuration file

    cd conf/

    cp zoo_sample.cfg zoo.cfg

     

    Contents of zoo.cfg:

    tickTime=2000

    initLimit=10

    syncLimit=5

    dataDir=/data/zookeeper

    clientPort=2181

    server.1=data1:2888:3888

    server.2=data2:2888:3888

    server.3=data3:2888:3888

     

    Create the cluster ID file myid under the dataDir configured above (/data/zookeeper); its content is the number after "server." in zoo.cfg.

    echo "1" > /data/zookeeper/myid

     

    The other two machines are configured identically, except that the content of the myid file is 2 or 3, as sketched below.
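    For example (assuming the same dataDir on every node):

    # on data2
    echo "2" > /data/zookeeper/myid
    # on data3
    echo "3" > /data/zookeeper/myid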

     

    Start the cluster

    Run on each of the three servers in turn: bin/zkServer.sh start

     

    Testing

    bin/zkServer.sh status    # check whether the current server is the leader or a follower

    bin/zkCli.sh -server GZHL-192-168-0-51.boyaa.com:2181    # connect to one of the Zookeeper servers
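    Another quick liveness check, using Zookeeper's built-in four-letter commands (a sketch; requires nc):

    echo ruok | nc data1 2181    # a healthy server answers "imok"
    echo stat | nc data1 2181    # prints mode (leader/follower), connection counts, etc.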

     

     

    2. Install the Kafka Cluster

    Installation

    As with Zookeeper, download the package from the official site and extract it.

     

    Configuration file config/server.properties:

    broker.id=1

    log.dirs=/disk1/bigdata/kafka

    zookeeper.connect=192.168.0.51:2181,192.168.0.49:2181,192.168.0.72:2181/kafka

     

    The other two machines are configured identically, except that broker.id is 2 or 3.

     

    Start the cluster

    Run on each of the three servers in turn: nohup bin/kafka-server-start.sh config/server.properties >> /dev/null 2>&1 &

     

    Testing

    bin/kafka-topics.sh --create --zookeeper GZHL-192-168-0-51.boyaa.com:2181,GZHL-192-168-0-49.boyaa.com:2181,GZHL-192-168-2-147.boyaa.com:2181/kafka --topic test    # create a topic
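    Note that kafka-topics.sh --create typically also needs --partitions and --replication-factor. A fuller smoke test might look like this sketch (the partition and replica counts are assumptions; hostnames reuse the /etc/hosts aliases above):

    bin/kafka-topics.sh --create --zookeeper data1:2181,data2:2181,data3:2181/kafka --topic test --partitions 3 --replication-factor 2
    bin/kafka-topics.sh --list --zookeeper data1:2181,data2:2181,data3:2181/kafka

    # produce a few messages in one terminal ...
    bin/kafka-console-producer.sh --broker-list 192.168.0.51:9092 --topic test

    # ... and consume them in another
    bin/kafka-console-consumer.sh --zookeeper data1:2181,data2:2181,data3:2181/kafka --topic test --from-beginning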

     

    3. Install the Elasticsearch Cluster

    Installation

    As with Zookeeper, download the package from the official site and extract it.

     

    Configuration file config/elasticsearch.yml:

    cluster.name: dfqp-application

    node.name: node-2

    path.data: /disk1/es/,/disk2/es/,/disk3/es/,/disk4/es/,/disk5/es/,/disk6/es/,/disk7/es/,/disk8/es/,/disk9/es/,/disk10/es/,/disk11/es/,/disk12/es/

    path.logs: /data/other/es/logs

    network.host: 192.168.0.150

    discovery.zen.ping.unicast.hosts: ["192.168.0.149", "192.168.0.150"]

    The two machines are configured identically, except for the values of node.name and network.host.

     

    Start the cluster

    bin/elasticsearch -d

     

    Testing

    curl http://192.168.0.149:9200

    curl http://192.168.0.150:9200

    A response like the following indicates the ES cluster is healthy:

    {

      "name" : "node-2",

      "cluster_name" : "dfqp-application",

      "version" : {

        "number" : "2.3.4",

        "build_hash" : "e455fd0c13dceca8dbbdbb1665d068ae55dabe3f",

        "build_timestamp" : "2016-06-30T11:24:31Z",

        "build_snapshot" : false,

        "lucene_version" : "5.5.0"

      },

      "tagline" : "You Know, for Search"

    }
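    Cluster membership and health can also be checked via the standard ES 2.x endpoints (a sketch):

    curl 'http://192.168.0.149:9200/_cluster/health?pretty'    # status should be green (or yellow if replicas are unassigned)
    curl 'http://192.168.0.149:9200/_cat/nodes?v'              # should list both nodes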

     

    4. Install Flume

    Installation

    As with Zookeeper, download the package from the official site and extract it.

     

    Configuration file conf/remote.properties:

    agent_remote.sources = s1

    agent_remote.sinks = k1

    agent_remote.channels = c1

     

    agent_remote.sources.s1.type = thrift

    agent_remote.sources.s1.bind = 192.168.0.49

    agent_remote.sources.s1.port = 6666

    agent_remote.sources.s1.channels = c1

    agent_remote.sources.s1.threads = 20

     

    agent_remote.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

    agent_remote.sinks.k1.topic = dfqp_server_log_from_heka

    agent_remote.sinks.k1.brokerList = 192.168.0.51:9092,192.168.0.49:9092,192.168.0.72:9092

    agent_remote.sinks.k1.requiredAcks = 0

    agent_remote.sinks.k1.batchSize = 2000

    agent_remote.sinks.k1.channel = c1

    agent_remote.sinks.k1.kafka.request.timeout.ms = 6000


     

    agent_remote.sources.s1.interceptors = i2

    agent_remote.sources.s1.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

    agent_remote.sources.s1.interceptors.i2.headerName=key

    agent_remote.sources.s1.interceptors.i2.preserveExisting=false

     

    agent_remote.channels.c1.type = file

    agent_remote.channels.c1.capacity = 100000000

    agent_remote.channels.c1.dataDirs = /usr/local/public/flume/.flume_remote/c1/data_dir/

    agent_remote.channels.c1.checkpointDir = /usr/local/public/flume/.flume_remote/c1/checkpoint_dir/

     

    Start Flume

    nohup bin/flume-ng agent -n agent_remote -c conf/ -f conf/remote.properties -Dflume.log.file=flume_remote.log &
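    A quick way to confirm the agent is up (a sketch; the port follows the config above, and the log file name assumes Flume's default logs/ directory):

    netstat -tlnp | grep 6666          # the Thrift source should be listening on 192.168.0.49:6666
    tail -f logs/flume_remote.log      # watch for startup errors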

     

     

    5. Install Heka on the Business Machines

    Installation

    Download the source from the official site and write a custom Heka output plugin for Flume (the FlumeOutput type referenced in the config below), then build hekad.

     

    Configuration file conf/log2flume.toml:

    [hekad]

    maxprocs = 1

    base_dir = "./base_dir"

    log_info_filename = "logs/info.log"

    log_error_filename = "logs/error.log"

    log_file_max_size = 64

    log_file_max_backups = 7

     

    [LogstreamerInput]

    log_directory = "/data/"

    journal_directory = "./base_dir"

    file_match = '(?P<dir1>[^/]*)/?(?P<dir2>[^/]*)/?(?P<dir3>[^/]*)/?(?P<dir4>[^/]*)/?(?P<dir5>[^/]*)/?(?P<dir6>[^/]*)/?(?P<dir7>[^/]*)/?(?P<file_name>.*)\.log\.?(?P<Seq>\d*)'

    priority = ["^Seq"]

    rescan_interval = "30s"

    oldest_duration = "1h"

    differentiator = ["dfqp-",  "dir1", "-", "dir2", "-", "dir3", "-", "dir4", "-",  "dir5", "-",  "dir6", "-", "dir7",  "-",  "file_name", ".log"]

     

    [paycenter_sqls_output]

    type = "FlumeOutput"

    message_matcher = "TRUE"

    address = "192.168.0.49:6666"

    encoder = "PayloadEncoder"

    batch_size = 2000

    use_buffering = false

     

    [PayloadEncoder]

    append_newlines = false

     

    Start hekad

    ./hekad -config conf/log2flume.toml
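    A sketch of an end-to-end check (the log path is a hypothetical example chosen to match the LogstreamerInput pattern above; the topic is the one the Flume KafkaSink writes to):

    # on a business machine: append a line to a .log file under /data/
    echo "heka pipeline test $(date)" >> /data/test_app/test.log

    # on a Kafka machine: the line should show up in the topic written by Flume
    bin/kafka-console-consumer.sh --zookeeper data1:2181,data2:2181,data3:2181/kafka --topic dfqp_server_log_from_heka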

     

    6. Use Logstash to Connect Kafka to the Elasticsearch Cluster

    Installation

    As with Zookeeper, download the package from the official site and extract it.

     

    Configuration file etc/kafka_2_es.conf:

    input {

            kafka {

                    topic_id => "my_kafka_topic"

                    group_id => "my_kafka_topic_group"

                    consumer_threads => 4

                    zk_connect => "data1:2181,data2:2181,data3:2181/kafka"

                    codec => "plain"

            }

    }

     

    output {

            elasticsearch {

                    index => "server-log-%{+YYYY.MM.dd}"

                    hosts => ["192.168.0.149", "192.168.0.150"]

            }

    }

    The configuration on both machines is identical.

     

    Start Logstash

    nohup bin/logstash -l logs/logstash${i}.log  -f etc/kafka_2_es.conf > /dev/null &

    where ${i} takes the values 1 through 6, i.e. each machine runs 6 Logstash processes; one way to script this is sketched below.
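    A minimal sketch of starting all six in one go:

    for i in 1 2 3 4 5 6; do
        nohup bin/logstash -l logs/logstash${i}.log -f etc/kafka_2_es.conf > /dev/null 2>&1 &
    done

    # after a short while, daily indices named server-log-YYYY.MM.dd should appear:
    curl 'http://192.168.0.149:9200/_cat/indices?v'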

     

    7. Install Kibana

    Installation

    As with Zookeeper, download the package from the official site and extract it.

     

    Configuration file config/kibana.yml:

    elasticsearch.url: "http://192.168.0.149:9200"

     

    Start Kibana

    nohup ./bin/kibana > /dev/null &
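    To verify, Kibana should respond on its default port 5601 (a sketch):

    curl -sI http://192.168.0.149:5601 | head -n 1    # expect an HTTP response line

    Then open it in a browser and create an index pattern matching server-log-* (the index name used in the Logstash output above).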

     

    8. Install the Nginx Reverse Proxy

    Configuration file /usr/local/nginx-1.7.6/conf/vhosts/vhost.conf:

    server

    {

            listen       80;

            server_name  log.oa.com;

     

            location / {

                    proxy_pass http://192.168.0.149:5601$request_uri;

                    proxy_set_header   Host             $host;

                    proxy_set_header   X-Real-IP        $remote_addr;

                    proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;

     

            }

     

            location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|data|js|css|html|htm)$

            {

                    expires      365d;

            }

     

            access_log /data/other/nginx_logs/${host}_${server_port}_access.log main;

    }
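    Check the configuration and (re)load it (a sketch; log.oa.com must also resolve to this machine, e.g. via DNS or the clients' hosts files):

    /usr/local/nginx-1.7.6/sbin/nginx -t           # test the configuration
    /usr/local/nginx-1.7.6/sbin/nginx -s reload    # reload if nginx is already running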

    9. Install kafka-manager

    git clone https://github.com/yahoo/kafka-manager

    cd kafka-manager

    ./sbt clean dist

    This produces a zip archive; extract it to get kafka-manager, then open conf/application.conf and change the Zookeeper address, as sketched below.
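    The relevant setting in conf/application.conf (a sketch; the value here reuses this deployment's Zookeeper ensemble):

    kafka-manager.zkhosts="data1:2181,data2:2181,data3:2181"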

    Start: nohup bin/kafka-manager -Dconfig.file=conf/application.conf >/dev/null 2>&1 &

    Access: http://192.168.0.51:9000

