相关文章:
Dubbo源码学习文章目录
前言
Dubbo 的定位是分布式服务框架,为了避免单点压力过大,服务的提供者通常部署多台,如何从服务提供者集群中选取一个进行调用,就依赖于Dubbo的负载均衡策略。
Dubbo 负载均衡策略
Dubbo 负载均衡策略提供下列四种方式:
Random LoadBalance 随机,按权重设置随机概率。 Dubbo的默认负载均衡策略 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
RoundRobin LoadBalance 轮循,按公约后的权重设置轮循比率。 存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
LeastActive LoadBalance 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
ConsistentHash LoadBalance 一致性Hash,相同参数的请求总是发到同一提供者。 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
源码
LoadBalance
首先查看 LoadBalance 接口
Invoker select(List> invokers, URL url, Invocation invocation) throws RpcException;
LoadBalance 定义了一个方法就是从 invokers 列表中选取一个
AbstractLoadBalance
AbstractLoadBalance 抽象类是所有负载均衡策略实现类的父类,实现了LoadBalance接口 的方法,同时提供抽象方法交由子类实现,
public <T> Invoker<T>
select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers ==
null || invokers.
size() ==
0)
return null;
if (invokers.
size() ==
1)
return invokers.
get(
0);
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T>
doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
RandomLoadBalance
protected <T> Invoker<T>
doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.
size();
int totalWeight =
0;
boolean sameWeight =
true;
for (
int i =
0; i < length; i++) {
int weight =
getWeight(invokers.
get(i), invocation);
totalWeight += weight;
if (sameWeight && i >
0
&& weight !=
getWeight(invokers.
get(i -
1), invocation)) {
sameWeight =
false;
}
}
if (totalWeight >
0 && ! sameWeight) {
int offset = random.
nextInt(totalWeight);
for (
int i =
0; i < length; i++) {
offset -=
getWeight(invokers.
get(i), invocation);
if (offset <
0) {
return invokers.
get(i);
}
}
}
return invokers.
get(random.
nextInt(length));
}
RandomLoadBalance 实现很简单,如果每个提供者的权重都相同,那么根据列表长度直接随机选取一个, 如果权重不同,累加权重值。根据0~累加的权重值 选取一个随机数,然后判断该随机数落在那个提供者上。
RoundRobinLoadBalance
private final ConcurrentMap<String, AtomicPositiveInteger> sequences =
new ConcurrentHashMap<String, AtomicPositiveInteger>();
private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences =
new ConcurrentHashMap<String, AtomicPositiveInteger>();
protected <T> Invoker<T>
doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.
get(
0).
getUrl().
getServiceKey() +
"." + invocation.
getMethodName();
int length = invokers.
size();
int maxWeight =
0;
int minWeight = Integer.
MAX_VALUE;
for (
int i =
0; i < length; i++) {
int weight =
getWeight(invokers.
get(i), invocation);
maxWeight = Math.
max(maxWeight, weight);
minWeight = Math.
min(minWeight, weight);
}
if (maxWeight >
0 && minWeight < maxWeight) {
AtomicPositiveInteger weightSequence = weightSequences.
get(key);
if (weightSequence ==
null) {
weightSequences.
putIfAbsent(key,
new AtomicPositiveInteger());
weightSequence = weightSequences.
get(key);
}
int currentWeight = weightSequence.
getAndIncrement() % maxWeight;
List<Invoker<T>> weightInvokers =
new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : invokers) {
if (
getWeight(invoker, invocation) > currentWeight) {
weightInvokers.
add(invoker);
}
}
int weightLength = weightInvokers.
size();
if (weightLength ==
1) {
return weightInvokers.
get(
0);
}
else if (weightLength >
1) {
invokers = weightInvokers;
length = invokers.
size();
}
}
AtomicPositiveInteger sequence = sequences.
get(key);
if (sequence ==
null) {
sequences.
putIfAbsent(key,
new AtomicPositiveInteger());
sequence = sequences.
get(key);
}
return invokers.
get(sequence.
getAndIncrement() % length);
}
首先也是判断权重是否一致,如果一致,通过维护一个 AtomicInteger 的增长 进行取模乱来轮训。 如果权重不一致,通过维护一个 AtomicInteger 的增长 与最大权重取模作为当前权重,然后获取大于当前权重的列表作为调用者列表,然后进行取模轮训
LeastActiveLoadBalance
LeastActiveLoadBalance 源码比较简单就不列出了,思路主要是,获取最小的活跃数,把活跃数等于最小活跃数的调用者维护成一个数组 如果权重一致随机取出,如果不同则跟 RandomLoadBalance 一致,累加权重,然后随机取出。
ConsistentHashLoadBalance
protected <T> Invoker<T>
doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.
get(
0).
getUrl().
getServiceKey() +
"." + invocation.
getMethodName();
int identityHashCode = System.
identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.
get(key);
if (selector ==
null || selector.
getIdentityHashCode() != identityHashCode) {
selectors.
put(key,
new ConsistentHashSelector<T>(invokers, invocation.
getMethodName(), identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.
get(key);
}
return selector.
select(invocation);
}
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName,
int identityHashCode) {
this.
virtualInvokers =
new TreeMap<Long, Invoker<T>>();
this.
identityHashCode = System.
identityHashCode(invokers);
URL url = invokers.
get(
0).
getUrl();
this.
replicaNumber = url.
getMethodParameter(methodName,
"hash.nodes",
160);
String[] index = Constants.
COMMA_SPLIT_PATTERN.
split(url.
getMethodParameter(methodName,
"hash.arguments",
"0"));
argumentIndex =
new int[index.
length];
for (
int i =
0; i < index.
length; i ++) {
argumentIndex[i] = Integer.
parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
for (
int i =
0; i < replicaNumber /
4; i++) {
byte[] digest =
md5(invoker.
getUrl().
toFullString() + i);
for (
int h =
0; h <
4; h++) {
long m =
hash(digest, h);
virtualInvokers.
put(m, invoker);
}
}
}
}
通过doselect方法可以看出 ConsistentHashLoadBalance 主要是通过内部类 ConsistentHashSelector 来实现的,首先看ConsistentHashSelector构造函数的源码可以看出 首先根据invokers的url获取分片个数,创建相同大小的虚拟节点。
public Invoker<T>
select(Invocation invocation) {
String key =
toKey(invocation.
getArguments());
byte[] digest =
md5(key);
Invoker<T> invoker =
sekectForKey(
hash(digest,
0));
return invoker;
}
private String
toKey(Object[] args) {
StringBuilder buf =
new StringBuilder();
for (
int i : argumentIndex) {
if (i >=
0 && i < args.
length) {
buf.
append(args[i]);
}
}
return buf.
toString();
}
private Invoker<T>
sekectForKey(
long hash) {
Invoker<T> invoker;
Long key = hash;
if (!virtualInvokers.
containsKey(key)) {
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.
tailMap(key);
if (tailMap.
isEmpty()) {
key = virtualInvokers.
firstKey();
}
else {
key = tailMap.
firstKey();
}
}
invoker = virtualInvokers.
get(key);
return invoker;
}
然后根据参数的MD5值 获取对应的提供者