通过 Storm 消费 Loghub 日志

    xiaoxiao2021-03-25  128

    基本结构和流程

    上图中红色虚线框中就是提供的 loghub storm spout,每个 storm topology 会有一组 spout,同组内的 spout 共同负责读取 logstore 中全部数据。不同 topology 中的 spout 相互不干扰。 每个 topology 需要选择唯一的 loghub consume group 名字来相互标识,同一 topology 内的 spout 通过 Loghub client lib 来完成负载均衡和自动 failover。 spout 从 loghub 中实时读取数据之后,发送至 topology 中的 bolt 节点,定期保存消费完成位置作为 checkpoint 到 loghubserver 端。(也就是说loghub中的日志不论是否消费都会临时保存48h,但是消费后会有checkpoint在loghub中更新) 日志消费的消费模式是默认开启的,因此您可以实时通过指定游标(cursor)和分区(shard)大批量消费日志数据。

    在写入日志后,最基本功能就是如何消费日志(消费日志与查询日志都意味着“读取”日志,两者区别见 这里)。对于一个 shard 中日志,消费过程如下:

    根据时间、Begin、End 等条件获得游标。 通过游标、步长参数读取日志,同时返回下一个位置游标。 不断移动游标进行日志消费。 LogService的三种消费模式,即“BEGIN_CURSOR”,“END_CURSOR”和“SPECIAL_TIMER_CURSOR”,默认是“END_CURSOR”。 BEGIN_CURSOR:从日志头开始消费,如果有checkpoint记录,则从checkpoint处开始消费。checkpoint即游标 END_CURSOR:从日志尾开始消费,如果有checkpoint记录,则从checkpoint处开始消费。 SPECIAL_TIMER_CURSOR:从指定时间点开始消费,如果有checkpoint记录,则从checkpoint处开始消费。单位为秒。 以上三种消费模式都收到checkpoint记录的影响,如果存在checkpoint记录,则从checkpoint处开始消费,不管指定的是什么消费模式。

    使用注意点

    为了防止滥用,每个 logstore 最多支持 5 个 consumer group,对于不再使用的 consumer group,可以使用 Java SDK 中的 DeleteConsumerGroup 接口进行删除。 Spout 的个数最好和 shard 个数相同,否则可能会导致单个 spout 处理数据量过多而处理不过来。 如果单个 shard 的数据量还是太大,超过一个 spout 处理能,则可以使用 shard split 接口分裂 shard,来降低每个 shard 的数据量 在 loghub spout 中,强制依赖 storm 的 ack 机制,用于确认 spout 将消息正确发送至 bolt,所以在 bolt 中一定要调用 ack 进行确认。 publicclassLogHubConfig { //worker 默认的拉取数据的时间间隔 publicstaticfinallong DEFAULT_DATA_FETCH_INTERVAL_MS=200; //consumer group 的名字,不能为空,支持 [a-z][0-9] 和'_','-',长度在 [3-63]字符,只能以小写字母和数字开头结尾 privateString mConsumerGroupName; //consumer 的名字,必须确保同一个 consumer group 下面的各个 consumer 不重名 privateString mWorkerInstanceName; //loghub 数据接口地址 privateString mLogHubEndPoint; //项目名称 privateString mProject; //日志库名称 privateString mLogStore; //云账号的 access key id privateString mAccessId; //云账号的 access key privateString mAccessKey; //用于指出在服务端没有记录 shard 的 checkpoint 的情况下应该从什么位置消费 shard,如果服务端保存了有效的 checkpoint 信息,那么这些取值不起任何作用, mCursorPosition 取值可以是 [BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,BEGIN_CURSOR 表示从 shard 中的第一条数据开始消费,END_CURSOR 表示从 shard 中的当前时刻的最后一条数据开始消费,SPECIAL_TIMER_CURSOR 和下面的 mLoghubCursorStartTime 配对使用,表示从特定的时刻开始消费数据。 privateLogHubCursorPosition mCursorPosition; //当 mCursorPosition 取值为 SPECIAL_TIMER_CURSOR 时,指定消费时间,单位是秒。 privateint mLoghubCursorStartTime=0; // 轮询获取 loghub 数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是 DEFAULT_DATA_FETCH_INTERVAL_MS,建议时间间隔 200ms 以上。 privatelong mDataFetchIntervalMillis; // worker 向服务端汇报心跳的时间间隔,单位是毫秒,建议取值 10000ms。 privatelong mHeartBeatIntervalMillis; //是否按序消费 privateboolean mConsumeInOrder; }
    转载请注明原文地址: https://ju.6miu.com/read-9442.html

    最新回复(0)