从源码全面解析 dubbo 服务端服务调用的来龙去脉

article/2025/11/7 19:24:52
  • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
  • 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
  • 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

在这里插入图片描述

文章目录

    • 一、引言
    • 二、服务端调用流程
      • 1、启动封装
        • 1.1 过滤器的封装
        • 1.2 Hander的封装
      • 2、服务调用
        • 2.1 Handler调用
          • 2.1 MultiMessageHandler
          • 2.2 HeartbeatHandler
          • 2.3 AllChannelHandler
          • 2.4 DecodeHandler
          • 2.5 HeaderExchangeHandler
        • 2.2 过滤器调用
        • 2.3 方法调用
      • 三、流程图
    • 四、总结

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、服务端调用流程

对于整个服务端调用来说,主要分为两部分:

  • 服务端启动时:封装的 HandlerFilter
  • 服务端调用时:经过 Handler,然后经过 Filter,最后调用目标方法

1、启动封装

我们启动的时候,会经过 Exporter<?> exporter = protocolSPI.export(invoker) ,走 Dubbo SPI 的扩展机制

1.1 过滤器的封装

首先是我们的过滤器的封装:ProtocolFilterWrapper

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());// buildInvokerChain:return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {// 获取所有的过滤器filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);// 如果当前的过滤器不为空if (!CollectionUtils.isEmpty(filters)) {// 将所有的过滤器封装成链表(last)的形式for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);}// 将过滤器链表封装成CallbackRegistrationInvoker类型return new CallbackRegistrationInvoker<>(last, filters);} 
}

到这里,我们将过滤器封装成一个链表并且将其封装成 CallbackRegistrationInvoker 形式

我们直接跳到 DubboProtocol.export 中:

public <T> Exporter<T> export(Invoker<T> invoker){// 得到当前注册Zookeeper的URLURL url = invoker.getUrl();// 根据URL得到唯一的key:cn/com.common.service.IUserService:1.0.0.test:20883// 分组(group) + 接口(intfenerce) + 版本(1.0.0.test) + 端口号(20883)String key = serviceKey(url);// DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
}public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {super(invoker);this.key = key;this.exporterMap = exporterMap;// 将当前的过滤器放至exporterMap中,方便我们后面的获取exporterMap.put(key, this);// 打开服务openServer(url);optimizeSerialization(url);return exporter;
}

到这里,我们的过滤器就封装到了 exporterMap 里面,后面会用到,我们后面再聊

1.2 Hander的封装

在我们上面封装完过滤器之后,我们会进行 openServer 打开服务这个操作,该操作会进行 Handler 的封装并启动我们的 Netty 服务

这里的Handler 一共封装成下面的流程:

NettyServerhandler -> NettyServer -> MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler -> DecodeHandler  -> HeaderExchangeHandler -> ExchangeHandlerAdapter

最终会走到 NettyServerdoOpen 方法:这里对 Netty 不太清楚的,可以看博主的 Netty 源码文章:【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码

// 典型的Netty启动的流程
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();bossGroup = createBossGroup();workerGroup = createWorkerGroup();final NettyServerHandler nettyServerHandler = createNettyServerHandler();channels = nettyServerHandler.getChannels();// 初始化我们的服务端启动器initServerBootstrap(nettyServerHandler);ChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();
}protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);bootstrap.group(bossGroup, workerGroup).channel(NettyEventLoopFactory.serverSocketChannelClass()).option(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_KEEPALIVE, keepalive).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int idleTimeout = UrlUtils.getIdleTimeout(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));}// 这里添加Netty的Handler责任链ch.pipeline()// 解码器.addLast("decoder", adapter.getDecoder())// 编码器.addLast("encoder", adapter.getEncoder()).addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))// 把上面封装的Handler放到Netty中,便于我们的调用执行.addLast("handler", nettyServerHandler);}});
}

这里如果对 Netty 责任链不熟悉的可参考:【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

2、服务调用

我们上篇文章剖析了 消费端 是如何进行的服务调用:从源码全面解析 dubbo 消费端服务调用的来龙去脉

这篇我们来看下服务端是如何进行服务调用的

2.1 Handler调用

我们直接跳到 NettyServerHandlerchannelRead 的方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);// 主要看这个Handler做的事情handler.received(channel, msg);ctx.fireChannelRead(msg);
}

image-20230627221354346

我们从上图看到,重点是这五个 Handler

  • MultiMessageHandler:处理多个数据包发送的消息,也称为“多包消息”
  • HeartbeatHandler:定期发送“心跳”消息,以确保连接仍然存在并响应正常
  • AllChannelHandler:管理所有通道的打开和关闭
  • DecodeHandler:将二进制数据解码为消息对象。
  • HeaderExchangeHandler:负责协议头部的交换和处理

我们挨个去看看他们实际的作用

2.1 MultiMessageHandler
  • 如果当前的请求是多个的话,需要进行切割成单个请求往下传递
  • 如果是单个请求的话,直接向下传递即可
public void received(Channel channel, Object message) throws RemotingException {if (message instanceof MultiMessage) {MultiMessage list = (MultiMessage) message;for (Object obj : list) {try {handler.received(channel, obj);}}} else {handler.received(channel, message);}
}
2.2 HeartbeatHandler
  • 服务端:判断当前的请求是不是心跳请求,如果是心跳请求的话,发送心跳请求
  • 消费端:判断当前的请求是不是心跳请求,处理心跳请求
public void received(Channel channel, Object message) throws RemotingException {// 记录最近读取的时间setReadTimestamp(channel);// 判断当前的请求是不是心跳请求if (isHeartbeatRequest(message)) {Request req = (Request) message;if (req.isTwoWay()) {// 如果当前是同一个心跳检测则返回同一个响应Response res = new Response(req.getId(), req.getVersion());res.setEvent(HEARTBEAT_EVENT);channel.send(res);if (logger.isDebugEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);}}return;}// 消费端使用:处理心跳响应if (isHeartbeatResponse(message)) {return;}handler.received(channel, message);
}
2.3 AllChannelHandler
  • 为每一个服务端的请求从线程池中分配一个线程执行
public void received(Channel channel, Object message) throws RemotingException {// 获取线程ExecutorService executor = getPreferredExecutorService(message);try {// 将当前的Handler丢进线程池里面执行executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));}
}

image-20230627224241967

2.4 DecodeHandler
  • 根据当前的响应请求进行解码操作
public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Decodeable) {decode(message);}if (message instanceof Request) {decode(((Request) message).getData());}if (message instanceof Response) {decode(((Response) message).getResult());}handler.received(channel, message);
}
2.5 HeaderExchangeHandler
  • 调用我们的过滤器并等待数据的返回
  • 将数据通过 Channel 返回至客户端
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);// 服务端:当前的消息是请求if (message instanceof Request) {Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request);} else {// 进行请求的解析if (request.isTwoWay()) {handleRequest(exchangeChannel, request);} else {handler.received(exchangeChannel, request.getData());}}} else if (message instanceof Response) {handleResponse(channel, (Response) message);} else {handler.received(exchangeChannel, message);}
}void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());// Request [id=17, version=2.0.2, twoWay=true, event=false, broken=false, data=RpcInvocation [methodName=getUserById, parameterTypes=[class java.lang.Long]]]Object msg = req.getData();// 执行过滤器的操作CompletionStage<Object> future = handler.reply(channel, msg);// 等待数据的返回future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}// 没有问题的话,将我们的数据返回channel.send(res);} });
}

服务端解码之后的消息:

image-20230627225422895

2.2 过滤器调用

我们直接跳到 DubboProtocolreply 方法

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message){Invocation inv = (Invocation) message;// 得到过滤器的invokerInvoker<?> invoker = getInvoker(channel, inv);// 执行过滤器Result result = invoker.invoke(inv);// 返回结果return result.thenApply(Function.identity());
}Invoker<?> getInvoker(Channel channel, Invocation inv){// 根据组+版本号+接口确定唯一的keyString serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));// 得到过滤器(我们在上面进行过对应的添加)DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);// 返回过滤器return exporter.getInvoker();
}

这里的过滤器总共十个,这里不带大家细看每一个的源码了,了解意思即可

  • ContextFilter:负责将请求上下文传递给请求处理流程中的其他组件
  • ProfilerServerFilter:负责性能分析和监视
  • EchoFilter:将请求消息作为响应消息返回
  • ClassLoaderFilter:负责加载和管理类加载器
  • GenericFilter:提供一种通用的请求处理机制
  • ExceptionFilter:负责处理异常并生成错误响应消息
  • MonitorFilter:负责监视请求处理过程中的状态和信息
  • TimeoutFilter:负责处理请求处理超时
  • TraceFilter:跟踪请求处理流程中的各个阶段和信息
  • ClassLoaderCallbackFilter:提供了回调函数的机制

2.3 方法调用

最终我们会到 AbstractProxyInvokerinvoke 方法

public Result invoke(Invocation invocation){// 执行我们动态代理的方法Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());// 等待返回的结果CompletableFuture<Object> future = wrapWithFuture(value, invocation);// 封装请求CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {AppResponse result = new AppResponse(invocation);if (t != null) {if (t instanceof CompletionException) {result.setException(t.getCause());} else {result.setException(t);}} else {result.setValue(obj);}// 返回数据// AppResponse [value=User(id=2, name=天涯, age=12), exception=null]return result;});return new AsyncRpcResult(appResponseFuture, invocation);
}

这里可以参考这篇文章:从源码全面解析 dubbo 服务暴露的来龙去脉

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {try {final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Override// proxy:实现类// methodName:getUserById// parameterTypes:class java.lang.Long// arguments:2// 执行相对应的方法即可protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}
}

三、流程图

高清图片可私信博主

在这里插入图片描述

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

我们下期再见。

我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

往期文章推荐:

  • 美团二面:聊聊ConcurrentHashMap的存储流程
  • 从源码全面解析Java 线程池的来龙去脉
  • 从源码全面解析LinkedBlockingQueue的来龙去脉
  • 从源码全面解析 ArrayBlockingQueue 的来龙去脉
  • 从源码全面解析ReentrantLock的来龙去脉
  • 阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似
  • 从源码全面解析 ThreadLocal 关键字的来龙去脉
  • 从源码全面解析 synchronized 关键字的来龙去脉
  • 阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试

http://chatgpt.dhexx.cn/article/bEFp232w.shtml

相关文章

Dubbo原理和面试问题

今天来说一说dubbo的原理&#xff0c;首先我们要知道dubbo到底是什么&#xff0c;都能提供些什么服务&#xff1f; 一、dubbo是什么&#xff1f; dubbo是⼀个分布式服务框架&#xff0c;提供⾼性能和透明化的RPC远程服务调⽤⽅案&#xff0c;以及SOA服务治理方案。说白了其实…

springboot启动dubbo客户端连接服务端过程

前言 如果我问你,dubbo客户端启动的时候是如何连接服务器端的&#xff1f;这个过程比较复杂&#xff0c;今天我们一起学习起来~ 本文分以下几个部分 1、springboot启动dubbo需要配置 2、初始化Reference过程 3、小结 一、项目应用 1、引入jar包 <dependency><g…

dubbo实现原理机制

Dubbo的总体架构如图所示&#xff1a; 框架分层架构中&#xff0c;各个层次的设计要点&#xff1a; 服务接口层&#xff08;Service&#xff09;&#xff1a;该层是与实际业务逻辑相关的&#xff0c;根据服务提供方和服务消费方的业务设计对应的接口和实现。 配置层&#xff0…

Dubbo源码分析(三):Dubbo之服务端(Service)

如上图所示的Dubbo的暴露服务的过程&#xff0c;不难看出它也和消费者端很像&#xff0c;也需要一个像reference的对象来维护service关联的所有对象及其属性&#xff0c;这里的reference就是provider。由于ServiceBean实现了 &#xfeff;&#xfeff; InitializingBean接口&am…

Dubbo——初识RPC、Dubbo框架、使用直连方式实现Dubbo

文章目录&#xff1a; 1.RPC & 软件架构 1.1 单一应用架构 1.2 分布式微服务架构 1.3 RPC 2.Dubbo概述 2.1基本架构 2.2 dubbo支持的协议 3.直连方式实现dubbo 3.1 服务提供者的创建 3.2 服务消费者的创建 3.3 启动测试&#xff01;&#xff01;&#xff01; 1.…

分布式基本理解与Dubbo基本概念

学习dubbo之前&#xff0c;先要了解一下什么是分布式 分布式基础理论 什么是分布式系统 分布式系统是若干独立计算机的集合&#xff0c;这些计算机对于用户来说就像单个相关系统。 随着互联网的发展&#xff0c;网站应用的规模不断扩大&#xff0c;常规的垂直应用架构已无法…

Dubbo的原理和机制

Dubbo :是一个RPC框架&#xff0c;SOA框架&#xff1a; Dubbo缺省协议采用单一长连接和NIO异步通讯&#xff0c;适合于小数据量大并发的服务调用&#xff0c;以及服务消费者机器数远大于服务提供者机器数的情况。 作为RPC&#xff1a;支持各种传输协议&#xff0c;如dubbo,hes…

dubbo实现原理介绍

一、什么是dubbo Dubbo是Alibaba开源的分布式服务框架&#xff0c;它最大的特点是按照分层的方式来架构&#xff0c;使用这种方式可以使各个层之间解耦合&#xff08;或者最大限度地松耦合&#xff09;。从服务模型的角度来看&#xff0c; Dubbo采用的是一种非常简单的模型…

Dubbo-聊聊Dubbo协议

前言 Dubbo源码阅读分享系列文章&#xff0c;欢迎大家关注点赞 SPI实现部分 Dubbo-SPI机制 Dubbo-Adaptive实现原理 Dubbo-Activate实现原理 Dubbo SPI-Wrapper 注册中心 Dubbo-聊聊注册中心的设计 Dubbo-时间轮设计 通信 Dubbo-聊聊通信模块设计 什么是协议 在网…

dubbo客户端的实现

业界微服务大行其道。服务与服务之间的同学主要有有以下两大类。 阿里RPC框架&#xff1a;dubboRestFull风格的Http调用 我们知道Http接口我们找到PostMan这种Http客户端。 但是dubbo似乎并没有想关的客户端&#xff0c;我们调试的时常常需要同时打开两个以上的服务。 dubbo是…

dubbo组成原理-http服务消费端如何调用

dubbo协议已经用的很多了&#xff0c;这里来稍微介绍一下http协议&#xff0c;官方对http协议的说明简直少的让人发指。哈哈 百度大部分都只是讲了http服务端的配置 那就先从服务端的配置说起 dubbo需要的jar包这里就不说明了&#xff0c;网上找些maven的pom就可以 web.xml…

Dubbo基本原理机制

分布式服务框架&#xff1a; –高性能和透明化的RPC远程服务调用方案–SOA服务治理方案 -Apache MINA 框架基于Reactor模型通信框架&#xff0c;基于tcp长连接 Dubbo缺省协议采用单一长连接和NIO异步通讯&#xff0c;适合于小数据量大并发的服务调用&#xff0c;以及服务消费…

Dubbo基本原理与机制

1、什么是Dubbo Dubbo 是一款高性能、轻量级的开源 RPC 框架&#xff0c;提供服务自动注册、自动发现等高效服务治理方案&#xff0c; 可以和 Spring 框架无缝集成。 2、Dubbo依赖关系 1、服务消费者&#xff08;Consumer&#xff09;: 调用远程服务的服务消费方&#xff0c…

dubbo原理和机制

Dubbo 框架是用来处理分布式系统中&#xff0c;服务发现与注册以及调用问题的&#xff0c;并且管理调用过程。 一&#xff0c;工作流程&#xff1a; 服务提供者在启动的时候&#xff0c;会通过读取一些配置将服务实例化。Proxy 封装服务调用接口&#xff0c;方便调用者调用。…

Dubbo的原理与机制

​ Dubbo 前言 在介绍Dubbo之前先了解一下基本概念&#xff1a; Dubbo是一个RPC框架&#xff0c;RPC&#xff0c;即Remote Procedure Call&#xff08;远程过程调用&#xff09;&#xff0c;相对的就是本地过程调用&#xff0c;在分布式架构之前的单体应用架构和垂直应用架…

Dubbo基础及原理机制

1. 什么是Dubbo&#xff1f; Dubbo是 阿里巴巴公司开源的一个高性能RPC 分布式服务框架&#xff0c;使得应用可通过高性能的 RPC 实现服务的输出和输入功能&#xff0c;可以和 Spring框架无缝集成&#xff0c;现已成为 Apache 基金会孵化项目。 2. 为什么要用Dubbo&#xff1…

Dubbo原理和机制详解(非常全面)

Dubbo是一款Java RPC框架&#xff0c;致力于提供高性能的RPC远程服务调用方案。Dubbo 作为主流的微服务框架之一&#xff0c;为开发人员带来了非常多的便利。 本文我们重点详解 Dubbo 的原理机制 mikechen 目录 Dubbo核心功能Dubbo核心组件Dubbo的架构设计Dubbo调用流程 1️…

大地测量学基础(复习)第一部分

写在前面 这篇博文是用来复习课程“大地测量学基础”的&#xff0c;里面仅列出本人觉得这门课程比较重要的部分&#xff0c;希望能够帮助到有需要的朋友。 课本采用《大地测量学基础》孔祥元&#xff0c;武汉大学出版社。 前面已经有一篇“大地测量学基础&#xff08;复习&…

深入理解ArcGIS的地理坐标系、大地坐标系

1、引言 地理坐标&#xff1a;为球面坐标。 参考平面地是 椭球面,坐标单位:经纬度 大地坐标&#xff1a;为平面坐标。参考平面地是 水平面,坐标单位&#xff1a;米、千米等 地理坐标转换到大地坐标的过程可理解为投影。 &#xff08;投影&#xff1a;将不规则的地球曲面转…

GNSS定位中的不同高度概念及计算

文章目录 高度相关的几个基本概念RTKLIB中高度设置与计算参考文献 由于在GNSS定位中由多种高度表示&#xff0c;不同的高度概念很容易混淆&#xff0c;中英文对应有时候也容易搞混。因此整理了一下常用的两种高度——椭球高、正高的概念与计算&#xff0c;并且标注了对应的英文…