简述
我们在使用HashMap迭代的时候,如果在迭代中做了remove处理,那么迭代下一个元素的时候会抛出ConcurrentModificationException
异常,这是HashMap的fail-fast机制。其实就是维护了一个修改次数,当增删改操作的时候,修改次数会加一。所以迭代时候获取下一个元素的时候,如果发现此修改次数和原本不一致,则抛异常。但是ConcurrentHashMap是不会抛异常的,ConcurrentHashmap没有使用fail-fast,使用了弱一致性机制。
什么是弱一致性
弱一致性
意思是:不能保证任何一次读取都能读取到最近一次写入的数据,但是能保证最终可以读取到写入的数据。
再通俗地解释,就是我读到的数据可能是旧数据,不一定是新的数据。
ConcurrentHashMap源码
我们可以从源码中看到ConcurrentHashMap的弱一致性实现。
主要代码
我们从迭代key开始吧。
KeyIterator
源码中3406行。
static final class KeyIterator<K,V> extends BaseIterator<K,V>
implements Iterator<K>, Enumeration<K> {
KeyIterator(Node<K,V>[] tab, int index, int size, int limit,
ConcurrentHashMap<K,V> map) {
super(tab, index, size, limit, map);
}
public final K next() {
Node<K,V> p;
if ((p = next) == null)
throw new NoSuchElementException();
// 返回下一个结点的key
K k = p.key;
lastReturned = p;
// 主要看这里,获取下一个结点
advance();
return k;
}
public final K nextElement() { return next(); }
}
advance
代码3292行。查询并指向下一个结点。如果存在则返回下一个有效结点,否则返回null。
static class Traverser<K,V> {
// 当前数组
Node<K,V>[] tab; // current table; updated if resized
// 下一个结点
Node<K,V> next; // the next entry to use
// 用于保存或者恢复正在扩容等进行中的节点。spare表示空闲的TableStack链表,stack表示正在使用中的
TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
// 下一次遍历的索引
int index; // index of bin to use next
// 初始化数组的索引,之后会和index一起改变
int baseIndex; // current index of initial table
// 初始化的数组长度
int baseLimit; // index bound for initial table
// 初始化数组的size
final int baseSize; // initial table size
Traverser(Node<K,V>[] tab, int size, int index, int limit) {
this.tab = tab;
this.baseSize = size;
this.baseIndex = this.index = index;
this.baseLimit = limit;
this.next = null;
}
/**
* Advances if possible, returning next valid node, or null if none.
*/
final Node<K,V> advance() {
Node<K,V> e;
// 如果下一个结点不为null,则获取下一个结点
if ((e = next) != null)
e = e.next;
for (;;) {
Node<K,V>[] t; int i, n; // must use locals in checks
// 如果不为null则返回,这时候next = e
if (e != null)
return next = e;
// 下面操作是e的下一个结点不为null,开始遍历其他数组中中结点
// 如果数组都遍历完了,则返回null
if (baseIndex >= baseLimit || (t = tab) == null ||
(n = t.length) <= (i = index) || i < 0)
return next = null;
// i=index,n=t.length。如果数组不为null,并且hash值小于0(如果正在rehash,是红黑树和reserve状态则小于0)
if ((e = tabAt(t, i)) != null && e.hash < 0) {
// 如果正在扩容,则更新table为扩容后的数组
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
e = null;
pushState(t, i, n);
continue;
}
// 如果是红黑树,则使用first结点遍历
else if (e instanceof TreeBin)
e = ((TreeBin<K,V>)e).first;
else
e = null;
}
// 如果stack不为空,表明正在扩容
if (stack != null)
//此时n是扩容后的数组的长度,第一次调用时会将index加上原数组的长度,下一次获取遍历的数组元素时就会获取该索引处的数组元素
//如果该处的数组元素遍历完成则再将index加上原数组的长度,就会进入while循环中,将index和table恢复成原来的,继续遍历原数组中的下一个元素
//即遇到发生扩容的节点,就遍历扩容的后的新数组的index处,再遍历新数组的index+原数组长度处,然后接着遍历老数组中下一个元素
recoverState(n);
// 如果没有扩容,则index和baseIndex都加一,遍历下一个元素。因为baseSize是等于n的,所以这里是肯定成立。遍历下一个数组的元素。
else if ((index = i + baseSize) >= n)
index = ++baseIndex; // visit upper slots if present
}
}
/**
* Saves traversal state upon encountering a forwarding node.
* 如果spare始终为null,则创建新的结点插入到stack链表的前面;赴日原则取出spare链表的头结点初始化,插入到stack链表的前面
*/
private void pushState(Node<K,V>[] t, int i, int n) {
TableStack<K,V> s = spare; // reuse if possible
if (s != null)
spare = s.next;
else
s = new TableStack<K,V>();
s.tab = t;
s.length = n;
s.index = i;
s.next = stack;
stack = s;
}
/**
* Possibly pops traversal state.
*
* @param n length of current table
*/
private void recoverState(int n) {
TableStack<K,V> s; int len;
// index表示已遍历的元素个数,stack的length表示扩容前的数组长度,n表示扩容后的数组长度
// //第一次调用时index加原数组的长度,然后返回,第二次调用时还是加上原数组的长度就满足了while条件
while ((s = stack) != null && (index += (len = s.length)) >= n) {
n = len;
// 恢复index和tab
index = s.index;
tab = s.tab;
// 将s的tab置为null,将其插入到spare链表的前面
s.tab = null;
TableStack<K,V> next = s.next;
s.next = spare; // save for reuse
// 继续遍历stack链表的下一个节点,正常stack只有一个节点,因为出现ForwardingNode后就会遍历新数组了
stack = next;
spare = s;
}
// index和baseIndex都加一,遍历下一个数组元素
if (s == null && (index += baseSize) >= n)
index = ++baseIndex;
}
}
ConcurrentHashMap实现线程安全的要点
- 无论是插入一个新的节点还是移除旧的节点或者修改一个已有节点的val,都会对该节点所属的数组元素,即对应的链表头或者TreeBin节点加锁,直到修改结束才释放锁,一旦加锁其他想要修改属于同一个数组元素的节点的线程就必须等待前面的修改完成。如果修改的是红黑树,为了避免查找的结果不准确,还需要对红黑树加写锁,此时如果正在查找红黑树中的某个节点,会等待所有执行查找的线程释放红黑树的读锁,会通过park将当前线程休眠或者自旋等待。
- 查找某个key值时不需要对数组元素加锁,但是如果数组元素对应的是一颗红黑树,需要对红黑树加读锁,避免其他线程对红黑树的修改,这是因为修改后,红黑树为了保持平衡会移动节点的位置,有可能导致某个已经存在的key查找不出来,读锁可以被多个线程同时获取,必须等所有读锁都释放了才能加写锁,才能修改红黑树中的节点。
- 在Map扩容时,ConcurrentHashMap会将原来的数组按照长度切割成多个段,每个线程占用其中的一个段,负责将其中的数组元素包含的节点转移到扩容后的新数组中,当修改Map时如果发现对应的数组元素的hash值是MOVED,就认为当前正在扩容,会尝试去申请一个段,如果所有的段都分配了,则自旋等待扩容完成,数组元素的hash值变成非MOVED,最后一个完成转移任务的线程会负责将扩容后已经完成节点转移的临时数组设置成Map的有效数组。注意原数组中的数组元素及其节点关系都未发生变更,即此时其他线程可以继续在原数组中遍历,直到感知到了临时数组被设置成新数组了。如果没有线程访问原数组,原数组因为没有根节点引用,就会整体被垃圾回收掉。
- 为了避免并行修改Map的键值对个数时出现误差,同时保证性能,ConcurrentHashMap采用了类似于LongAdder的实现机制,将对键值对个数的修改压力分散到多个CounterCell中,并行修改时通常一个线程命中一个CounterCell,最后将所有CounterCell的值累加起来即是实际的键值对总数。
总结
ConcurrentHashMap中的迭代是弱一致性的,不会抛异常,但是有可能获取到旧值,也不能保证获取到最近写入的值。代码中是ConcurrentHashMap弱一致性的实现,如果发现正在扩容,则会指向新的数组,实现弱一致性。
评论区