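I. Preface
When we consume a remote service with Dubbo, we configure the referenced interface with the @Reference annotation or a dubbo:reference element. The configuration is turned into a ReferenceBean, which generates a proxy class for the remote service, so we can use the functionality the remote service provides as if we were calling a local interface method.
II. Classes involved in the service call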
ReferenceBean
    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

        @Override
        public Object getObject() throws Exception {
            return get();
        }

        // get() is inherited from ReferenceConfig
        public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("Already destroyed!");
            }
            if (ref == null) {
                // init() is defined in ReferenceConfig
                init();
            }
            return ref;
        }
    }

Obtaining the proxy object for a class marked with @Reference starts at getObject(), which then enters init() to perform the initialization.
PS: if, the first time you debug, ref is already non-null and you cannot step into init(), see: ref==null
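The lazy, one-time initialization behind get() can be sketched in isolation. The block below is a minimal illustration and not Dubbo code: `LazyReference`, `proxySupplier` and `initCalls` are hypothetical names, and the supplier stands in for createProxy(map).

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the ReferenceBean/ReferenceConfig pair; not Dubbo code.
class LazyReference<T> {
    private final Supplier<T> proxySupplier; // plays the role of createProxy(map)
    private T ref;
    private boolean initialized;
    private boolean destroyed;
    private int initCalls; // only here so the example can show that init() runs once

    LazyReference(Supplier<T> proxySupplier) {
        this.proxySupplier = proxySupplier;
    }

    // Mirrors get(): fail if destroyed, build the proxy on first use, then reuse it.
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

    // Mirrors the guard at the top of init(): a second call is a no-op.
    private void init() {
        if (initialized) {
            return;
        }
        initialized = true;
        initCalls++;
        ref = proxySupplier.get();
    }

    public synchronized void destroy() {
        destroyed = true;
        ref = null;
    }

    public int initCalls() {
        return initCalls;
    }

    public static void main(String[] args) {
        LazyReference<String> reference = new LazyReference<>(() -> "proxy");
        System.out.println(reference.get());
        System.out.println(reference.get());
        System.out.println("init calls: " + reference.initCalls());
    }
}
```

Because get() is synchronized and init() checks the initialized flag, repeated or concurrent calls share a single proxy instance in this sketch.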
ReferenceConfig
    private void init() {
        // Check whether initialization has already run; this is the first call, so the flag is false
        if (initialized) {
            return;
        }
        initialized = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // get consumer's global configuration
        checkDefault();
        appendProperties(this);
        if (getGeneric() == null && getConsumer() != null) {
            setGeneric(getConsumer().getGeneric());
        }
        if (ProtocolUtils.isGeneric(getGeneric())) {
            interfaceClass = GenericService.class;
        } else {
            try {
                // Load the interface class
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
        }
        // ...... many configuration checks are omitted here (they are not the focus of this method, so they are not analyzed)
        // map holds the configuration properties, see the figure below
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        if (!isGeneric()) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            // Get the method names of interfaceClass
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        map.put(Constants.INTERFACE_KEY, interfaceName);
        // Append the properties of each config object to map (appendParameters is defined in AbstractConfig)
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        String prefix = StringUtils.getServiceKey(map);
        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                appendAttributes(attributes, method, prefix + "." + method.getName());
                checkAndConvertImplicitConfig(method, map, attributes);
            }
        }
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            // No IP configured: use the local host address as the IP reported to the registry
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
        // attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
        // Create the proxy object
        ref = createProxy(map);
        // Wrap the proxy object, the interface name and the methods into a ConsumerModel
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        // Hand it over to ApplicationModel for management
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // Check whether a local service exists
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }
        // Is this an in-JVM (local) reference?
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // I call the remote service through a registry, so execution jumps straight to this branch.
                // loadRegistries(false) is defined in AbstractInterfaceConfig;
                // it wraps the registry properties and the protocol in use into URLs
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        // loadMonitor() is defined in AbstractInterfaceConfig
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
            // Only one URL, i.e. a single registry
            if (urls.size() == 1) {
                // This calls RegistryProtocol.refer, which goes through the registry
                // and turns the remote service into an Invoker
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // With multiple registries there are multiple Invokers, kept in a List
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // Iterate over urls and build an invoker for each
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // Overwrites the registries seen earlier; the last registry URL wins
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    // StaticDirectory wraps the invokers; cluster then merges them and exposes a single invoker
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // create service proxy
        // Through the SPI mechanism the call actually goes to JavassistProxyFactory.getProxy,
        // which uses a dynamic proxy to turn the Invoker into a proxy of the local interface
        return (T) proxyFactory.getProxy(invoker);
    }
[Figure: contents of map]
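To make the role of map more concrete, the sketch below shows how a parameter map can be flattened into a query string and attached, URL-encoded, to a registry address, which is roughly what addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)) does in createProxy. It is a simplified illustration only: `ReferUrlSketch`, the literal key name `refer`, the sorted parameter order, and the sample registry address are assumptions, not Dubbo's implementation.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Dubbo.
class ReferUrlSketch {

    // Join the parameters as key=value pairs, sorted by key, skipping empty values.
    static String toQueryString(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<>(params).entrySet()) {
            if (e.getValue() == null || e.getValue().isEmpty()) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    // Attach the whole query string as ONE encoded parameter of the registry URL.
    static String addEncodedRefer(String registryUrl, Map<String, String> params) {
        try {
            String encoded = URLEncoder.encode(toQueryString(params), "UTF-8");
            String separator = registryUrl.contains("?") ? "&" : "?";
            return registryUrl + separator + "refer=" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("side", "consumer");
        map.put("interface", "com.example.DemoService"); // hypothetical interface name
        System.out.println(addEncodedRefer("registry://127.0.0.1:2181/RegistryService", map));
    }
}
```

Encoding the consumer parameters into a single value keeps them separate from the registry's own parameters, so the receiving side can decode them again as one unit.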
RegistryProtocol
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // Connect to the registry and get the Registry instance
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        // group="a,b" or group="*"
        // Get the consumer-side metadata
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // Read the group from the consumer-side metadata
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // Multiple groups: build the Invoker with the mergeable cluster
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // Build the Invoker with the default cluster
        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // RegistryDirectory handles service discovery: it creates Invokers, listens for configuration changes, and so on
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // Set the registry
        directory.setRegistry(registry);
        // Set the protocol
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        // Get the consumer's configuration properties
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // Build the consumer URL
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            // Register the consumer with the registry, creating a new node under the consumers directory
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        // Then subscribe to the registry's providers directory; this later triggers DubboProtocol.refer.
        // The consumer subscribes to: providers, dynamic configuration (configurators), routing rules (routers)
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        // Wrap the directory, adding mock and failover behavior;
        // the call actually goes to MockClusterWrapper.join
        Invoker invoker = cluster.join(directory);
        // Aggregate the invoker and directory information and store it in a local map
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
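Two small decisions in refer and doRefer are easy to try out on their own: whether the group value calls for the mergeable cluster, and what the subscription URL with its category parameter looks like. The following is a minimal sketch under stated assumptions: `ReferDecisionSketch`, the comma pattern, and the string layout of the consumer URL are illustrative and not taken from Dubbo's Constants or URL classes.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Dubbo.
class ReferDecisionSketch {
    // Assumed equivalent of a comma split pattern: commas with optional surrounding whitespace.
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*,+\\s*");

    // group="a,b" or group="*" means results from several groups must be merged.
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }

    // Build a consumer-side subscription URL that watches three categories.
    static String subscribeUrl(String consumerIp, String interfaceName) {
        return "consumer://" + consumerIp + "/" + interfaceName
                + "?category=providers,configurators,routers";
    }

    public static void main(String[] args) {
        System.out.println(needsMergeableCluster("a,b"));
        System.out.println(needsMergeableCluster("a"));
        // 192.168.1.10 and com.example.DemoService are placeholder values
        System.out.println(subscribeUrl("192.168.1.10", "com.example.DemoService"));
    }
}
```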
RegistryDirectory
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

        @Override
        public synchronized void notify(List<URL> urls) {
            List<URL> invokerUrls = new ArrayList<URL>();
            List<URL> routerUrls = new ArrayList<URL>();
            List<URL> configuratorUrls = new ArrayList<URL>();
            for (URL url : urls) {
                String protocol = url.getProtocol();
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category)
                        || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                    routerUrls.add(url);
                } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    invokerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }
            // configurators
            // Update the provider-side configuration
            if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
                this.configurators = toConfigurators(configuratorUrls);
            }
            // routers
            // Update the routing configuration
            if (routerUrls != null && !routerUrls.isEmpty()) {
                List<Router> routers = toRouters(routerUrls);
                if (routers != null) { // null - do nothing
                    setRouters(routers);
                }
            }
            // Load the provider's service information
            List<Configurator> localConfigurators = this.configurators; // local reference
            // merge override parameters
            this.overrideDirectoryUrl = directoryUrl;
            if (localConfigurators != null && !localConfigurators.isEmpty()) {
                for (Configurator configurator : localConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
            // providers
            // Reload the Invoker instances
            refreshInvoker(invokerUrls);
        }

        private void refreshInvoker(List<URL> invokerUrls) {
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.methodInvokerMap = null; // Set the method invoker map to null
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<URL>();
                    this.cachedInvokerUrls.addAll(invokerUrls); // Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
                // Build the new Invoker map
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // Translate url list to Invoker map
                Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
                // state change
                // If the calculation is wrong, it is not processed.
                if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                    return;
                }
                this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
                this.urlInvokerMap = newUrlInvokerMap;
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }

        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
            if (urls == null || urls.isEmpty()) {
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<String>();
            String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
            for (URL providerUrl : urls) {
                // If protocol is configured at the reference side, only the matching protocol is selected
                if (queryProtocols != null && queryProtocols.length() > 0) {
                    boolean accept = false;
                    String[] acceptProtocols = queryProtocols.split(",");
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) {
                        continue;
                    }
                }
                if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                    continue;
                }
                if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                            + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                    continue;
                }
                // Merge the provider-side configuration data
                URL url = mergeUrl(providerUrl);
                // Filter out duplicate provider configuration data
                String key = url.toFullString(); // The parameter urls are sorted
                if (keys.contains(key)) { // Repeated url
                    continue;
                }
                keys.add(key);
                // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                // No matching invoker in the cache, so call protocol#refer again
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(Constants.DISABLED_KEY)) {
                            enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(Constants.ENABLED_KEY, true);
                        }
                        if (enabled) {
                            invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                    // Cache the new Invoker
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else {
                    // Already cached: carry the existing invoker over into the new map
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }
    }

Summary: RegistryDirectory implements the NotifyListener interface. When the subscribed providers, configuration, or routing rules change, it receives a notification and updates itself accordingly.
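The refresh logic in refreshInvoker and toInvokers boils down to: reuse cached invokers whose URL is still present, create invokers for new URLs, and destroy the ones that disappeared. The sketch below illustrates only that idea with plain strings as URLs. `InvokerCacheSketch` and `FakeInvoker` are hypothetical names, and the sketch leaves out protocol filtering, URL merging, and the cachedInvokerUrls fallback.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of an invoker cache; not Dubbo code.
class InvokerCacheSketch {

    static final class FakeInvoker {
        final String url;
        boolean destroyed;

        FakeInvoker(String url) {
            this.url = url;
        }
    }

    private Map<String, FakeInvoker> urlInvokerMap = new HashMap<>();
    private int created; // counts how many invokers were actually built

    // Called whenever the registry pushes a new provider list.
    synchronized void refresh(List<String> providerUrls) {
        if (providerUrls.isEmpty()) {
            return; // nothing to rebuild in this simplified version
        }
        Map<String, FakeInvoker> fresh = new HashMap<>();
        for (String url : providerUrls) {
            FakeInvoker invoker = urlInvokerMap.get(url);
            if (invoker == null) { // not cached: "refer" again
                invoker = new FakeInvoker(url);
                created++;
            }
            fresh.put(url, invoker); // cached or new, it belongs to the new map
        }
        // Destroy invokers whose URL is no longer published.
        for (Map.Entry<String, FakeInvoker> e : urlInvokerMap.entrySet()) {
            if (!fresh.containsKey(e.getKey())) {
                e.getValue().destroyed = true;
            }
        }
        urlInvokerMap = fresh;
    }

    synchronized FakeInvoker get(String url) {
        return urlInvokerMap.get(url);
    }

    synchronized int created() {
        return created;
    }

    public static void main(String[] args) {
        InvokerCacheSketch directory = new InvokerCacheSketch();
        directory.refresh(Arrays.asList("dubbo://a", "dubbo://b"));
        directory.refresh(Arrays.asList("dubbo://b", "dubbo://c"));
        System.out.println("invokers created: " + directory.created());
    }
}
```

Keying the cache by URL means an unchanged provider keeps its existing connection across notifications, while a changed URL is treated as a new provider.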
DubboProtocol
```java
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // create rpc invoker.
    // getClients returns the client instances (ExchangeClient), which rely on Netty for communication;
    // connections are shared by default
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
```
JavassistProxyFactory
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // Generate a proxy object for the interface classes; calls to its methods go through InvokerInvocationHandler.invoke
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

III. Summary
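The same idea, an interface proxy whose method calls are forwarded to an invoker by an invocation handler, can be reproduced with the JDK's own java.lang.reflect.Proxy. Note the substitution: Dubbo's JavassistProxyFactory generates bytecode with Javassist, whereas this sketch uses JDK dynamic proxies, and the `Invoker` and `GreetingService` interfaces here are simplified, hypothetical stand-ins.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical sketch using JDK dynamic proxies; not Dubbo's Javassist-based implementation.
class ProxySketch {

    // Simplified invoker: receives the method name and arguments of a call.
    public interface Invoker {
        Object invoke(String methodName, Object[] args);
    }

    // Hypothetical service interface that the consumer wants to call.
    public interface GreetingService {
        String sayHello(String name);
    }

    // Forwards every interface method call to the invoker.
    static final class InvokerHandler implements InvocationHandler {
        private final Invoker invoker;

        InvokerHandler(Invoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared on Object (toString, hashCode, equals) are answered by the invoker object itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> interfaceClass, Invoker invoker) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[] {interfaceClass},
                new InvokerHandler(invoker));
    }

    public static void main(String[] args) {
        // The lambda stands in for a remote call.
        Invoker fakeRemote = (methodName, arguments) -> methodName + ":" + arguments[0];
        GreetingService service = getProxy(GreetingService.class, fakeRemote);
        System.out.println(service.sayHello("dubbo"));
    }
}
```

The caller only sees GreetingService; whether the invoker talks to a local object or a remote provider is hidden behind the handler.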
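1. Dubbo references a service at one of two moments: the first is when the Spring container calls ReferenceBean's afterPropertiesSet method, and the second is when the service backing the ReferenceBean is injected into another class. In both cases the entry point is the getObject method.
2. ReferenceBean's getObject() calls get(), which calls ReferenceConfig's init method to perform the initialization. init checks the configuration, assembles the relevant properties into a map, and calls createProxy(map).
3. ReferenceConfig's createProxy method first decides how the service is referenced. For a local reference it builds a URL with the injvm protocol; for a reference through a registry it builds the URLs from the registry configuration. It then checks the number of URLs: with a single URL it calls refer on the protocol directly to produce the invoker; with several it calls refer for each URL, collects the resulting invokers in a StaticDirectory, and lets cluster merge them into a single invoker.
4. RegistryProtocol's refer method first connects to the registry, then doRefer builds a RegistryDirectory and registers the consumer with the registry.
5. directory.subscribe adds category=providers,configurators,routers to the service consumer's URL and subscribes to that URL at the registry. Whenever the providers, configurators, or routers of the service change, RegistryDirectory is notified, so that changes to providers, configuration, and routing rules are discovered promptly. The notification then triggers DubboProtocol's refer method.
6. The key step in DubboProtocol's refer is getClients, which obtains the client instances used to make network calls to the remote service; the underlying communication relies on Netty.
7. cluster.join wraps the directory, adding mock and failover behavior.
8. Finally, JavassistProxyFactory's getProxy generates the proxy object; calls to its methods go through InvokerInvocationHandler's invoke method.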
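For the multi-registry case in step 3, the merged invoker can be pictured as a wrapper that forwards each call to the first invoker that is currently available. The sketch below illustrates that behavior only. `AvailableClusterSketch`, its `Invoker` interface, and the `fixed` helper are hypothetical, and the selection rule is an assumption about what an "available" cluster does rather than a copy of Dubbo's AvailableCluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of merging several invokers into one; not Dubbo code.
class AvailableClusterSketch {

    public interface Invoker {
        boolean isAvailable();

        String invoke(String request);
    }

    // Test helper: an invoker with a fixed availability and a name used in its reply.
    static Invoker fixed(final boolean available, final String name) {
        return new Invoker() {
            @Override
            public boolean isAvailable() {
                return available;
            }

            @Override
            public String invoke(String request) {
                return name + ":" + request;
            }
        };
    }

    // Merge a list of invokers (one per registry) into a single invoker.
    static Invoker join(List<Invoker> invokers) {
        final List<Invoker> copy = new ArrayList<>(invokers);
        return new Invoker() {
            @Override
            public boolean isAvailable() {
                for (Invoker invoker : copy) {
                    if (invoker.isAvailable()) {
                        return true;
                    }
                }
                return false;
            }

            @Override
            public String invoke(String request) {
                // Forward to the first invoker that reports itself available.
                for (Invoker invoker : copy) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(request);
                    }
                }
                throw new IllegalStateException("No provider available");
            }
        };
    }

    public static void main(String[] args) {
        Invoker merged = join(Arrays.asList(fixed(false, "registry1"), fixed(true, "registry2")));
        System.out.println(merged.invoke("hello"));
    }
}
```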
















