一文彻底弄懂ConcurrentHashMap

article/2025/8/22 7:22:16

一文彻底弄懂ConcurrentHashMap

  • 导读
  • 前言
    • synchronized
    • volatile(非锁)
    • 自旋锁
    • 分段锁
    • ReentrantLock
    • CAS
  • ConcurrentHashMap 实现原理
    • JDK1.7 中的 ConcurrentHashMap
      • Segment
      • ConcurrentHashMap 并发读写的几种情况
      • get 方法
      • put 方法
    • JDK1.8 中的 ConcurrentHashMap
      • put 方法
      • initTable 初始化数组
      • helpTransfer 协助扩容
      • transfer 扩容
      • addCount 扩容判断
      • get 方法
  • 总结

导读

在这里插入图片描述

前言

前面分析 HashMap 的文章,提到过 HashMap 是线程不安全的,其主要原因还是在链表扩容。

JDK1.7 的 HashMap 的扩容操作用到两个方法:resize()transfer(),主要是重新定位每个桶的下标,并采用头插法将元素迁移到新的数组中。假设有多个线程都对 HashMap 进行扩容,有可能扩容后的散列表中链表成环,如果这时候执行 get() 方法查询,就会导致死循环。并且 HashMap 在并发执行 put() 操作时扩容,可能会导致结点丢失,会导致数据不准的情况。

JDK1.8 中已经很好地解决了JDK1.7 中的问题了,如果看过 JDK1.8 的源码,你就会发现 JDK1.8 中摒弃了 transfer 函数的,是直接在 resize 函数中完成数据的迁移,并且在 JDK1.8 中插入元素时是使用的尾插法。但是,JDK1.8 会带来数据覆盖的线程不安全。

我们知道了 HashMap 是非线程安全的,如果想在多线程下安全的操作 map,有哪些解决方法呢?

  • Hashtable:它是线程安全的,它在所有涉及到多线程操作的都加上了 synchronized 关键字来锁住整个 table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的。
  • Collections.synchronizedMap:里面使用对象锁来保证多线程场景下,操作安全,本质也是对 HashMap 进行全表锁!在竞争激烈的多线程环境下性能依然也非常差,所以也不推荐使用!

ConcurrentHashMap 是 HashMap 的线程安全版本,内部也是使用数组 + 链表 + 红黑树的结构来存储元素。相比于同样线程安全的 HashtableCollections.synchronizedMap 来说,效率等各方面都有极大地提高。本文我们一起学习一下 ConcurrentHashMap

先简单介绍一下各种锁,以便下文讲到相关概念时能有个印象。

synchronized

Java 中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。

synchronized 从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁

  • 偏向锁:是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。
  • 轻量级锁:是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。
  • 重量级锁:若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。重量级锁会使其他线程阻塞,性能降低。

volatile(非锁)

Java 中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

volatilesynchronized 最主要的区别是 volatile 仅能实现变量的修改可见性,不能保证原子性;而 synchronized 则可以保证变量的修改可见性和原子性。

自旋锁

自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗 CPU。

分段锁

分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在 ConcurrentHashMap 中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。

ReentrantLock

可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。其实,synchronized 也是可重入锁。

CAS

CAS 全称 Compare And Swap(比较与交换),是一种无锁算法。其作用是让 CPU 比较内存中某个值是否和预期的值相同,如果相同则将这个值更新为新值,不相同则不做更新。

CAS 算法涉及到三个操作数:

  • 需要读写的内存值 V
  • 进行比较的值 A
  • 要写入的新值 B

当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的值(「比较+更新」整体是一个原子操作),否则不会执行任何操作。

java.util.concurrent 包中的原子类就是通过 CAS 来实现了乐观锁。

ConcurrentHashMap 实现原理

ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的实现方式是不同的。

先来看下 JDK1.7。

JDK1.7 中的 ConcurrentHashMap

JDK1.7 中的 ConcurrentHashMap 是由 Segment 数组结构HashEntry 数组结构组成,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment ),每个小数组有 n 个 HashEntry 组成。

如下图所示,首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问,实现了真正的并发访问。

在这里插入图片描述

整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁

简单来说就是,ConcurrentHashMap 是一个 Segment数组Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 Segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

Segment

Segment 本身就相当于一个 HashMap 对象。

HashMap 一样,Segment 包含一个 HashEntry 数组,数组中的每一个 HashEntry 既是一个键值对,也是一个链表的头节点。

单一的 Segment 结构如下:

在这里插入图片描述

像这样的 Segment 对象,在 ConcurrentHashMap 集合中有多少个呢?

concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说默认情况下 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。最大不超过 MAX_SEGMENTS 也就是2^16

默认是 16 个,共同保存在一个名为 segments 的数组当中。 因此整个ConcurrentHashMap 的结构如下:

在这里插入图片描述

可以说,ConcurrentHashMap 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。

这样的二级结构,和数据库的水平拆分有些相似。

ConcurrentHashMap 并发读写的几种情况

Case1:不同 Segment 的并发写入「可以并发执行」

在这里插入图片描述

Case2:同一 Segment 的一写一读「可以并发执行」

在这里插入图片描述

Case3:同一 Segment 的并发写入

在这里插入图片描述

Segment 的写入是需要上锁的,因此对同一 Segment 的并发写入会被阻塞。

由此可见,ConcurrentHashMap 中每个 Segment各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。

接下来,我们就来看下 ConcurrentHashMap 读写的过程。

get 方法

  1. 计算 key 的 hash 值
  2. 通过 hash 值,定位到对应的 Segment 对象
  3. 再次通过 hash 值,定位到 Segment 当中数组的具体位置。

put 方法

  1. 计算 key 的 hash 值
  2. 通过 hash 值,定位到对应的 Segment 对象 s
  3. 插入新值到槽 s 中
    • 获取可重入锁
    • 再次通过 hash 值,定位到 Segment 当中数组的具体位置
    • 插入或覆盖 HashEntry 对象
    • 释放锁

JDK1.8 中的 ConcurrentHashMap

虽然 JDK1.7 中的 ConcurrentHashMap 解决了 HashMap 并发的安全性,但是当冲突的链表过长时,在查询遍历的时候依然很慢!

在 JDK1.8 中,HashMap 引入了红黑二叉树设计,当冲突的链表长度大于 8时,会将链表转化成红黑二叉树结构,红黑二叉树又被称为平衡二叉树,在查询效率方面,又大大的提高了不少。

在数据结构上, JDK1.8 中的 ConcurrentHashMap 选择了与 HashMap 相同的数组+链表+红黑树结构;在锁的实现上,抛弃了原有的 Segment 分段锁,采用 CAS + synchronized 实现更加细粒度的锁。

将锁的级别控制在了更细粒度的哈希桶数组元素级别,也就是说只需要锁住这个链表头节点或者红黑树的根节点,就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。

我们再来看看 JDK1.8 中 ConcurrentHashMap 的整体结构,内容如下:

在这里插入图片描述
下面来具体分析一下 JDK1.8 中的 ConcurrentHashMap 源码。

put 方法

public V put(K key, V value) {return putVal(key, value, false);
}final V putVal(K key, V value, boolean onlyIfAbsent) {// key和value都不能为nullif (key == null || value == null) throw new NullPointerException();// 计算hash值int hash = spread(key.hashCode());//用于记录相应链表的长度int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 如果tab未初始化或者个数为0,则初始化node数组if (tab == null || (n = tab.length) == 0)// 初始化数组,后面会详细介绍tab = initTable();// 找该 hash 值对应的数组下标,得到第一个节点 felse if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果数组该位置为空,// 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作// 如果使用CAS插入元素成功,则break跳出循环,流程结束if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}// 如果要插入的元素所在的tab的第一个元素的hash是MOVED,说明其他线程再扩容,则当前线程帮忙一起迁移元素else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {// 到这里就是说,f 是该位置的头结点,而且不为空,也不在迁移元素也就是存在hash冲突,锁住链表或者红黑树的头结点。V oldVal = null;// 获取数组该位置的头结点的监视器锁synchronized (f) {if (tabAt(tab, i) == f) {// 头结点的 hash 值大于 0,说明是链表if (fh >= 0) {// 用于累加,记录链表的长度binCount = 1;//遍历链表for (Node<K,V> e = f;; ++binCount) {K ek;// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;//插入链表尾部if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}//红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;// 调用红黑树的插值方法插入新节点if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 如果binCount不为0,说明成功插入了元素或者寻找到了元素if (binCount != 0) {// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 成功插入元素,元素个数加1(是否要扩容在这个里面)addCount(1L, binCount);return null;
}

当进行 put 操作时,流程大概可以分如下几个步骤:

  • 首先会判断 key、value 是否为空,如果为空就抛异常;
  • 接着会判断容器数组是否为空,如果为空就调用 initTable() 初始化数组;
  • 进一步判断,如果没有 hash 冲突就直接 CAS 插入
  • 在接着判断 f.hash == -1 是否成立,如果成立,说明当前 f 是ForwardingNode 节点,表示有其它线程正在扩容,则一起进行扩容操作
  • 如果存在 hash 冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入
  • 节点插入完成之后,如果该链表的数量大于阈值 8,就要先转换成黑红树的结构,break 再一次进入循环
  • 最后,插入完成之后调用 addCount() 方法统计 size,并且检查是否需要扩容。

put 的主流程看完了,但是至少留下了几个问题:

  • 初始化
  • 扩容
  • 帮助数据迁移

这些我们都会在下面进行一一介绍。

initTable 初始化数组

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 如果sizeCtl<0说明正在初始化或者扩容,让出CPUif ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁,设置成功则当前线程进入初始化else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环// 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU// 如果下一次循环更新完毕了,则table.length!=0,退出循环try {// 再次检查table是否为空,防止ABA问题if ((tab = table) == null || tab.length == 0) {// DEFAULT_CAPACITY 默认初始容量是 16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;// 新建数组@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 把tab数组赋值给table table = tab = nt;// 设置sc为数组长度的0.75倍// n - (n >>> 2) = n - n/4 = 0.75n// 可见这里装载因子和扩容门槛都是写死了的// 这也正是没有threshold和loadFactor属性的原因sc = n - (n >>> 2);}} finally {// 把sc赋值给sizeCtl,这时存储的是扩容门槛sizeCtl = sc;}break;}}return tab;
}

初始化流程如下:

  • 使用 CAS 锁控制只有一个线程初始化 tab 数组;
  • sizeCtl 在初始化后存储的是扩容门槛;
  • 扩容门槛写死的是 tab 数组大小的 0.75 倍,tab 数组大小即 map 的容量,也就是最多存储多少个元素。

sizeCtl 这个属性使用的场景很多,不过只要跟着文章的思路来,就不会被它搞晕了。

helpTransfer 协助扩容

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);// sizeCtl<0,说明正在扩容while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 扩容线程数加1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 当前线程帮忙迁移元素transfer(tab, nextTab);break;}}return nextTab;}return table;
}

其实 helpTransfer() 方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作。既然这里涉及到扩容的操作,我们也一起来看看扩容方法 transfer()

transfer 扩容

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) {            // initiatingtry {@SuppressWarnings("unchecked")// 容量翻倍Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;// ForwardingNode 翻译过来就是正在被迁移的 Node// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了// 所以它其实相当于是一个标志。ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTab// i 是位置索引,bound 是边界,注意是从后往前for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 下面这个 while 真的是不好理解// advance 为 true 表示可以进行下一个位置的迁移了// 简单理解结局:i 指向了 transferIndex,bound 指向了 transferIndex-stridewhile (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;// 将 transferIndex 值赋给 nextIndex// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前                   bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {// 所有的迁移操作已经完成nextTable = null;// 将新的 nextTab 赋值给 table 属性,完成迁移table = nextTab;// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍sizeCtl = (n << 1) - (n >>> 1);// 跳出死循环return;}// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了finishing = advance = true;i = n; // recheck before commit}}// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 头结点的 hash 大于 0,说明是链表的 Node 节点if (fh >= 0) {// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,// 需要将链表一分为二,// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的// lastRun 之前的节点需要进行克隆,然后分到两个链表中int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 其中的一个链表放在新数组的位置 isetTabAt(nextTab, i, ln);// 另一个链表放在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}// 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 如果一分为二后,节点数少于 6,那么将红黑树转换回链表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;// 将 ln 放置在新数组的位置 isetTabAt(nextTab, i, ln);// 将 hn 放置在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}
}

说到底,transfer 这个方法并没有实现所有的迁移任务,每次调用这个方法只实现了 transferIndex 往前 stride 个位置的迁移工作,其他的需要由外围来控制。

addCount 扩容判断

private final void addCount(long x, int check) {CounterCell[] as; long b, s;//更新baseCount,table的数量,counterCells表示元素个数的变化if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {//如果多个线程都在执行,则CAS失败,执行fullAddCount,全部加入countfullAddCount(x, uncontended);return;}if (check <= 1)return;// 计算元素个数s = sumCount();}// check >=0 表示需要进行扩容操作if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// 如果元素个数达到了扩容门槛,则进行扩容// 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// rs是扩容时的一个标识int rs = resizeStamp(n);if (sc < 0) {// sc<0说明正在扩容中if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)// 扩容已经完成了,退出循环break;// 扩容未完成,则当前线程加入迁移元素中// 并把扩容线程数加1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))// 进入迁移元素transfer(tab, null);// 重新计算元素个数s = sumCount();}}
}

大致流程如下:

  • 利用 CAS 方法更新 baseCount 的值
  • 检查是否需要扩容,check >= 0,需要检查;
  • 如果满足扩容条件,判断当前是否正在扩容,如果是正在扩容就一起扩容;
  • 如果不在扩容,将 sizeCtl 更新为负数,并进行扩容处理。

put 的流程现在已经分析完了,你可以从中发现,他在并发处理中使用的是乐观锁,当有冲突的时候才进行并发处理,而且流程步骤很清晰,但是细节设计的很复杂,毕竟多线程的场景也复杂。

get 方法

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 计算hashint h = spread(key.hashCode());// 判断数组是否为空,通过key定位到数组下标是否为空;if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 判断头结点是否就是我们需要的节点if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// hash小于0,说明是树或者正在扩容else if (eh < 0)// 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式return (p = e.find(h, key)) != null ? p.val : null;// 遍历整个链表寻找元素while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}

步骤如下:

  1. 计算 hash 值
  2. 根据 hash 值找到数组对应位置: (n - 1) & h
  3. 根据该位置处结点性质进行相应查找
    • 如果该位置为 null,那么直接返回 null 就可以了
    • 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
    • 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面我们再介绍 find 方法
    • 如果以上 3 条都不满足,那就是链表,进行遍历比对即可

总结

虽然 HashMap 在多线程环境下操作不安全,但是在 java.util.concurrent 包下,java 为我们提供了 ConcurrentHashMap 类,保证在多线程下 HashMap 操作安全!

在 JDK1.7 中,ConcurrentHashMap 采用了分段锁策略,将一个 HashMap 切割成 Segment 数组,其中 Segment 可以看成一个 HashMap, 不同点是 Segment 继承自 ReentrantLock,在操作的时候给 Segment 赋予了一个对象锁,从而保证多线程环境下并发操作安全。

但是 JDK1.7 中,HashMap 容易因为冲突链表过长,造成查询效率低,所以在 JDK1.8 中,HashMap 引入了红黑树特性,当冲突链表长度大于8时,会将链表转化成红黑二叉树结构。

在 JDK1.8 中,与此对应的 ConcurrentHashMap 也是采用了与 HashMap 类似的存储结构,但是 JDK1.8 中 ConcurrentHashMap 并没有采用分段锁的策略,而是在元素的节点上采用 CAS + synchronized 操作来保证并发的安全性,源码的实现比 JDK1.7 要复杂的多。


http://chatgpt.dhexx.cn/article/c4qYBHge.shtml

相关文章

ConcurrentHashMap杂谈

为什么使用ConcurrentHashMap 在并发编程中使用HashMap可能导致程序死循环&#xff0c;而使用线程安全的HashTable效率又非常低下 线程不安全的HashMap 在多线程环境下&#xff0c;使用HashMap进行put操作会引起死循环&#xff0c;导致CPU利用率接近100% 死循环案例&#xf…

图解ConcurrentHashMap

曾经研究过jkd1.5新特性&#xff0c;其中ConcurrentHashMap就是其中之一&#xff0c;其特点&#xff1a;效率比Hashtable高&#xff0c;并发性比hashmap好。结合了两者的特点。 集合是编程中最常用的数据结构。而谈到并发&#xff0c;几乎总是离不开集合这类高级数据结构的支…

Java集合:ConcurrentHashMap

本篇内容包括&#xff1a;ConcurrentHashMap 概述、ConcurrentHashMap 底层数据结构、ConcurrentHashMap 的使用以及相关知识点。 一、ConcurrentHashMap 概述 ConcurrentHashMap 是 HashMap 的线程安全版本&#xff0c;其内部和 HashMap 一样&#xff0c;也是采用了数组 链表…

Hashtable与ConcurrentHashMap区别

ConcurrentHashMap融合了hashtable和hashmap二者的优势。 hashtable是做了同步的&#xff0c;hashmap未考虑同步。所以hashmap在单线程情况下效率较高。hashtable在的多线程情况下&#xff0c;同步操作能保证程序执行的正确性。 但是hashtable每次同步执行的时候都要锁住整个结…

ConcurrentHashMap 面试题

作者&#xff1a;程序员库森 链接&#xff1a;https://www.nowcoder.com/discuss/591527?source_idprofile_create_nctrack&channel-1 来源&#xff1a;牛客网 本文汇总了常考的 ConcurrentHashMap 面试题&#xff0c;面试 ConcurrentHashMap&#xff0c;看这一篇就够了…

Hashmap和ConcurrentHashmap的区别

HashTable &#xff08;1&#xff09;底层数组链表实现&#xff0c;无论key还是value都不能为null&#xff0c;线程安全&#xff0c;实现线程安全的方式是在修改数据时锁住整个HashTable&#xff0c;效率低&#xff0c;ConcurrentHashMap做了相关优化 &#xff08;2&#xff0…

ConcurrentHashMap的作用与用法

ConcurrentHashMap的作用与用法 一.ConcurrentHashMap简介 ConcurrentHashMap是属于JUC工具包中的并发容器之一&#xff0c;在多线程开发中很经常会使用到这个类&#xff0c;它与HashMap的区别是HashMap是线程不安全的&#xff0c;在高并发的情况下&#xff0c;使用HashMap进行…

Java并发包concurrent——ConcurrentHashMap

目录 1. ConcurrentHashMap的实现——JDK7版本 1.1 分段锁机制 1.2 ConcurrentHashMap的数据结构 1.3 ConcurrentHashMap的初始化 1.3.1 初始化ConcurrentHashMap 1.3.2 初始化Segment分段 1.4 定位Segment 1.5 ConcurrentHashMap的操作 1.5.1 get 1.5.2 put 1.5.3 …

Java8 ConcurrentHashMap详解

点个赞&#xff0c;看一看&#xff0c;好习惯&#xff01;本文 GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录&#xff0c;这是我花了 3 个月总结的一线大厂 Java 面试总结&#xff0c;本人已拿大厂 offer。 另外&#xff0c;原创文章首发在我的个人博客&#x…

HashMap与ConcurrentHashMap的区别

从JDK1.2起&#xff0c;就有了HashMap&#xff0c;正如前一篇文章所说&#xff0c;HashMap不是线程安全的&#xff0c;因此多线程操作时需要格外小心。 在JDK1.5中&#xff0c;伟大的Doug Lea给我们带来了concurrent包&#xff0c;从此Map也有安全的了。 ConcurrentHashMap具体…

concurrenthashmap实现原理

1.JDK 1.7 ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成 Segment 继承自 ReentrantLock&#xff0c;是一种可重入锁&#xff1b;其中&#xff0c;HashEntry 是用于真正存储数据的地方 static final class Segment<K,V> extends ReentrantLock i…

HashMap和ConcurrentHashMap

前言 Map 这样的 Key Value 在软件开发中是非常经典的结构&#xff0c;常用于在内存中存放数据。 本篇主要想讨论 ConcurrentHashMap 这样一个并发容器&#xff0c;在正式开始之前我觉得有必要谈谈 HashMap&#xff0c;没有它就不会有后面的 ConcurrentHashMap。 Hash 表 在…

深入浅出ConcurrentHashMap详解

文章目录 1、前言2、什么是ConcurrentHashMap3、Put 操作4、Get 操作5、高并发线程安全6、JDK8 的改进6.1 结构改变6.2 HashEntry 改为 Node6.3 Put 操作的变化6.4 Get 操作的变化6.5 总结 1、前言 学习本章之前&#xff0c;先学习&#xff1a;深入浅出HashMap详解&#xff08;…

ConcurrentHashMap

ConcurrentHashMap 1.ConcurrentHashMap的出现 我们最常用的集合框架一定包括HashMap&#xff0c;但是都知道它不是线程安全的。在并发插入元素的时候&#xff0c;有可能出现带环链表&#xff0c;让下一次读操作出现死循环。 而想要次避免HashMap的线程安全问题有很多办法&am…

ConcurrentHashMap详解

文章目录 什么是ConcurrentHashMapConcurrentHashMap结构如何高效的执行并发操作如何进行锁的选择Node节点类型与作用扩容的方式 源码分析putVal()方法spread()方法&#xff0c;获取槽位。initTable()方法&#xff0c;初始化容器addCount() &#xff0c;计算成员数量transfer()…

Hudi(四)集成Flink(2)

6、读取方式 6.1、流读&#xff08;Streaming Query&#xff09; 当前表默认是快照读取&#xff0c;即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式&#xff0c;通过 read.start-commit 参数指定起始消费位置&#xff0c;支持指定 …

Spring Boot锦集(三):Spring Boot整合Kafka | Zookeeper/Kafka的安装和配置 | 总结的很详细

前言 在学习本章节前&#xff0c;务必做好以下准备工作&#xff1a; 1、安装并启动了Zookeeper[官网]&#xff0c;如需帮助&#xff0c;点击进入&#xff1b; 2、安装并启动了Kafka[官网]&#xff0c;如需帮助&#xff0c;点击进入。 注&#xff1a;zk和kafka的安装与介绍&…

Flink系列之:Flink CDC深入了解MySQL CDC连接器

Flink系列之&#xff1a;Flink CDC深入了解MySQL CDC连接器 一、增量快照特性1.增量快照读取2.并发读取3.全量阶段支持 checkpoint4.无锁算法5.MySQL高可用性支持 二、增量快照读取的工作原理三、全量阶段分片算法四、Chunk 读取算法五、Exactly-Once 处理六、MySQL心跳事件支持…

大数据面试重点之kafka(三)

Kafka如何保证全局有序&#xff1f; 可回答&#xff1a;1&#xff09;Kafka消费者怎么保证有序性&#xff1f;2&#xff09;Kafka生产者写入数据怎么保证有序&#xff1f;3&#xff09;Kafka可以保证 数据的局部有序&#xff0c;如何保证数据的全局有序&#xff1f;4&#xff0…

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

文章目录 官方说明参数解读CodePOM依赖配置文件生产者消费者单元测试测试earliestlatest(默认&#xff09;noneexception 源码地址 官方说明 https://kafka.apache.org/documentation/ 选择对应的版本&#xff0c;我这里选的是 2.4.X https://kafka.apache.org/24/documenta…