Dubbo
内置了 4 种负载均衡策略
RandomLoadBalance
:随机负载均衡,随机选择一个服务结点,跟权重相关。该策略是 Dubbo
的默认 负载均衡策略。
RoundRobinLoadBalance
:轮询负载均衡,轮询选择一个服务结点,跟权重相关。
LeastActiveLoadBalance
:最少活跃调用数,相同的活跃数随机选择一个结点,跟权重相关。实现方式是对活跃数值前后计数差。使得慢的结点收到更少的请求,因为越慢的结点前后计数差越大。
ConsistentHashLoadBalance
:一致性哈希负载均衡,即相同的参数的请求总是落在同一个服务结点上。
通过观察类之间依赖图,可以快速帮我们梳理其之间的关系,如下图所示:
实践
纸上得来终觉浅,觉知此时要躬行。
Dubbo
中负载均衡的源码可自行通过源码来学习,为了加深印象,这里我以 Dubbo
源码中的思路来实现这四个负载均衡策略。
准备工作 定义结点接口 这里我们不实现 Dubbo
中的 Invoker
,以一个结点类 Endpoint
代替。
Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface Endpoint { void hello (String param) ; String name () ; int weight () ; }
实现多个结点 这里我们分别实现 S0
, S1
, S2
三个结点,代码如下:
S0Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class S0Endpoint implements Endpoint { @Override public void hello (String param) { System.out.println("S0 -> " + param); } @Override public String name () { return "S0" ; } @Override public int weight () { return 3 ; } }
S1Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class S1Endpoint implements Endpoint { @Override public void hello (String param) { System.out.println("S1 -> " + param); } @Override public String name () { return "S1" ; } @Override public int weight () { return 2 ; } }
S2Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class S2Endpoint implements Endpoint { @Override public void hello (String param) { System.out.println("S2 -> " + param); } @Override public String name () { return "S2" ; } @Override public int weight () { return 1 ; } }
定义负载均衡的接口以及抽象类 LoadBalance.java
1 2 3 4 public interface LoadBalance { Endpoint select (List<Endpoint> endpoints, String param) ; }
AbstractLoadBalance.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public abstract class AbstractLoadBalance implements LoadBalance { @Override public Endpoint select (List<Endpoint> endpoints, String param) { if (Objects.isNull(endpoints) || endpoints.size() == 0 ) { throw new RuntimeException ("没有可用的结点" ); } if (endpoints.size() == 1 ) { return endpoints.get(0 ); } return doSelect(endpoints, param); } protected abstract Endpoint doSelect (List<Endpoint> endpoints, String param) ; int getWeight (Endpoint endpoint) { return endpoint.weight(); } }
程序入口 1 2 3 4 5 6 7 8 9 10 11 12 public class App { public static void main (String[] args) { List<Endpoint> endpoints = Lists.newArrayList(new S0Endpoint (), new S1Endpoint (), new S2Endpoint ()); LoadBalance loadBalance = new RoundRobinLoadBalance (); for (int i = 0 ; i < 18 ; i++) { Endpoint endpoint = loadBalance.select(endpoints, "LoanBance" ); endpoint.hello("LoadBance" ); } } }
随机负载均衡 Dubbo
中随机负载均衡策略,会受每个服务结点权重的影响。由于随机负载均衡策略的实现比较简单,每一步的作用,可查看注释,这里就不详细讲解。代码如下:
RandomLoadBalance.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class RandomLoadBalance extends AbstractLoadBalance { @Override protected Endpoint doSelect (List<Endpoint> endpoints, String param) { int length = endpoints.size(); boolean sameWeight = true ; int [] weights = new int [length]; int firstWeight = getWeight(endpoints.get(0 )); weights[0 ] = firstWeight; int totalWeight = firstWeight; for (int i = 1 ; i < length; i++) { int weight = getWeight(endpoints.get(i)); weights[i] = weight; totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false ; } } if (totalWeight > 0 && !sameWeight) { int offset = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0 ; i < length; i++) { offset -= weights[i]; if (offset < 0 ) { return endpoints.get(i); } } } return endpoints.get(ThreadLocalRandom.current().nextInt(length)); } }
将程序入口中的负载均衡替换为随机负载均衡的实现类,其输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 S0 -> LoadBance S0 -> LoadBance S2 -> LoadBance S0 -> LoadBance S0 -> LoadBance S0 -> LoadBance S2 -> LoadBance S0 -> LoadBance S0 -> LoadBance S1 -> LoadBance S1 -> LoadBance S0 -> LoadBance S1 -> LoadBance S0 -> LoadBance S1 -> LoadBance S0 -> LoadBance S0 -> LoadBance S1 -> LoadBance
可发现其调用每个结点的规律大致与所占权重的比例相同。
轮询负载均衡 轮询负载均衡就是依次调用所有的服务结点,和随机负载均衡一样轮询负载均衡也有权重的概念。轮询负载均衡算法可以让服务调用严格的按照我们设置的权重比例来分配,并且跟调用的次数无关。缺点是慢的结点会累计请求,从而导致整个系统变慢。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 public class RoundRobinLoadBalance extends AbstractLoadBalance { private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap <>(); private AtomicBoolean updateLock = new AtomicBoolean (); private static final int RECYCLE_PERIOD = 60000 ; @Override protected Endpoint doSelect (List<Endpoint> endpoints, String param) { System.out.println("-----------------------------------------------------------------" ); String key = "com.exp.Endpoint.add" ; ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (Objects.isNull(map)) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap <>()); map = methodWeightMap.get(key); } int totalWeight = 0 ; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Endpoint selectedEndpoint = null ; WeightedRoundRobin selectedWRR = null ; for (Endpoint endpoint : endpoints) { String name = endpoint.name(); WeightedRoundRobin weightedRoundRobin = map.get(name); int weight = getWeight(endpoint); if (Objects.isNull(weightedRoundRobin)) { weightedRoundRobin = new WeightedRoundRobin (); weightedRoundRobin.setWeight(weight); map.putIfAbsent(name, weightedRoundRobin); } if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } long cur = weightedRoundRobin.increaseCurrent(); System.out.println(String.format("节点%s -> 内存中权重:%s,真实权重:%s,该次调用权重:%s,当前最大权重:%s" , name, weightedRoundRobin.current, weight, cur, maxCurrent)); weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) { maxCurrent = cur; selectedEndpoint = endpoint; selectedWRR = weightedRoundRobin; } totalWeight += weight; } if (!updateLock.get() && endpoints.size() != map.size()) { if (updateLock.compareAndSet(false , true )) { try { ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap <>(); newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); methodWeightMap.put(key, newMap); } finally { updateLock.set(false ); } } } if (Objects.nonNull(selectedEndpoint)) { long tmpWeight = selectedWRR.current.get(); selectedWRR.sel(totalWeight); System.out.println(String.format("本地调用选择 %s,同时调整 %s 节点在内存中的权重为 %s - %s = %s" , selectedEndpoint.name(), selectedEndpoint.name(), tmpWeight, totalWeight, selectedWRR.current)); System.out.println("此时内存中各节点权重:" ); map.entrySet().forEach(item-> System.out.println(String.format(" %s -> 权重:%s" , item.getKey(), item.getValue().current))); System.out.println("-----------------------------------------------------------------" ); return selectedEndpoint; } System.out.println("-----------------------------------------------------------------" ); return endpoints.get(0 ); } protected static class WeightedRoundRobin { private AtomicLong current = new AtomicLong (0 ); private int weight; private long lastUpdate; public int getWeight () { return weight; } public void setWeight (int weight) { this .weight = weight; } public long getLastUpdate () { return lastUpdate; } public void setLastUpdate (long lastUpdate) { this .lastUpdate = lastUpdate; } public long increaseCurrent () { return current.addAndGet(weight); } public void sel (int total) { current.addAndGet(-1 * total); } public AtomicLong getCurrent () { return current; } } }
为了方便的观察每次调用各个结点的是如何选择的,程序中打印了对应的日志。修改程序入口中的负载均衡策略为轮询负载均衡,观察其输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 ----------------------------------------------------------------- 节点S0 -> 内存中权重:3,真实权重:3,该次调用权重:3,当前最大权重:-9223372036854775808 节点S1 -> 内存中权重:2,真实权重:2,该次调用权重:2,当前最大权重:3 节点S2 -> 内存中权重:1,真实权重:1,该次调用权重:1,当前最大权重:3 本地调用选择 S0,同时调整 S0 节点在内存中的权重为 3 - 6 = -3 此时内存中各节点权重: S0 -> 权重:-3 S1 -> 权重:2 S2 -> 权重:1 ----------------------------------------------------------------- S0 -> LoadBance ----------------------------------------------------------------- 节点S0 -> 内存中权重:0,真实权重:3,该次调用权重:0,当前最大权重:-9223372036854775808 节点S1 -> 内存中权重:4,真实权重:2,该次调用权重:4,当前最大权重:0 节点S2 -> 内存中权重:2,真实权重:1,该次调用权重:2,当前最大权重:4 本地调用选择 S1,同时调整 S1 节点在内存中的权重为 4 - 6 = -2 此时内存中各节点权重: S0 -> 权重:0 S1 -> 权重:-2 S2 -> 权重:2 ----------------------------------------------------------------- S1 -> LoadBance ----------------------------------------------------------------- 节点S0 -> 内存中权重:3,真实权重:3,该次调用权重:3,当前最大权重:-9223372036854775808 节点S1 -> 内存中权重:0,真实权重:2,该次调用权重:0,当前最大权重:3 节点S2 -> 内存中权重:3,真实权重:1,该次调用权重:3,当前最大权重:3 本地调用选择 S0,同时调整 S0 节点在内存中的权重为 3 - 6 = -3 此时内存中各节点权重: S0 -> 权重:-3 S1 -> 权重:0 S2 -> 权重:3 ----------------------------------------------------------------- S0 -> LoadBance ----------------------------------------------------------------- 节点S0 -> 内存中权重:0,真实权重:3,该次调用权重:0,当前最大权重:-9223372036854775808 节点S1 -> 内存中权重:2,真实权重:2,该次调用权重:2,当前最大权重:0 节点S2 -> 内存中权重:4,真实权重:1,该次调用权重:4,当前最大权重:2 本地调用选择 S2,同时调整 S2 节点在内存中的权重为 4 - 6 = -2 此时内存中各节点权重: S0 -> 权重:0 S1 -> 权重:2 S2 -> 权重:-2 ----------------------------------------------------------------- S2 -> LoadBance ----------------------------------------------------------------- 节点S0 -> 内存中权重:3,真实权重:3,该次调用权重:3,当前最大权重:-9223372036854775808 节点S1 -> 内存中权重:4,真实权重:2,该次调用权重:4,当前最大权重:3 节点S2 -> 内存中权重:-1,真实权重:1,该次调用权重:-1,当前最大权重:4 本地调用选择 S1,同时调整 S1 节点在内存中的权重为 4 - 6 = -2 此时内存中各节点权重: S0 -> 权重:3 S1 -> 权重:-2 S2 -> 权重:-1 ----------------------------------------------------------------- S1 -> LoadBance ----------------------------------------------------------------- 节点S0 -> 内存中权重:6,真实权重:3,该次调用权重:6,当前最大权重:-9223372036854775808 节点S1 -> 内存中权重:0,真实权重:2,该次调用权重:0,当前最大权重:6 节点S2 -> 内存中权重:0,真实权重:1,该次调用权重:0,当前最大权重:6 本地调用选择 S0,同时调整 S0 节点在内存中的权重为 6 - 6 = 0 此时内存中各节点权重: S0 -> 权重:0 S1 -> 权重:0 S2 -> 权重:0 ----------------------------------------------------------------- S0 -> LoadBance ...
通过观察可发现,S0
、S1
、S2
结点的选择确实是严格按照其对应的权重比例来进行分配的。
最少活跃调用数负载均衡 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差,使慢的机器收到更少。
Dubbo
中计算活跃调用数是通过 Dubbo 的 Filter 机制来实现的。具体可以参考 ActiveLimitFilter
类的源码。本文示例直接采用简单的调用前后的耗时进行累加来表示每个结点的活跃值。
改造 Endpoint 接口 由于最少活跃调用数需要获取活跃调用数,故 Endpoint
接口新增返回调用活跃数的方法。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public interface Endpoint { void hello (String param) ; String name () ; int weight () ; int active () ; }
各个结点实现类实现对活跃调用数的累加 S0Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class S0Endpoint implements Endpoint { private int activeValue = 0 ; @Override public void hello (String param) { long start = System.currentTimeMillis(); try { Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("S0 -> " + param); long remain = (System.currentTimeMillis() - start); activeValue += remain; } @Override public String name () { return "S0" ; } @Override public int weight () { return 3 ; } @Override public int active () { return activeValue; } }
S1Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class S1Endpoint implements Endpoint { private int activeValue = 0 ; @Override public void hello (String param) { long start = System.currentTimeMillis(); try { Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("S1 -> " + param); long remain = (System.currentTimeMillis() - start); activeValue += remain; } @Override public String name () { return "S1" ; } @Override public int weight () { return 2 ; } @Override public int active () { return activeValue; } }
S2Endpoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class S2Endpoint implements Endpoint { private int activeValue = 0 ; @Override public void hello (String param) { long start = System.currentTimeMillis(); try { Thread.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("S2 -> " + param); long remain = (System.currentTimeMillis() - start); activeValue += remain; } @Override public String name () { return "S2" ; } @Override public int weight () { return 1 ; } @Override public int active () { return activeValue; } }
实现最小活跃调用数的负载均衡类 LeastActiveLoadBalance.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public class LeastActiveLoadBalance extends AbstractLoadBalance { @Override protected Endpoint doSelect (List<Endpoint> endpoints, String param) { System.out.println("-----------------------------------------------------------------" ); int length = endpoints.size(); int leastActive = -1 ; int leastCount = 0 ; int [] leastIndexes = new int [length]; int [] weights = new int [length]; int totalWeight = 0 ; int firstWeight = 0 ; boolean sameWeight = true ; for (int i = 0 ; i < length; i++) { Endpoint endpoint = endpoints.get(i); int active = endpoint.active(); System.out.println(String.format("i = %s, 活跃值:%s" , i, active)); int weight = getWeight(endpoint); weights[i] = weight; if (leastActive == -1 || active < leastActive) { leastActive = active; leastCount = 1 ; leastIndexes[0 ] = i; System.out.println(String.format("i = %s,此时由于当前结点活跃值最小或该结点为第一遍历的结点" , i)); totalWeight = weight; firstWeight = weight; sameWeight = true ; } else if (active == leastActive) { System.out.println(String.format("i = %s,此时由于当前结点的活跃值等于最小活跃值" , i)); leastIndexes[leastCount++] = i; totalWeight += weight; if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false ; } } } if (leastCount == 1 ) { return endpoints.get(leastIndexes[0 ]); } if (!sameWeight && totalWeight > 0 ) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0 ; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0 ) { System.out.println("-----------------------------------------------------------------" ); return endpoints.get(leastIndex); } } } System.out.println("-----------------------------------------------------------------" ); return endpoints.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
将程序入口中的负载均衡替换为随机负载均衡的实现类,其输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ----------------------------------------------------------------- i = 0, 活跃值:0 i = 0,此时由于当前结点活跃值最小或该结点为第一遍历的结点 i = 1, 活跃值:0 i = 1,此时由于当前结点的活跃值等于最小活跃值 i = 2, 活跃值:0 i = 2,此时由于当前结点的活跃值等于最小活跃值 ----------------------------------------------------------------- S1 -> LoadBance ----------------------------------------------------------------- i = 0, 活跃值:0 i = 0,此时由于当前结点活跃值最小或该结点为第一遍历的结点 i = 1, 活跃值:1 i = 2, 活跃值:0 i = 2,此时由于当前结点的活跃值等于最小活跃值 ----------------------------------------------------------------- S0 -> LoadBance ----------------------------------------------------------------- i = 0, 活跃值:2 i = 0,此时由于当前结点活跃值最小或该结点为第一遍历的结点 i = 1, 活跃值:1 i = 1,此时由于当前结点活跃值最小或该结点为第一遍历的结点 i = 2, 活跃值:0 i = 2,此时由于当前结点活跃值最小或该结点为第一遍历的结点 ----------------------------------------------------------------- S2 -> LoadBance ...
通过观察日志可发现,确实实现了最少活跃调用数有限,相同最小活跃数的则随机
一致性哈希负载均衡 Dubbo 中的一致性哈希负载均衡是唯一一个与权重无关的负载均衡算法。使用一致性 Hash 算法,让相同参数的请求总是发到同一结点,当某一结点崩溃时,原本发往该结点的请求,基于虚拟节点,平摊到其它结点,不会引起剧烈变动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap <>(); @Override protected Endpoint doSelect (List<Endpoint> endpoints, String param) { String key = "com.exp.Endpoint.add" ; int identityHashCode = System.identityHashCode(endpoints); System.out.println(String.format("identityHashCode : %s" , identityHashCode)); ConsistentHashSelector selector = selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector (endpoints, identityHashCode)); selector = selectors.get(key); } return selector.select(param); } private static final class ConsistentHashSelector { private final TreeMap<Long, Endpoint> virtualEndpoints; private final int identityHashCode; private final int replicaNumber; public ConsistentHashSelector (List<Endpoint> endpoints, int identityHashCode) { this .virtualEndpoints = new TreeMap <>(); this .identityHashCode = identityHashCode; this .replicaNumber = 4 ; for (Endpoint endpoint : endpoints) { for (int i = 0 ; i < replicaNumber / 4 ; i++) { byte [] digest = md5(endpoint.name() + i); for (int j = 0 ; j < 4 ; j++) { long m = hash(digest, j); virtualEndpoints.put(m, endpoint); } } } } public Endpoint select (String param) { byte [] digest = md5(param); long key = hash(digest, 0 ); System.out.println(String.format("param key : %s" , key)); return selectForKey(key); } private Endpoint selectForKey (long hash) { Map.Entry<Long, Endpoint> entry = virtualEndpoints.ceilingEntry(hash); if (entry == null ) { entry = virtualEndpoints.firstEntry(); } return entry.getValue(); } private long hash (byte [] digest, int number) { return (((long ) (digest[3 + number * 4 ] & 0xFF ) << 24 ) | ((long ) (digest[2 + number * 4 ] & 0xFF ) << 16 ) | ((long ) (digest[1 + number * 4 ] & 0xFF ) << 8 ) | (digest[number * 4 ] & 0xFF )) & 0xFFFFFFFFL ; } private byte [] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5" ); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException (e.getMessage(), e); } md5.reset(); byte [] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); } } }
替换程序入口中的负载均衡算法为一致性哈希负载均衡的实现类,观察输出
1 2 3 4 5 6 7 8 9 10 identityHashCode : 1018547642 param key : 119807787 S1 -> LoadBance identityHashCode : 1018547642 param key : 119807787 S1 -> LoadBance identityHashCode : 1018547642 param key : 119807787 S1 -> LoadBance ...
通过观察发现对于相同的参数,程序永远调用了 S1 结点。