Dubbo 中的负载均衡策略

Dubbo 内置了 4 种负载均衡策略

  • RandomLoadBalance:随机负载均衡,随机选择一个服务结点,跟权重相关。该策略是 Dubbo默认负载均衡策略。
  • RoundRobinLoadBalance:轮询负载均衡,轮询选择一个服务结点,跟权重相关。
  • LeastActiveLoadBalance:最少活跃调用数,相同的活跃数随机选择一个结点,跟权重相关。实现方式是对活跃数值前后计数差。使得慢的结点收到更少的请求,因为越慢的结点前后计数差越大。
  • ConsistentHashLoadBalance:一致性哈希负载均衡,即相同的参数的请求总是落在同一个服务结点上。

通过观察类之间依赖图,可以快速帮我们梳理其之间的关系,如下图所示:

实践

纸上得来终觉浅,觉知此时要躬行。

Dubbo 中负载均衡的源码可自行通过源码来学习,为了加深印象,这里我以 Dubbo 源码中的思路来实现这四个负载均衡策略。

准备工作

定义结点接口

这里我们不实现 Dubbo 中的 Invoker,以一个结点类 Endpoint 代替。

Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface Endpoint {

/**
* hello方法
*
* @param param 参数
*/
void hello(String param);

/**
* 获取结点名称
*
* @return 结点名称
*/
String name();

/**
* 获取结点权重
*
* @return 结点权重
*/
int weight();
}

实现多个结点

这里我们分别实现 S0, S1, S2 三个结点,代码如下:

S0Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class S0Endpoint implements Endpoint {

@Override
public void hello(String param) {
System.out.println("S0 -> " + param);
}

@Override
public String name() {
return "S0";
}

@Override
public int weight() {
return 3;
}
}

S1Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class S1Endpoint implements Endpoint {

@Override
public void hello(String param) {
System.out.println("S1 -> " + param);
}

@Override
public String name() {
return "S1";
}

@Override
public int weight() {
return 2;
}
}

S2Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class S2Endpoint implements Endpoint {

@Override
public void hello(String param) {
System.out.println("S2 -> " + param);
}

@Override
public String name() {
return "S2";
}

@Override
public int weight() {
return 1;
}
}

定义负载均衡的接口以及抽象类

LoadBalance.java

1
2
3
4
public interface LoadBalance {

Endpoint select(List<Endpoint> endpoints, String param);
}

AbstractLoadBalance.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class AbstractLoadBalance implements LoadBalance {

@Override
public Endpoint select(List<Endpoint> endpoints, String param) {
if (Objects.isNull(endpoints) || endpoints.size() == 0) {
throw new RuntimeException("没有可用的结点");
}
if (endpoints.size() == 1) {
return endpoints.get(0);
}

return doSelect(endpoints, param);
}

protected abstract Endpoint doSelect(List<Endpoint> endpoints, String param);

int getWeight(Endpoint endpoint) {
return endpoint.weight();
}
}

程序入口

1
2
3
4
5
6
7
8
9
10
11
12
public class App {

public static void main(String[] args) {
List<Endpoint> endpoints = Lists.newArrayList(new S0Endpoint(), new S1Endpoint(), new S2Endpoint());
// 需要演示哪种负载均衡策略,替换为对应的实现类即可
LoadBalance loadBalance = new RoundRobinLoadBalance();
for (int i = 0; i < 18; i++) {
Endpoint endpoint = loadBalance.select(endpoints, "LoanBance");
endpoint.hello("LoadBance");
}
}
}

随机负载均衡

Dubbo 中随机负载均衡策略,会受每个服务结点权重的影响。由于随机负载均衡策略的实现比较简单,每一步的作用,可查看注释,这里就不详细讲解。代码如下:

RandomLoadBalance.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class RandomLoadBalance extends AbstractLoadBalance {

@Override
protected Endpoint doSelect(List<Endpoint> endpoints, String param) {
// 先获取结点总数
int length = endpoints.size();
// 每个结点权重是否相等
boolean sameWeight = true;
// 每个计算器的权重
int[] weights = new int[length];
// 获取第一个计算器的权重,Dubbo 中权重会从 Invoker 中取
int firstWeight = getWeight(endpoints.get(0));
weights[0] = firstWeight;
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
// 获取后续每一个结点的权重,用于判断所有结点权重是否相等
int weight = getWeight(endpoints.get(i));
weights[i] = weight;
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}

if (totalWeight > 0 && !sameWeight) {
// 若各个结点权重不相等,则随机一个[0~总权重数]之间的数值
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return endpoints.get(i);
}
}
}

return endpoints.get(ThreadLocalRandom.current().nextInt(length));
}

}

将程序入口中的负载均衡替换为随机负载均衡的实现类,其输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
S0 -> LoadBance
S0 -> LoadBance
S2 -> LoadBance
S0 -> LoadBance
S0 -> LoadBance
S0 -> LoadBance
S2 -> LoadBance
S0 -> LoadBance
S0 -> LoadBance
S1 -> LoadBance
S1 -> LoadBance
S0 -> LoadBance
S1 -> LoadBance
S0 -> LoadBance
S1 -> LoadBance
S0 -> LoadBance
S0 -> LoadBance
S1 -> LoadBance

可发现其调用每个结点的规律大致与所占权重的比例相同。

轮询负载均衡

轮询负载均衡就是依次调用所有的服务结点,和随机负载均衡一样轮询负载均衡也有权重的概念。轮询负载均衡算法可以让服务调用严格的按照我们设置的权重比例来分配,并且跟调用的次数无关。缺点是慢的结点会累计请求,从而导致整个系统变慢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class RoundRobinLoadBalance extends AbstractLoadBalance {

/** 在内存中存储所有的 服务名+方法 **/
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>();
/** 保证原子性 **/
private AtomicBoolean updateLock = new AtomicBoolean();
/** 默认的回收期,设置为1分钟 **/
private static final int RECYCLE_PERIOD = 60000;

@Override
protected Endpoint doSelect(List<Endpoint> endpoints, String param) {
System.out.println("-----------------------------------------------------------------");
// Dubbo 中这个 Key 值是通过解析 Invoker 被调用的服务URL+方法名以及对应的参数而拼装的字符串,这里为了演示,直接写死
String key = "com.exp.Endpoint.add";
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (Objects.isNull(map)) {
// 若没有该 key 值的权重信息,则说明是第一次调用,创建新的放入methodWeightMap即可
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
map = methodWeightMap.get(key);
}
// 总权重
int totalWeight = 0;
// 当前最大权重,初始值设为 Long 类型最小值
long maxCurrent = Long.MIN_VALUE;
// 记录当前时间
long now = System.currentTimeMillis();
// 被选择的结点
Endpoint selectedEndpoint = null;
// 被选择的结点的权重轮询信息
WeightedRoundRobin selectedWRR = null;
for (Endpoint endpoint : endpoints) {
String name = endpoint.name();
WeightedRoundRobin weightedRoundRobin = map.get(name);
int weight = getWeight(endpoint);
if (Objects.isNull(weightedRoundRobin)) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(name, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
// 对权重进行加权操作(其实就上次内存中该节点的权重 + 该节点的真实权重),计算出该次调用该节点的权重
long cur = weightedRoundRobin.increaseCurrent();
System.out.println(String.format("节点%s -> 内存中权重:%s,真实权重:%s,该次调用权重:%s,当前最大权重:%s", name, weightedRoundRobin.current, weight, cur, maxCurrent));
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedEndpoint = endpoint;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && endpoints.size() != map.size()) {
// 若 methodWeightMap 中该该服务+方法的节点数据量与内存中的不一致
if (updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>();
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (Objects.nonNull(selectedEndpoint)) {
long tmpWeight = selectedWRR.current.get();
selectedWRR.sel(totalWeight);
System.out.println(String.format("本地调用选择 %s,同时调整 %s 节点在内存中的权重为 %s - %s = %s", selectedEndpoint.name(), selectedEndpoint.name(), tmpWeight, totalWeight, selectedWRR.current));
System.out.println("此时内存中各节点权重:");
map.entrySet().forEach(item-> System.out.println(String.format(" %s -> 权重:%s", item.getKey(), item.getValue().current)));

System.out.println("-----------------------------------------------------------------");
return selectedEndpoint;
}
System.out.println("-----------------------------------------------------------------");
return endpoints.get(0);
}

/**
* 该对象用于计算每次调用各个结点的权重信息
*/
protected static class WeightedRoundRobin {
/** 每次调用时结点当次的权重值 **/
private AtomicLong current = new AtomicLong(0);
/** 各个结点的真实权重 **/
private int weight;
/** 最后一次更新的时间 **/
private long lastUpdate;

public int getWeight() {
return weight;
}

public void setWeight(int weight) {
this.weight = weight;
}

public long getLastUpdate() {
return lastUpdate;
}

public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}

public long increaseCurrent() {
return current.addAndGet(weight);
}

public void sel(int total) {
current.addAndGet(-1 * total);
}

public AtomicLong getCurrent() {
return current;
}

}
}

为了方便的观察每次调用各个结点的是如何选择的,程序中打印了对应的日志。修改程序入口中的负载均衡策略为轮询负载均衡,观察其输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
-----------------------------------------------------------------
节点S0 -> 内存中权重:3,真实权重:3,该次调用权重:3,当前最大权重:-9223372036854775808
节点S1 -> 内存中权重:2,真实权重:2,该次调用权重:2,当前最大权重:3
节点S2 -> 内存中权重:1,真实权重:1,该次调用权重:1,当前最大权重:3
本地调用选择 S0,同时调整 S0 节点在内存中的权重为 3 - 6 = -3
此时内存中各节点权重:
S0 -> 权重:-3
S1 -> 权重:2
S2 -> 权重:1
-----------------------------------------------------------------
S0 -> LoadBance
-----------------------------------------------------------------
节点S0 -> 内存中权重:0,真实权重:3,该次调用权重:0,当前最大权重:-9223372036854775808
节点S1 -> 内存中权重:4,真实权重:2,该次调用权重:4,当前最大权重:0
节点S2 -> 内存中权重:2,真实权重:1,该次调用权重:2,当前最大权重:4
本地调用选择 S1,同时调整 S1 节点在内存中的权重为 4 - 6 = -2
此时内存中各节点权重:
S0 -> 权重:0
S1 -> 权重:-2
S2 -> 权重:2
-----------------------------------------------------------------
S1 -> LoadBance
-----------------------------------------------------------------
节点S0 -> 内存中权重:3,真实权重:3,该次调用权重:3,当前最大权重:-9223372036854775808
节点S1 -> 内存中权重:0,真实权重:2,该次调用权重:0,当前最大权重:3
节点S2 -> 内存中权重:3,真实权重:1,该次调用权重:3,当前最大权重:3
本地调用选择 S0,同时调整 S0 节点在内存中的权重为 3 - 6 = -3
此时内存中各节点权重:
S0 -> 权重:-3
S1 -> 权重:0
S2 -> 权重:3
-----------------------------------------------------------------
S0 -> LoadBance
-----------------------------------------------------------------
节点S0 -> 内存中权重:0,真实权重:3,该次调用权重:0,当前最大权重:-9223372036854775808
节点S1 -> 内存中权重:2,真实权重:2,该次调用权重:2,当前最大权重:0
节点S2 -> 内存中权重:4,真实权重:1,该次调用权重:4,当前最大权重:2
本地调用选择 S2,同时调整 S2 节点在内存中的权重为 4 - 6 = -2
此时内存中各节点权重:
S0 -> 权重:0
S1 -> 权重:2
S2 -> 权重:-2
-----------------------------------------------------------------
S2 -> LoadBance
-----------------------------------------------------------------
节点S0 -> 内存中权重:3,真实权重:3,该次调用权重:3,当前最大权重:-9223372036854775808
节点S1 -> 内存中权重:4,真实权重:2,该次调用权重:4,当前最大权重:3
节点S2 -> 内存中权重:-1,真实权重:1,该次调用权重:-1,当前最大权重:4
本地调用选择 S1,同时调整 S1 节点在内存中的权重为 4 - 6 = -2
此时内存中各节点权重:
S0 -> 权重:3
S1 -> 权重:-2
S2 -> 权重:-1
-----------------------------------------------------------------
S1 -> LoadBance
-----------------------------------------------------------------
节点S0 -> 内存中权重:6,真实权重:3,该次调用权重:6,当前最大权重:-9223372036854775808
节点S1 -> 内存中权重:0,真实权重:2,该次调用权重:0,当前最大权重:6
节点S2 -> 内存中权重:0,真实权重:1,该次调用权重:0,当前最大权重:6
本地调用选择 S0,同时调整 S0 节点在内存中的权重为 6 - 6 = 0
此时内存中各节点权重:
S0 -> 权重:0
S1 -> 权重:0
S2 -> 权重:0
-----------------------------------------------------------------
S0 -> LoadBance
...

通过观察可发现,S0S1S2结点的选择确实是严格按照其对应的权重比例来进行分配的。

最少活跃调用数负载均衡

最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差,使慢的机器收到更少。

Dubbo 中计算活跃调用数是通过 Dubbo 的 Filter 机制来实现的。具体可以参考 ActiveLimitFilter 类的源码。本文示例直接采用简单的调用前后的耗时进行累加来表示每个结点的活跃值。

改造 Endpoint 接口

由于最少活跃调用数需要获取活跃调用数,故 Endpoint 接口新增返回调用活跃数的方法。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface Endpoint {

/**
* hello方法
*
* @param param 参数
*/
void hello(String param);

/**
* 获取节点名称
*
* @return 节点名称
*/
String name();

/**
* 获取节点权重
*
* @return 节点权重
*/
int weight();

/**
* 获取调用活跃数
*
* @return 调用活跃数
*/
int active();
}

各个结点实现类实现对活跃调用数的累加

S0Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class S0Endpoint implements Endpoint {

private int activeValue = 0;

@Override
public void hello(String param) {
long start = System.currentTimeMillis();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("S0 -> " + param);
long remain = (System.currentTimeMillis() - start);
activeValue += remain;
}

@Override
public String name() {
return "S0";
}

@Override
public int weight() {
return 3;
}

@Override
public int active() {
return activeValue;
}
}

S1Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class S1Endpoint implements Endpoint {

private int activeValue = 0;

@Override
public void hello(String param) {
long start = System.currentTimeMillis();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("S1 -> " + param);
long remain = (System.currentTimeMillis() - start);
activeValue += remain;
}

@Override
public String name() {
return "S1";
}

@Override
public int weight() {
return 2;
}

@Override
public int active() {
return activeValue;
}
}

S2Endpoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class S2Endpoint implements Endpoint {

private int activeValue = 0;

@Override
public void hello(String param) {
long start = System.currentTimeMillis();
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("S2 -> " + param);
long remain = (System.currentTimeMillis() - start);
activeValue += remain;
}

@Override
public String name() {
return "S2";
}

@Override
public int weight() {
return 1;
}

@Override
public int active() {
return activeValue;
}
}

实现最小活跃调用数的负载均衡类

LeastActiveLoadBalance.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class LeastActiveLoadBalance extends AbstractLoadBalance {

@Override
protected Endpoint doSelect(List<Endpoint> endpoints, String param) {
System.out.println("-----------------------------------------------------------------");
// 总结点数
int length = endpoints.size();
// 所有结点的最小活动值
int leastActive = -1;
// 具有最小有效值的结点的数量
int leastCount = 0;
// 定义一个数组来存储每个结点的最小活跃数
int[] leastIndexes = new int[length];
// 权重数组
int[] weights = new int[length];
// 总权重
int totalWeight = 0;
// 最不活跃的结点的权重
int firstWeight = 0;
// 每个最不活跃的结点是否都具有相同的权重
boolean sameWeight = true;

// 过滤掉最不活跃的结点
for (int i = 0; i < length; i++) {
Endpoint endpoint = endpoints.get(i);

// 获取结点的活跃值,
int active = endpoint.active();
// Dubbo 中的结点活跃值是在ActiveLimitFilter这个类中计算的,
// 而本示例由于没有实现Filter机制与Listener机制,直接同步累加对应结点的调用耗时来实现
// 由于本示例的原因,i=0表示S0结点,i=1表示S1结点,i=2表示S2结点
System.out.println(String.format("i = %s, 活跃值:%s", i, active));
// 获取结点的权重
int weight = getWeight(endpoint);
weights[i] = weight;
// 从下面的逻辑可以看出若一个结点的活跃值越大,则不会添加到备选项中
// 如果它是第一个,或者结点的活动编号小于当前的最小活跃值
if (leastActive == -1 || active < leastActive) {
// 将最小活跃值赋值给 leastActive
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
System.out.println(String.format("i = %s,此时由于当前结点活跃值最小或该结点为第一遍历的结点", i));
totalWeight = weight;
firstWeight = weight;
sameWeight = true;
} else if (active == leastActive) {
System.out.println(String.format("i = %s,此时由于当前结点的活跃值等于最小活跃值", i));
// 若当前结点的活跃值与最小活跃值相同
leastIndexes[leastCount++] = i;
totalWeight += weight;

if (sameWeight && i > 0 && weight != firstWeight) {
sameWeight = false;
}
}
}
if (leastCount == 1) {
return endpoints.get(leastIndexes[0]);
}

if (!sameWeight && totalWeight > 0) {
// 这里与随机负载均衡策略相同
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
System.out.println("-----------------------------------------------------------------");
return endpoints.get(leastIndex);
}
}
}
System.out.println("-----------------------------------------------------------------");
return endpoints.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}

}

将程序入口中的负载均衡替换为随机负载均衡的实现类,其输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
-----------------------------------------------------------------
i = 0, 活跃值:0
i = 0,此时由于当前结点活跃值最小或该结点为第一遍历的结点
i = 1, 活跃值:0
i = 1,此时由于当前结点的活跃值等于最小活跃值
i = 2, 活跃值:0
i = 2,此时由于当前结点的活跃值等于最小活跃值
-----------------------------------------------------------------
S1 -> LoadBance
-----------------------------------------------------------------
i = 0, 活跃值:0
i = 0,此时由于当前结点活跃值最小或该结点为第一遍历的结点
i = 1, 活跃值:1
i = 2, 活跃值:0
i = 2,此时由于当前结点的活跃值等于最小活跃值
-----------------------------------------------------------------
S0 -> LoadBance
-----------------------------------------------------------------
i = 0, 活跃值:2
i = 0,此时由于当前结点活跃值最小或该结点为第一遍历的结点
i = 1, 活跃值:1
i = 1,此时由于当前结点活跃值最小或该结点为第一遍历的结点
i = 2, 活跃值:0
i = 2,此时由于当前结点活跃值最小或该结点为第一遍历的结点
-----------------------------------------------------------------
S2 -> LoadBance
...

通过观察日志可发现,确实实现了最少活跃调用数有限,相同最小活跃数的则随机

一致性哈希负载均衡

Dubbo 中的一致性哈希负载均衡是唯一一个与权重无关的负载均衡算法。使用一致性 Hash 算法,让相同参数的请求总是发到同一结点,当某一结点崩溃时,原本发往该结点的请求,基于虚拟节点,平摊到其它结点,不会引起剧烈变动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class ConsistentHashLoadBalance extends AbstractLoadBalance {

private final ConcurrentMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();

@Override
protected Endpoint doSelect(List<Endpoint> endpoints, String param) {
// Dubbo中这个Key值的含义是被调用的服务名+方法名组成的字符串,与轮询随机算法一致
String key = "com.exp.Endpoint.add";
// identityHashCode 是为了防止新增结点或部分结点失效的问题
int identityHashCode = System.identityHashCode(endpoints);
System.out.println(String.format("identityHashCode : %s", identityHashCode));
ConsistentHashSelector selector = selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector(endpoints, identityHashCode));
selector = selectors.get(key);
}
return selector.select(param);
}

private static final class ConsistentHashSelector {
/** 存放结点信息的Map **/
private final TreeMap<Long, Endpoint> virtualEndpoints;
/** 对结点列表取的hashCode **/
private final int identityHashCode;
/** 虚拟结点个数 **/
private final int replicaNumber;

public ConsistentHashSelector(List<Endpoint> endpoints, int identityHashCode) {
this.virtualEndpoints = new TreeMap<>();
this.identityHashCode = identityHashCode;
// Dubbo 默认的虚拟结点个数是160,也可以通过配置文件修改
this.replicaNumber = 4;

// 注意:Dubbo在构建 ConsistentHashSelector 对象时会获取对参数的下标值,用于后续对参数做哈希,
// 这里我们示例比较简单,直接对所有参数做哈希
for (Endpoint endpoint : endpoints) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(endpoint.name() + i);
for (int j = 0; j < 4; j++) {
long m = hash(digest, j);
virtualEndpoints.put(m, endpoint);
}
}
}
}

public Endpoint select(String param) {
byte[] digest = md5(param);
// 这里的哈希算法不要采用 java.util.Objects.hash的哈希算法
long key = hash(digest, 0);
System.out.println(String.format("param key : %s", key));
return selectForKey(key);
}

private Endpoint selectForKey(long hash) {
Map.Entry<Long, Endpoint> entry = virtualEndpoints.ceilingEntry(hash);
if (entry == null) {
entry = virtualEndpoints.firstEntry();
}
return entry.getValue();
}

private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}

private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}

}
}

替换程序入口中的负载均衡算法为一致性哈希负载均衡的实现类,观察输出

1
2
3
4
5
6
7
8
9
10
identityHashCode : 1018547642
param key : 119807787
S1 -> LoadBance
identityHashCode : 1018547642
param key : 119807787
S1 -> LoadBance
identityHashCode : 1018547642
param key : 119807787
S1 -> LoadBance
...

通过观察发现对于相同的参数,程序永远调用了 S1 结点。