ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接

article/2025/9/18 2:18:05

首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。

文章大纲

一、从ZooKeeper实例初始化开始

ZooKeeper 提供了原生的客户端库,虽然不好用,但是能够更好理解客户端与服务端建立连接和通信的过程。比较流行的Apache Curator也是对原生库的再封装。

向服务端建立连接,只需要实例化一个ZooKeeper对象,将服务器地址列表传进去即可。因为发起连接请求是一个异步的过程,所以实例化ZooKeeper时可以传一个Watcher,会话建立成功之后,客户端会生成一个 “已经建立连接(SyncConnected)” 的事件,进行回调通知。只有会话建立成功之后,才能与服务端进行通信。

String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZooKeeper zooKeeper = new ZooKeeper(connectString, 20000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {System.out.println("会话建立成功");}}
});

ZooKeeper实例初始化流程如下:

image-20220309175725090

1、创建HostProvider

HostProvider顾名思义就是“服务地址提供器”,默认实现类StaticHostProviderStaticHostProvider核心思想是,将服务地址列表打乱,然后构成一个虚拟环,轮询向外提供服务地址。

如果StaticHostProvider不满足需求,可以自定义实现HostProvider

(1)打乱服务地址列表

打乱地址就很简单了,用了Java官方提供的工具java.util.Collections#shuffle

// org.apache.zookeeper.client.StaticHostProvider#shuffle
private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> serverAddresses) {List<InetSocketAddress> tmpList = new ArrayList<>(serverAddresses.size());tmpList.addAll(serverAddresses);Collections.shuffle(tmpList, sourceOfRandomness);return tmpList;
}

随机源生成:

Random sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode())

随机源的种子是当前时间毫秒值掺杂(^)当前StaticHostProvider实例的hashCode,使得每次生成的随机源更公平。

(2)构建虚拟环轮询负载

如何将一个服务地址列表构成一个环轮询负载的呢?

有两个游标,currentIndexlastIndex是实现“虚拟环轮询负载”的关键:

  • currentIndex,指向当前选择的位置。每选择一次就加一,如果等于服务地址列表长度,就重置为0,这样就形成了一个环。
  • lastIndex,上次选择的位置。会话建立成功之后会将currentIndex赋值给lastIndex
public InetSocketAddress next(long spinDelay) {boolean needToSleep = false;InetSocketAddress addr;synchronized (this) {// 省略部分无关代码// currentIndex自增,如果等于服务地址列表长度,就重置为0++currentIndex;if (currentIndex == serverAddresses.size()) {currentIndex = 0;}addr = serverAddresses.get(currentIndex);// 两个游标currentIndex、lastIndex// currentIndex 当前选择的位置,lastIndex上次选择的位置// lastIndex 什么时候设置呢?会话建立成功之后调用 onConnected,将currentIndex赋值给lastIndexneedToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);if (lastIndex == -1) {lastIndex = 0;}}// 如果 currentIndex和lastIndex且spinDelay>0,就需要休眠spinDelay时间,if (needToSleep) {try {Thread.sleep(spinDelay);} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}// 解析InetSocketAddress,// 如果一个主机映射了多个ip地址(InetAddress)// 就打乱选择其中一个地址返回return resolve(addr);
}

currentIndexlastIndex相等时,且spinDelay>0,就会休眠spinDelay毫秒,然后再将选择的服务地址返回,为什么会有这个逻辑呢?

这是因为,如果轮询了一圈服务地址都没有成功建立连接,与其一味不停地重试,还不如休眠一段时间再试,可能成功的概率更高一些。

2、创建ConnectStringParser

ConnectStringParser 就是将 connectString 按一定格式解析成 InetSocketAddress 列表。connectString格式如下:

127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/test

如果在服务地址后面指定了路径,后续的操作都是在该路径下进行。

public ConnectStringParser(String connectString) {// parse out chroot, if any// 解析chroot// connectString 可以指定某个路径,// 如127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testint off = connectString.indexOf('/');if (off >= 0) {String chrootPath = connectString.substring(off);// ignore "/" chroot spec, same as nullif (chrootPath.length() == 1) {this.chrootPath = null;} else {PathUtils.validatePath(chrootPath);this.chrootPath = chrootPath;}connectString = connectString.substring(0, off);} else {this.chrootPath = null;}// 按 , 分割List<String> hostsList = split(connectString, ",");for (String host : hostsList) {int port = DEFAULT_PORT;String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);if (hostAndPort.length != 0) {host = hostAndPort[0];if (hostAndPort.length == 2) {port = Integer.parseInt(hostAndPort[1]);}} else {int pidx = host.lastIndexOf(':');if (pidx >= 0) {// otherwise : is at the end of the string, ignoreif (pidx < host.length() - 1) {port = Integer.parseInt(host.substring(pidx + 1));}host = host.substring(0, pidx);}}// 封装未解析的InetSocketAddressserverAddresses.add(InetSocketAddress.createUnresolved(host, port));}
}

3、创建并启动ClientCnxn

ClientCnxn是对客户端连接的抽象和封装,负责网络连接管理和watcher管理。

还记得从ZooKeeper构造器传入的Watcher吗?它会作为默认Watcher传给ZKWatchManager,后续其他请求注册watcher时,可以不用再定义,直接使用默认的Watcher

初始化SendThread时会传入一个ClientCnxnSocketClientCnxnSocket是对网络底层的封装,默认实现类为ClientCnxnSocketNIO

image-20220309180338984

ClientCnxn创建好后会进行启动操作,就是启动SendThreadEventThread两个线程。后续的网络连接建立和通信都是由SendThread线程负责。

二、向服务端建立连接

SendThread启动后,会进入一个循环状态,首先判断是否已经建立连接,如果没有就通过hostProvider选择一个服务地址发起连接。

image-20220309212527148

网络底层处理类ClientCnxnSocket,以ClientCnxnSocketNIO实现为准,如下图是建立连接的过程:

建立连接过程.drawio

ClientCnxnSocketNIO底层是创建了非阻塞的SocketChannel,然后注册OP_CONNECT事件,并发起连接,如果此时能立刻连上,就继续进行会话建立流程。

image-20220309212424388

如下模拟服务器一直连不上,多次通过hostProvider轮询选择服务器重连的效果:

服务器地址选择了一圈以后,会休眠spinDelay毫秒,也符合源码逻辑。

image-20220309211717688

三、会话建立请求

连接建立之后,需要继续进行会话建立请求,因为每条连接都是有状态的,临时节点的归属就是以会话为依托,连接断开,会话失效,临时节点也就跟随着清除。

每条连接跟会话绑定,如果连接因为网络等问题断开,在会话有效期内重连上,就可以恢复之前的工作场景,而如果在会话失效之后重连上,就是非法连接,需要重新进行会话建立。

会话建立.drawio

1、发起会话建立请求

发起会话建立,首先构建一个ConnectRequest请求体:

ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);

ConnectRequest需要传入客户端最近一次的事务zxid,会话超时时间,会话id,会话密码。首次建立会话,sessionId为0,sessionPasswd为空。

其次,将ConnectRequest请求体包装进Packet对象,PacketZooKeeper通信的最小单元,所有请求体都会包装进一个Packet对象再序列化发送给服务端。

Packet(RequestHeader requestHeader,ReplyHeader replyHeader,Record request,Record response,WatchRegistration watchRegistration,boolean readOnly)

对于ConnectRequest包装的Packet,请求头RequestHeader为null,并且会被放在请求队列outgoingQueue的首位。

outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

接着注册网络IO读写事件,后续请求包就会被发送给服务端了,发送完成后再将会话建立的PacketoutgoingQueue中移除。

image-20220309222231305

注:发送到服务端的会话创建流程,暂时不需要了解,后面讲解服务端源码时会详细讲述。

2、会话建立响应

会话建立响应,需要和普通请求响应分开处理,如果接收到响应信息,判断客户端还未初始化完成,就认为这个响应一定是会话建立响应。

image-20220309223709509

首先从底层网络缓冲区读取数据反序列化构建ConnectResponse响应体,解析出经过服务器协商好的sessionTimeout以及sessionIdsessionPasswd

image-20220309224130473

根据协商的sessionTimeout重新设置readTimeoutconnectTimeoutreadTimeoutnegotiatedSessionTimeout的2/3,connectTimeout是协商negotiatedSessionTimeout与服务地址列表个数平均。

image-20220309224446200

最后生成一个SyncConnectedwatcher事件,交由EventThread线程进行回调,这是唯一一个不需要向服务端注册的watcher事件,完全由客户端自己生成和触发。

四、心跳保持长连接

网络建立连接,会话建立都完成后,就可以与服务端通信了,为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求。而一段时间是多久?如下是计算下一次发送心跳请求时间的算法:

// clientCnxnSocket.getIdleSend为距离上次发送的时长
// readTimeout = sessionTimeout * 2 / 3
int timeToNextPing = readTimeout / 2- clientCnxnSocket.getIdleSend()- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);

根据计算,每隔sessionTimeout/3如果没有发送任何请求,就发送一次心跳。多减1秒是为了防止因为竞态情况而丢失心跳请求。

发送的心跳请求数据很简单,没有请求体,只有请求头:

image-20220309233736903

如下是在源码中加了日志后的心跳过程:

协商sessionTimeout为9999,readTimeout计算得6666,timeToNextPing为3333,每次会多减1秒,大概每隔3秒发一次心跳。

image-20220309233219366

五、总结

  1. 客户端与服务端建立的是长连接,如果连接失败,服务地址列表会轮询重试,直到连接成功,官方提供了默认的服务地址负载算法 StaticHostProvider,也可以自己实现。

  2. 每条连接是有状态的,只有建立了会话,才能真正开始与服务端通信。会话建立成功之后,会生成一个SyncConnected事件进行回调通知。

  3. 会话是临时节点的基础,在会话有效期内断开重连,可以恢复上一次工作场景。

  4. 为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求,心跳间隔时间为sessionTimeout/3

如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。


http://chatgpt.dhexx.cn/article/Ws1uhMom.shtml

相关文章

会话劫持安全攻击

什么是会话劫持&#xff1f; TCP 会话劫持是对受保护网络上的用户会话的安全攻击。会话劫持最常见的方法称为 IP 欺骗&#xff0c;攻击者使用源路由 IP 数据包将命令插入网络上两个节点之间的活动通信中&#xff0c;并将自己伪装成经过身份验证的用户之一。这种类型的攻击是可能…

HTML sessionStorage会话存储

sessionStorage会话存储 sessionStorage 是HTML5新增的一个会话存储对象&#xff0c;用于临时保存同一窗口(或标签页)的数据&#xff0c;在关闭窗口或标签页之后将会删除这些数据。本篇主要介绍 sessionStorage(会话存储)的使用方式。包括添加、修改、删除等操作。 目录 1. 介…

【windows】会话(Session)、窗口站(WindowsStation)、桌面、窗口

序 一个系统可以同时登录多个用户(包括远程用户登录)&#xff08;多用户系统&#xff09;。 一个用户拥有一个会话&#xff08;远程用户被称为远程会话&#xff09;。 一个会话拥有多个工作站和窗口。只能拥有一个交互式工作站(Winsta0)。&#xff08;Window Station 0…

WindTerm导出会话

WindTerm的session配置并无法被适用于其他软件中&#xff0c;所有的session保存在WindTerm安装路径下的profiles文件夹内&#xff0c;如果需要更新版本或回退&#xff0c;将所安装的版本覆盖即可&#xff0c;或是将profiles复制于新路径下。 图标闪烁设置、会话保存-导入导出 …

Spring Security中的会话【Session】管理与防御以及会话的并发控制

众所周知&#xff0c;HTTP本身是没有任何关于当前用户状态的内容&#xff0c;也就是两个HTTP请求之间是没有任何的关联可言&#xff0c;用户在和服务器交互的过程中&#xff0c;站点无法确定是哪个用户访问&#xff0c;也因此不能对其提供相应的个性化服务。Session的诞生就是为…

web基础:会话

一、会话概述 HttpSession对象可以保存跨同一个客户多个请求的会话状态。即与一个特定客户的整个会话期间&#xff0c;HttpSession会持久存储。对于会话期间客户做的所有请求&#xff0c;从中得到的所有信息&#xff0c;都可以使用HttpSession对象保存。 会话的工作方式&#…

会话固定攻击(session fixation attack)及解决办法

1 Cookie 的工作过程 Cookie的传递用到了两个字段: 请求头字段Cookie和响应头字段Set-Cookie。 当用户浏览器第一次访问服务器的时候&#xff0c;服务器肯定是不知道他的身份的。所以&#xff0c;就要创建一个独特的身份识别数据&#xff0c;格式是"keyvalue"&…

2022年会话推荐综述

title: 2022年会话推荐综述 最近对于会话推荐有了新的兴趣 文章题目&#xff1a; A Survey on Session-based Recommender Systems 0. 前言 提供了一个统一的框架来对SBRSs研究进行分类 SBRS的统一问题陈述&#xff0c;其中SBRS建立在正式概念之上&#xff1a;用户、项目、…

会话保持原理

1. 什么是会话保持&#xff1f; 会话保持是负载均衡最常见的问题之一&#xff0c;也是一个相对比较复杂的问题。会话保持有时候又叫做粘滞会话(Sticky Sessions)。会话保持是指在负载均衡器上的一种机制&#xff0c;可以识别客户端与服务器之间交互过程的关连性&#xff0c;在…

linux会话session

linux会话 什么是linux session 我们常见的 Linux session 一般是指 shell session。Shell session 是终端中当前的状态&#xff0c;在终端中只能有一个 session。 当我们打开一个新的终端时&#xff0c;总会创建一个新的 shell session。这表明会话是我们和shell交互的一个过…

会话令牌写入URL

目录 一. 漏洞描述 二. 漏洞修复 一. 漏洞描述 会话令牌即Token&#xff0c;关于Token&#xff0c;传送门-》如何理解Session、Cookie与Token 。 用户在进行get请求将用户的令牌写入url&#xff0c;导致中间人攻击获取令牌进行登陆。如下 二. 漏洞修复 禁止将令牌写入url

PDU会话流程

1.PDU会话的概念 1.1概述 5G系统的一个关键功能&#xff1a;为UE提供一个到达DN的连接会话管理功能&#xff08;SMF&#xff09;职责&#xff1a; 设置UE到DN的连接对该连接的用户面进行管理 5G系统的设计目标是支持大量的5G案例&#xff1a; 支持不同的PDU类型多个可选的…

会话固定漏洞

目录 漏洞原理 漏洞检测 漏洞利用 漏洞修复 漏洞原理 Session 是应用系统对浏览器客户端身份认证的属性标识&#xff0c;在用户退出应用系统时&#xff0c;应将客户端 Session 认证属性标识清空。如果未能清空客户端 Session 标识&#xff0c;在下次登录系统时&#xff0c…

《Oracle Java EE编程自学和面试指南》09-02:HttpSession接口

深入了解IT/互联网行业及岗位&#xff0c;请参阅通用IT/互联网岗位招聘计划&#xff08;最新全岗版&#xff09;。 深入了解职业晋升及学习路线&#xff0c;请参阅最优职业晋升路线和课程学习指南&#xff08;最新全栈版&#xff09;。 内容导航&#xff1a; 前言1、HttpSessi…

【LWIP的mDNS】

一&#xff0e;mdns 1.什么是mdns&#xff1f; mDNS协议适用于局域网内没有DNS服务器时的域名解析&#xff0c;设备通过组播的方式交互DNS记录来完成域名解析&#xff0c;约定的组播地址是&#xff1a;224.0.0.251&#xff0c;端口号是5353 主要用于在同一局域网内&#xff0c;…

DNS DDNS NBNS mDNS LLMNR LLDPDU SSDP协议

DNS DNS只是提供了域名和IP地址之间的静态对应关系&#xff0c;当IP地址发生变化时&#xff0c;DNS无法动态的更新域名和IP地址之间的对应关系&#xff0c;从而导致访问失败。但是DDNS系统是将用户的动态IP地址映射到一个固定的域名解析服务上 DDNS DDNS用来动态更新DNS服务…

mDNSResponder介绍与移植

mDNSResponder是苹果的Bonjour项目的一部分。 Bonjour是法语“你好”的意思。 Bonjour软件源自正IETF零配置网络工作。零配置工作有三个要求&#xff1a; 1.分配IP地址&#xff08;即使没有分配DHCP服务器的IP地址&#xff09; 2.提供名称到地址的转换&#xff08;即使没有DNS服…

启用Ubuntu 服务器上的 mDNS

陈拓 2022/03/18-2022/03/18 在动态分配IP地址的情况下&#xff0c;在局域网中使用mDNS访问Ubuntu服务器就不需要知道IP地址了。 我的Ubuntu系统版本 先用IP地址登录Ubuntu服务器。 具体操作见《Win10命令窗口的SSH和SFTP操作》 Win10命令窗口的SSH和SFTP操作_晨之清风的博…

Bonjour手把手搭建一:mDNS(apple multicastdns.org)

mDNS(Multicast DNS)——From Apple https://support.apple.com/kb/TA20999?localezh_CN&viewlocaleen_US Multicast DNS, one of the features incorporated in Bonjour, which was introduced in Mac OS X 10.2. Bonjour的一个新特性&#xff0c;在Mac OS X10.2后引入…

SpyCast:一款功能强大的跨平台mDNS枚举工具

关于SpyCast SpyCast是一款功能强大的跨平台mDNS枚举工具&#xff0c;该工具支持在主动模式下下递归查询服务&#xff0c;也可以在被动模式下仅侦听多播数据包。因此&#xff0c;广大研究人员可以使用该工具测试mDNS协议和本地网络的安全性。 mDNS介绍 mDNS&#xff0c;即多…