Overview
了解本部分请先参考: 6.28 并发控制 · GitBook
相关代码位于 dubbo-rpc-api
根据每个示例, 找到Dubbo对应实现代码的过程很简单. 由于Dubbo中几乎所有的配置都有一个Key, 所以直接在Constants
(dubbo-common)中找到对应的Key调用即可. 比如配置executes
对应的就是Constants.EXECUTES_KEY
. 查找调用关系可以直接定位到ExecuteLimitFilter
.
Filter加载过程
ExecuteLimitFilter
首先来看下Provider端ExecuteLimitFilter
的加载过程. 在dubbo-rpc-api中, src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
有配置executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
.
Fillter的加载源于ProtocolFilterWrapper.buildInvokerChain
, 在这里断点可以看到调用是从ServiceBean
开始的:
1 | ServiceBean.export |
其中获取所有Filter的代码如下:
1 | ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); |
根据之前对于Dubbo SPI的了解, getActivateExtension
这一方法中, 扩展类需要满足ExtensionLoader.isMatchGroup, 以及isActive
时才会返回, 所以在Provider端中, 且配置了executes
属性时, 标记了@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
的ExecuteLimitFilter
会被放到整个Filter Chain中.
buildInvokerChain
中传入的invoker
是JDKProxyFactory$1 (AbstractProxyInvoker)
. 在此方法中, 返回了一个Invoker, 观察invoke(invocation)
方法的实现, 实际上是将invoker的invoke方法之外包裹了多层Filter, 而其中Filter中也会调用下一个Filter, 由此构成了一个责任链模式.
回想业务方法的栈1
2
3
4
5
6
7
8
9
10
11
12
13ChannelEventRunnable.run
-> DecodeHandler.received
-> HeaderExchangeHandler.recieved
-> HeaderExchangeHandler.handleRequest
-> DubboProtocol$1.reply // 匿名类ExchangeHandler
-> ProtocolFilterWrapper$1.invoke // 匿名类Invoker, Filter介入
-> EchoFilter.invoke
... 每个Filter和ProtocolFilterWrapper$1的反复
-> RegisterProtocol$InvokerDelegete.invoke
-> DelegateProviderMetaDataInvoker.invoke
-> JavassistProxyFactory$1.invoke
-> Wrapper1.invokeMethod
-> DemoService.sayHello
注: $1
表示代码中的匿名类.
ActiveLimitFilter
Consumer端的ActiveLimitFilter
加载过程类似
1 | ClassPathXmlApplicatonContext.getBean |
buildInvokerChain
的参数是一个DubboInvoker
.
涉及这一系列Filter的调用栈如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16$Proxy11.sayHello
-> InvokerInvocationHandler.invoke
-> MockClusterInvoker.invoke
-> FailoverClusterInvoker.invoke
-> RegisterDirectory$InvokerDelegate.invoke
-> ListenerInvokerWrapper.invoker
-> ProtocolFilterWrapper$1.invoke
-> ConsumerContextFilter.invoke
... ProtocolFilterWrapper$1.invoke与filter交替
-> DubboInvoker.invoke // DubboInvoker
另起一行, 开始数据交互
-> RefercnceCountExchangeClient.request
-> HeaderExchangeClient.request
-> HeaderExchangeChannel.request
-> NettyClient.send
并发控制实现
ExecuteLimitHandler
ExecuteLimitHandler
的实现非常简单. 注意, 并发控制是方法级别的: Dubbo为每个方法对应了一个RpcStatus
, 通过其中的Semaphore
对象来控制线程数量, 没有tryAcquire成功会抛出异常
:
1 | throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url |
值得注意的是, 每个方法会有一个对应的RPCStatus
. 所以executes
直接配置给某个类时, 指的是其中每个方法的并发限制, 而不是所有方法并发的总和.
这个基于Semaphore
的实现非常简单, 为了实现更彻底的控制和隔离, 有人接入了Hystrix, 也是通过Dubbo Filter进行扩展的. 请参考下方链接.
ActiveLimitFilter
ActiveLimitFilter
直接通过RPCStatus
的计数功能来进行并发控制, 如果达到指定active
数量, 会一直阻塞直到超时, 再抛出异常
1 | throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " |
这里的阻塞是直接通过RPCStatus
继承自Object
的wait
,notify
实现的.
另外值得注意的一点是虽然文档中表示:
限制 com.foo.BarService 的每个方法,每客户端并发执行(或占用连接的请求数)不能超过 10 个:
但实际上是, 客户端对每个Provider该方法的调用不能超过10个. 由于METHOD_STATISTICS
中的Key是URL
串, 比如 dubbo://10.1.1.1:20880/com.alibaba.dubbo.demo.DemoService
. 所以对于不同Provider的相同方法, 实际上获取到的是两个不同的RPCStatus
实例. 故可得到以上结论.