motan源码分析五:cluster相关

article/2025/9/11 4:29:14

上一章我们分析了客户端调用服务端相关的源码,但是到了cluster里面的部分我们就没有分析了,本章将深入分析cluster和它的相关支持类。

1.clustersupport的创建过程,上一章的ReferConfig的initRef()方法中调用了相关的创建代码:

复制代码
        for(Iterator iterator = protocols.iterator(); iterator.hasNext();){ProtocolConfig protocol = (ProtocolConfig)iterator.next();LoggerUtil.info((new StringBuilder("ProtocolConfig's")).append(protocol.getName()).toString());Map params = new HashMap();params.put(URLParamType.nodeType.getName(), "referer");params.put(URLParamType.version.getName(), URLParamType.version.getValue());params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));collectConfigParams(params, new AbstractConfig[] {protocol, basicReferer, extConfig, this});collectMethodConfigParams(params, getMethods());URL refUrl = new URL(protocol.getName(), localIp, 0, interfaceClass.getName(), params);ClusterSupport clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);//创建clustersupportclusterSupports.add(clusterSupport);clusters.add(clusterSupport.getCluster());//获取对应的clusterproxy = proxy != null ? proxy : refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue());}private ClusterSupport createClusterSupport(URL refUrl, ConfigHandler configHandler, List registryUrls){List regUrls = new ArrayList();if(StringUtils.isNotBlank(directUrl) || "injvm".equals(refUrl.getProtocol())){URL regUrl = new URL("local", "127.0.0.1", 0, com/weibo/api/motan/registry/RegistryService.getName());if(StringUtils.isNotBlank(directUrl)){StringBuilder duBuf = new StringBuilder(128);String dus[] = MotanConstants.COMMA_SPLIT_PATTERN.split(directUrl);String as[];int j = (as = dus).length;for(int i = 0; i < j; i++){String du = as[i];if(du.contains(":")){String hostPort[] = du.split(":");URL durl = refUrl.createCopy();durl.setHost(hostPort[0].trim());durl.setPort(Integer.parseInt(hostPort[1].trim()));durl.addParameter(URLParamType.nodeType.getName(), "service");duBuf.append(StringTools.urlEncode(durl.toFullStr())).append(",");}}if(duBuf.length() > 0){duBuf.deleteCharAt(duBuf.length() - 1);regUrl.addParameter(URLParamType.directUrl.getName(), duBuf.toString());}}regUrls.add(regUrl);} else//走注册中心的方式{if(registryUrls == null || registryUrls.isEmpty())throw new IllegalStateException(String.format("No registry to reference %s on the consumer %s , please config <motan:registry address=\"...\" /> in your spring config.", new Object[] {interfaceClass, "127.0.0.1"}));URL url;for(Iterator iterator = registryUrls.iterator(); iterator.hasNext(); regUrls.add(url.createCopy()))url = (URL)iterator.next();}URL url;for(Iterator iterator1 = regUrls.iterator(); iterator1.hasNext(); url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr())))url = (URL)iterator1.next();return configHandler.buildClusterSupport(interfaceClass, regUrls);//调用simpleconfighandler的创建clustersupport方法}public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);//创建cluster支持类,将业务接口和注册中心信息传递进去clusterSupport.init();//初始化return clusterSupport;}
复制代码

2.clustersupport的init和prepare方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public  void  init() {
     prepareCluster();
     URL subUrl = toSubscribeUrl(url);
     for  (URL ru : registryUrls) { //循环注册中心的url
         String directUrlStr = ru.getParameter(URLParamType.directUrl.getName());
         // 如果有directUrl,直接使用这些directUrls进行初始化,不用到注册中心discover
         if  (StringUtils.isNotBlank(directUrlStr)) {
             List<URL> directUrls = parseDirectUrls(directUrlStr);
             if  (!directUrls.isEmpty()) {
                 notify(ru, directUrls);
                 LoggerUtil.info( "Use direct urls, refUrl={}, directUrls={}" , url, directUrls);
                 continue ;
             }
         }
         // client 注册自己,同时订阅service列表
         Registry registry = getRegistry(ru); //获取zookeeper的注册中心
         registry.subscribe(subUrl,  this ); //注册自己并订阅服务
     }
     boolean  check = Boolean.parseBoolean(url.getParameter(URLParamType.check.getName(), URLParamType.check.getValue()));
     if  (!CollectionUtil.isEmpty(cluster.getReferers()) || !check) {
         cluster.init(); //初始化集群
         if  (CollectionUtil.isEmpty(cluster.getReferers()) && !check) {
             LoggerUtil.warn(String.format( "refer:%s" this .url.getPath() +  "/"  this .url.getVersion()),  "No services" );
         }
         return ;
     }
     throw  new  MotanFrameworkException(String.format( "ClusterSupport No service urls for the refer:%s, registries:%s" ,
             this .url.getIdentity(), registryUrls), MotanErrorMsgConstant.SERVICE_UNFOUND);
}
private  void  prepareCluster() {
     String clusterName = url.getParameter(URLParamType.cluster.getName(), URLParamType.cluster.getValue()); //集群名称
     String loadbalanceName = url.getParameter(URLParamType.loadbalance.getName(), URLParamType.loadbalance.getValue()); //负载均衡名称
     String haStrategyName = url.getParameter(URLParamType.haStrategy.getName(), URLParamType.haStrategy.getValue()); //ha高可用名称
     cluster = ExtensionLoader.getExtensionLoader(Cluster. class ).getExtension(clusterName); //获取具体的集群对象
     LoadBalance<T> loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance. class ).getExtension(loadbalanceName); //获取具体的负载均衡方式,目前motan支持6种负载方式
     HaStrategy<T> ha = ExtensionLoader.getExtensionLoader(HaStrategy. class ).getExtension(haStrategyName); //获取高可用的方式,目前支持两种failfast和failover方式
     cluster.setLoadBalance(loadBalance);
     cluster.setHaStrategy(ha);
     cluster.setUrl(url);
}

3.负载均衡,motan支持6种方式,分别是:轮训、随机、hash、本地服务优先、权重可配置、低并发优先,具体代码可见com.weibo.api.motan.cluster.loadbalance目录,本文我们主要看一下轮训的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public  class  RoundRobinLoadBalance<T>  extends  AbstractLoadBalance<T> {
     private  AtomicInteger idx =  new  AtomicInteger( 0 );
     @Override
     protected  Referer<T> doSelect(Request request) {
         List<Referer<T>> referers = getReferers(); //获取所有服务器的引用
         int  index = idx.incrementAndGet(); //自增
         for  ( int  i =  0 ; i < referers.size(); i++) {
             Referer<T> ref = referers.get((i + index) % referers.size()); //利用自增数去模,达到轮训的目的
             if  (ref.isAvailable()) {
                 return  ref;
             }
         }
         return  null ;
     }
     @Override
     protected  void  doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
         List<Referer<T>> referers = getReferers();
         int  index = idx.incrementAndGet();
         for  ( int  i =  0 ; i < referers.size(); i++) {
             Referer<T> referer = referers.get((i + index) % referers.size());
             if  (referer.isAvailable()) {
                 refersHolder.add(referer);
             }
         }
     }
}

4.motan支持failfast和failover两种方式,failfast只调用一次,如果失败则直接返回失败,failover循环调用若干次,直到成功或循环结束后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public  Response call(Request request, LoadBalance<T> loadBalance) {
     List<Referer<T>> referers = selectReferers(request, loadBalance); //获取所有的引用
     if  (referers.isEmpty()) {
         throw  new  MotanServiceException(String.format( "FailoverHaStrategy No referers for request:%s, loadbalance:%s" , request,
                 loadBalance));
     }
     URL refUrl = referers.get( 0 ).getUrl();
     // 先使用method的配置
     int  tryCount =
             refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
                     URLParamType.retries.getIntValue()); //获取重试次数
     // 如果有问题,则设置为不重试
     if  (tryCount <  0 ) {
         tryCount =  0 ;
     }
     for  ( int  i =  0 ; i <= tryCount; i++) {
         Referer<T> refer = referers.get(i % referers.size()); //循环调用
         try  {
             request.setRetries(i);
             return  refer.call(request);
         catch  (RuntimeException e) {
             // 对于业务异常,直接抛出
             if  (ExceptionUtil.isBizException(e)) {
                 throw  e; //业务异常退出调用
             else  if  (i >= tryCount) {
                 throw  e;
             }
             LoggerUtil.warn(String.format( "FailoverHaStrategy Call false for request:%s error=%s" , request, e.getMessage()));
         }
     }
     throw  new  MotanFrameworkException( "FailoverHaStrategy.call should not come here!" );
}

本章知识点总结:

1.一个cluster有一个cluster的支持类,有一个ha,有一个loadbalance;

2.motan支持6种负载均衡方式;

3.motan支持failover的ha方式;


http://chatgpt.dhexx.cn/article/BWi4hKip.shtml

相关文章

java 微博 开源_微博开源框架Motan初体验

前两天&#xff0c;我在开源中国的微信公众号看到新浪微博的轻量Rpc框架——Motan开源了。上网查了下&#xff0c;才得知这个Motan来头不小&#xff0c;支撑着新浪微博的千亿调用&#xff0c;曾经在2014年的春晚中有着千亿次的调用&#xff0c;对抗了春晚的最高峰值。 什么是Mo…

搭建新浪RPC框架motan Demo

motan是新浪微博开源的RPC框架&#xff0c;github官网是&#xff1a;https://github.com/weibocom/motan 今天就先搭建一个Hello world demo&#xff0c;本demo基于motan 0.2.1版本 首先先去github下载源代码&#xff08;motan-manager报错请忽略&#xff0c;eclipse的web Mod…

微博RPC框架Motan

原文来自&#xff1a;http://blog.csdn.net/autfish/article/details/51374798 从14年开始就陆续看到新浪微博RPC框架Motan的介绍&#xff0c;时隔两年后&#xff0c;微博团队终于宣布开源轻量级RPC框架Motan&#xff0c;项目地址&#xff1a; https://github.com/weibocom/mot…

motan rpc 接口统一异常处理

1.hello word 一个Motan扩展 大概需要下面的三点&#xff1a; 实现SPI扩展点接口 package com.weibo.api.motan.filter; Spi public interface Filter {Response filter(Caller<?> caller, Request request); }业务代码实现Filter public class PlsProviderExceptionF…

motan用户开发指南

目录 基本介绍 架构概述 模块概述 配置概述 使用Motan 工程依赖 处理调用异常 配置说明 协议与连接&#xff08;motan:protocol) 介绍 Motan协议 本地调用 注册中心与服务发现(motan:registry) 介绍 使用Consul作为注册中心 使用Zookeeper作为注册中心 不使用…

从motan看RPC框架设计

kris的文章开始 计算机科学领域的任何问题都可以通过增加一个间接的中间层来解决 从零开发一款RPC框架&#xff0c;说难也难说简单也简单。难的是你的设计将如何面对实际中的复杂应用场景&#xff1b;简单的是其思想可以仅仅浓缩成一行方法调用。motan是今年(2016年)新浪微博…

motan与zookeeper框架

新浪科技讯 2016年5月10日&#xff0c;微博方面宣布&#xff0c;支撑微博千亿调用的轻量级 RPC 框架 Motan 正式开源了。微博技术团队希望未来能有更多优秀的开源人入驻&#xff0c;并进一步完善优化。 搭建新浪RPC框架motan Demo&#xff1a;http://blog.csdn.net/linuu/arti…

java rpc motan_RPC框架motan使用

简介 motan是新浪微博开源的一套轻量级、方便使用的RPC框架 Hello World 使用的过程分为Server端和Client端&#xff0c;Server提供RCP的服务接口&#xff0c;Client端发起调用获取结果。 maven的pom文件配置 0.2.1 com.weibo motan-core ${motan.version} com.weibo motan-tra…

轻量级Rpc框架设计--motan源码解析一:框架介绍及框架使用初体验

一, 框架介绍 1.1 概况 motan是新浪微博开源出来的一套高性能、易于使用的分布式远程服务调用(RPC)框架。 1.2 功能 可以spring的配置方式与项目集成. 支持zookeeper服务发现组件, 实现集群环境下服务注册与发现. 保证高并发, 高负载场景下的稳定高性能, , 实现生产环境…

Motan原理、使用、JavaAPI简化、为什么使用Motan

前言&#xff0c;本文包括&#xff0c;rpc解释与为什么使用rpc、rpc性能对比、Motan依赖问题、Motan源码梳理、Motan功能、特点、使用。 主要中心&#xff1a;为什么使用Motan? 一、什么是RPC 官方解释&#xff1a;RPC&#xff08;Remote Procedure Call&#xff09;—远程…

jplayer自动播放

音乐网站的播放器一直都没有解决自动播放的问题&#xff0c;小哲说这样不行的&#xff0c;我也知道不可以这样&#xff0c;毕竟是自己提出要做的&#xff0c;所以要尽自己最大的能力去做好它&#xff01;本周末我一直都在围绕这个问题而研究。 我曾经想过在播放器初始化的时候…

JWPlayer

原文&#xff1a; http://www.cnblogs.com/yukui/archive/2009/03/12/1409469.html The JW MP3 Player (built with Adobes Flash) is the easiest way to add live music or podcasts to your website. It supports playback of a single MP3 file or an RSS, XSPF or ASX pla…

今天开始写些随笔,就从Jplayer开始吧

今天才开始用Jplayer&#xff0c;可能有点落伍了&#xff0c;但是看到网上千篇一律的使用说明&#xff0c;开始决定把自己的使用心得分享一下&#xff0c;废话不多说&#xff0c;开始吧。 Step1&#xff1a; 官网上有具体的搭建顺序&#xff0c;URL&#xff1a;http://www.jp…

关于播放器JPlayer的使用及遇到的问题

jPlayer是一个用于控制和播放mp3文件的jQuery插件。它在后台使用Flash来播放mp3文件&#xff0c;前台播放器外观完全可以使用XHML/CSS自定义。支持&#xff1a; 有一点比较好的是&#xff0c;在支持html5的浏览器上会使用html5的标签audio或者video&#xff0c;而不支持的浏览…

ijkplayer支持播放rtsp、jpeg、gif

ijkplayer版本&#xff1a;k.0.8.8 编译环境&#xff1a;Ubuntu 18.04.6 LTS 使用平台&#xff1a;android 支持rtsp播放 默认的ijkplayer并不支持rtsp流的播放&#xff0c;因为在编译ffmpeg的时候并没有开启rtsp的demuxer&#xff0c;所以在编译ffmpeg的时候需要开启rtsp的d…

【ijkplayer】介绍

【ijkplayer】介绍 0x1 系统架构 ijkplayer是由b站开源的播放器项目&#xff0c;底层基于ffmpeg, 支持Android和iOS。下面我们来简单介绍一下Android上的实现。 Android上的系统架构图如下。 下面分别对各个模块进行介绍&#xff1a; 0x11 ijkplayer-example app的实现&a…

一款简洁的 jplayer 音乐播放器完整版

一款简洁 jplayer 音乐播放器&#xff0c;做音乐站很漂亮&#xff0c;直接套用就好了。 效果图&#xff1a; 部分源代码&#xff1a; <div id"lei_jplayer"></div> <div id"jp_container_1"><div class"jp-controls">…

Ijkplayer Android介绍

Ijkplayer Android目录结构 Ijkplayer Android可以将该工程导入android studio进行编译&#xff0c;当它下载好需要的资源文件&#xff08;例如sdk build tool&#xff0c;gradle等&#xff09;就可以进行编译了&#xff0c;该工程的目录结构如下图所示: 链接库&#xff1a…

ijkplayer播放器

播放器系列 android播放器&#xff1a;MediaPlayer ExoPlayer ijkplayer_步基的博客-CSDN博客_mediacodec流程 一 概述 IJKPlayer是一款基于ffmpeg/ffplay的开源播放器&#xff0c;可支持rtmp/rtsp/hls等多种媒体协议&#xff0c;支持Android/IOS等移动平台。项目地址&#xff…

jplayer详解

下载官网&#xff1a;http://www.jplayer.org/ 当前版本&#xff1a;2.3.0 功能&#xff1a;视频播放&#xff08;可全屏&#xff09;、音乐播放 全部原教程&#xff0c;说明并不详细&#xff0c;要结合查看其网页源代码来学习&#xff1a;http://www.jplayer.org/latest/demos…