ConsumerOffsetManager stores the offset of every MessageQueue for each topic@group key. [topic@group -> (queueId, offset)]
The in-memory structure is shown below; it is serialized with fastjson.
private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
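To make the two-level mapping concrete, here is a minimal sketch of how an offset could be committed and queried against a table of this shape. The class name OffsetTableSketch, the method names, and the -1 "not found" convention are assumptions made for illustration, not the actual ConsumerOffsetManager code; fastjson persistence is left out so the sketch needs only the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for illustration; not the real ConsumerOffsetManager.
class OffsetTableSketch {
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> offset), mirroring the shape of offsetTable.
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>(512);

    // Record the latest consumed offset for one queue of a topic@group pair.
    void commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32))
                   .put(queueId, offset);
    }

    // Return the stored offset, or -1 when nothing has been committed yet (assumed convention).
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> queues =
            offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (queues == null) {
            return -1L;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1L : offset;
    }
}
```

Keying the outer map by the combined topic@group string keeps each lookup to two hash probes, and the inner map holds one entry per queue.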
TopicConfigManager stores the mapping from topic to TopicConfig, that is, the topic metadata held by this broker. It also stores version information in a DataVersion.
public class TopicConfig {
    private static final String SEPARATOR = " ";
    public static int defaultReadQueueNums = 16;
    public static int defaultWriteQueueNums = 16;
    private String topicName;
    private int readQueueNums = defaultReadQueueNums;
    private int writeQueueNums = defaultWriteQueueNums;
    private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
    private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
    private int topicSysFlag = 0;
    private boolean order = false;
}
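The perm field combines permission flags with a bitwise OR. The sketch below shows how such a bit mask can be tested. PermSketch is a hypothetical helper, and the bit positions it uses are assumed for illustration; check them against the real PermName class before relying on them.

```java
// Hypothetical helper for illustration; bit positions are assumed, not taken from the source.
final class PermSketch {
    static final int PERM_READ = 0x1 << 2;
    static final int PERM_WRITE = 0x1 << 1;

    // A flag is set when masking with it returns the flag itself.
    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    // Render the mask as a two-character string such as "RW" or "R-".
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        sb.append(isReadable(perm) ? 'R' : '-');
        sb.append(isWriteable(perm) ? 'W' : '-');
        return sb.toString();
    }
}
```

With these assumed values, the default `PERM_READ | PERM_WRITE` describes a topic that is both readable and writable.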
The in-memory data structure for topics is shown below; it is serialized with fastjson.
private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
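One plausible way to use a table paired with a version marker is to bump the version on every change, so that another component can compare versions to detect stale metadata. The sketch below illustrates that idea only. TopicTableSketch, Version, and TopicMeta are simplified, hypothetical stand-ins, not the real TopicConfigManager, DataVersion, or TopicConfig.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for illustration; not the real TopicConfigManager.
class TopicTableSketch {
    // Simplified version marker: a timestamp plus a monotonically increasing counter.
    static final class Version {
        private final AtomicLong counter = new AtomicLong(0);
        private volatile long timestamp = System.currentTimeMillis();

        void next() {
            timestamp = System.currentTimeMillis();
            counter.incrementAndGet();
        }

        long counter() {
            return counter.get();
        }
    }

    // Reduced topic metadata carrying only the fields this sketch needs.
    static final class TopicMeta {
        final String topicName;
        final int readQueueNums;
        final int writeQueueNums;

        TopicMeta(String topicName, int readQueueNums, int writeQueueNums) {
            this.topicName = topicName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    private final ConcurrentMap<String, TopicMeta> topicConfigTable = new ConcurrentHashMap<>(1024);
    private final Version dataVersion = new Version();

    // Insert or overwrite a topic and advance the version so observers can detect the change.
    void updateTopic(TopicMeta meta) {
        topicConfigTable.put(meta.topicName, meta);
        dataVersion.next();
    }

    // Returns null when the topic is unknown.
    TopicMeta selectTopic(String topicName) {
        return topicConfigTable.get(topicName);
    }

    long versionCounter() {
        return dataVersion.counter();
    }
}
```

Comparing a single counter is cheaper than diffing the whole table, which may be why the version is stored alongside the map.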
SubscriptionGroupManager stores consumer group information as a groupName -> SubscriptionGroupConfig mapping.
public class SubscriptionGroupConfig {
    private String groupName;
    private boolean consumeEnable = true;
    private boolean consumeFromMinEnable = true;
    private boolean consumeBroadcastEnable = true;
    private int retryQueueNums = 1;
    private int retryMaxTimes = 16;
    private long brokerId = MixAll.MASTER_ID;
    private long whichBrokerWhenConsumeSlowly = 1;
    private boolean notifyConsumerIdsChangedEnable = true;
}
The in-memory data structure is as follows:
private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
ScheduleMessageService also stores some data of its own; the details of that structure are not yet known.
