侧边栏壁纸
博主头像
Terry

『LESSON 5』

  • 累计撰写 90 篇文章
  • 累计创建 21 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

ConcurrentHashMap源码分析(JDK1.8)

Terry
2020-10-17 / 0 评论 / 0 点赞 / 656 阅读 / 6,667 字 / 正在检测是否收录...

简述

ConcurrentHashMap是我们在环并发环境下经常使用的Map,以前自己只是会用而没有仔细看ConcurrentHashMap的实现,今天查看下源码,并记录下源码中的重点操作。

ConcurrentHashMap是什么

ConcurrentHashMap是用于并发环境的一种Map结构,继承了AbstractMap,跟HashMap很类似,但是HashMap是线程不安全的,ConcurrentHashMap是线程安全的。因为ConcurrentHashMap采用了CAS+synchronized保证了并发的安全性。
ConcurrentHashMap中很多成员变量都用了volatile修饰,包括了Node节点中的val和next。和HashMap一样,如果链表大于8的时候会转换成红黑树。

ConcurrentHashMap源码分析

下面开始ConcurrentHashMap源码分析

构造函数

看源码,我们首先看它的构造函数,构造函数一般都含有重要的参数

    public ConcurrentHashMap() {
    }

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

最后的构造函数的concurrencyLevel我这边说下。concurrencyLevel是JDK1.7中所使用的的,用于控制Segment的个数。这里提一下,因为本人还没研究JDK1.7。可以看出,构造函数和HashMap没差异。

查询

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 计算key的hash
        int h = spread(key.hashCode());
        // 根据计算出来的hash值去查询桶的位置,如果桶有值则进行下面操作
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                // 如果和桶上,链表首节点相同,则返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 如果hash小于0,则代表是红黑树,进行红黑树查询
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 否则链表查询
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        // 返回空
        return null;
    }

代码中的注释就是查询的时候逻辑,很简单,也没有多余锁之类操作,所以性能很高。当查询的时候,value刚好改变,get出来的会是之前的value。查询操作流程概述:

  1. 根据key计算出hash值。
  2. 通过hash值寻址,查到桶的位置。
  3. 如果桶上的key-value正好是想查询的数据,则返回。
  4. 判断如果是红黑树,则通过红黑树查询;如果是链表,则遍历链表查询。

插入

   public V put(K key, V value) {
       return putVal(key, value, false);
   }
 
   final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 根据key计算出hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 循环table
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果table为空,则初始化table
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 如果hash寻址到的节点为空,则使用CAS尝试写入,失败则自旋,保证成功
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果当前node的hash值为MOVED状态,则需要进行扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 否则使用synchronized锁插入数据
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 大于0代表使用链表,小于0代表使用红黑树
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 如果key-value本身存在
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // 如果onlyIfAbsent值为false,则代表需要替换旧值,否则跳出循环
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 找到next为null的节点,后插链表
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果数据结构为红黑树,则进行红黑树插入处理
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            // 这里是防止下面的转红黑树操作
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 如果链表长度大于等于8,则链表转成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //添加计数,里面有扩容
        addCount(1L, binCount);
        // 返回null
        return null;
    }

代码中的注释就是ConcurrentHashMap插入逻辑解释。插入的操作比查询复杂得多,里面包含了大量的CAS操作。总的来说,大致可以分为以下几步:

  1. 根据key计算出hash值。
  2. 判断table,如果table为空则初使用CAS初始化table。
  3. 如果hash寻址到的节点为空,则使用CAS写入。
  4. 如果当前node的hash值为MOVED状态,则进行扩容。
  5. 否则,使用synchronized锁插入数据
  6. 插入的时候,如果是红黑树则使用红黑树插入/更新,如果是链表则遍历链表插入/更新
  7. 和HashMap一样,最后几步会判断下链表长度是否大于等于8,大于的话会把链表转成红黑树
  8. 添加技术,如果需要扩容则会进行扩容操作

总结

以上是我看了源码,对ConcurrentHashMap的理解。可以看出ConcurrentHashMap的put方法使用了大量的CAS操作来保证线程安全。ConcurrentHashMap中不同数组桶之间是线程安全的,所以只对了同一个数组桶采用了CAS+synchronized保证了并发的安全性,synchronized。锁的是链表的head节点。当然,扩容等之类的方法,也是需要保证安全性的,这次我就不多讲了,下一篇文章再详细介绍。

0

评论区