Java 并发容器详解

Java 并发容器详解

引言

并发容器是java.util.concurrent包的核心组件,专为多线程环境设计,替代了传统的Collections.synchronizedXxx()同步包装器。本文将深入讲解三类最常用的并发容器。


一、为什么需要并发容器

1.1 同步容器的局限

// 同步容器:所有方法加 synchronizedList<String>list=Collections.synchronizedList(newArrayList<>());// 问题 1:复合操作不原子// ❌ 先检查再操作,中间可能被其他线程修改if(!list.contains("key")){list.add("key");// 可能重复添加}// 问题 2:迭代时需手动加锁// ❌ ConcurrentModificationExceptionfor(Strings:list){// 其他线程修改 list → 异常}// ✅ 必须手动加锁synchronized(list){for(Strings:list){// 安全}}

1.2 并发容器的优势

优势说明
细粒度锁分段锁或 CAS,减少锁竞争
弱一致性迭代迭代时不抛 ConcurrentModificationException
原子复合操作putIfAbsent / computeIfAbsent 等
无锁读取ConcurrentHashMap 读操作不需要锁

二、ConcurrentHashMap

2.1 数据结构演进

Java 7:分段锁(Segment)

┌─────────────────────────────────────────────┐ │ Segment[0] │ Segment[1] │ ... │ Segment[15]│ │ ┌─────────┐ │ ┌─────────┐│ │ ┌─────────┐│ │ │ HashEntry│ │ │ HashEntry││ │ HashEntry││ │ │ 链表 │ │ │ 链表 ││ │ 链表 ││ │ └─────────┘ │ └─────────┘│ │ └─────────┘│ │ 锁1 │ 锁2 │ │ 锁16 │ └─────────────────────────────────────────────┘ 每个 Segment 一把锁,默认 16 个 Segment 并发度 = Segment 数量

Java 8+:CAS + synchronized(细粒度)

┌──────────────────────────────────────────────┐ │ Node[] table │ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │ │ N │ N │ N │ N │ N │ N │ N │ N │ │ │ └────┴────┴─┬──┴────┴────┴────┴────┴────┘ │ │ │ │ │ ┌────┴────┐ │ │ │ 链表/红黑树│ │ │ └─────────┘ │ │ │ │ 锁粒度:每个桶(数组位置)一把 synchronized 锁 │ │ 读操作:无锁(volatile + CAS) │ │ 写操作:锁单个桶 │ └──────────────────────────────────────────────┘

2.2 核心操作

ConcurrentHashMap<String,Integer>map=newConcurrentHashMap<>();// 基本操作(与 HashMap 类似)map.put("a",1);map.get("a");// 1map.remove("a");map.size();map.containsKey("a");// 原子复合操作map.putIfAbsent("b",2);// 不存在才放入map.remove("b",2);// 键值都匹配才删除map.replace("b",2,3);// 旧值匹配才替换map.computeIfAbsent("c",k->3);// 不存在则计算并放入map.computeIfPresent("c",(k,v)->v+1);// 存在则计算替换map.compute("d",(k,v)->v==null?1:v+1);// 无论如何计算map.merge("e",1,Integer::sum);// 存在则合并,不存在则放入

2.3 computeIfAbsent 的妙用

// 场景:缓存计算结果ConcurrentHashMap<String,ExpensiveResult>cache=newConcurrentHashMap<>();// ❌ 非原子操作if(!cache.containsKey(key)){cache.put(key,expensiveCompute(key));// 可能重复计算}// ✅ 原子操作ExpensiveResultresult=cache.computeIfAbsent(key,k->expensiveCompute(k));// ✅ 映射到子 Map(多级缓存)ConcurrentHashMap<String,Map<String,Integer>>multiMap=newConcurrentHashMap<>();multiMap.computeIfAbsent("group1",k->newConcurrentHashMap<>()).put("item1",100);

2.4 批量操作(Java 8+)

// forEachmap.forEach(1,(k,v)->System.out.println(k+"="+v));// 第一个参数是并行度// search:找到第一个非 null 的结果Stringfound=map.search(1,(k,v)->v>10?k:null);// reduceIntegersum=map.reduceValues(1,Integer::sum);

2.5 弱一致性

// ConcurrentHashMap 的迭代器是弱一致性的:// - 不会抛 ConcurrentModificationException// - 可能反映也可能不反映迭代开始后的修改// - 适用于"最终一致"可接受的场景ConcurrentHashMap<String,Integer>map=newConcurrentHashMap<>();map.put("a",1);map.put("b",2);// 迭代过程中其他线程可以修改for(Map.Entry<String,Integer>entry:map.entrySet()){// 不会抛异常,但可能看不到最新的修改}

2.6 常见陷阱

① size() 和 isEmpty() 是近似值

// ConcurrentHashMap 的 size() 是各桶的近似求和// 在并发修改时可能不准确intsize=map.size();// 近似值,不保证精确

② key/value 不能为 null

map.put(null,1);// ❌ NullPointerExceptionmap.put("key",null);// ❌ NullPointerException

③ computeIfAbsent 中不要修改自身 Map

// ❌ 死锁 / 无限循环(Java 8 已部分修复,但仍不推荐)map.computeIfAbsent("a",k->{map.put("b",2);// 不要在计算函数中修改同一个 mapreturn1;});

三、CopyOnWriteArrayList

3.1 核心原理

写操作:复制整个数组 → 修改副本 → 替换引用 读操作:直接读取,无锁无等待 ┌──────────────────────────────────────────┐ │ 读线程1 → array ──→ [A, B, C, D, E] │ │ 读线程2 → array ──→ [A, B, C, D, E] │ │ │ │ 写线程:add("F") │ │ 1. 复制 → [A, B, C, D, E, F] │ │ 2. 替换引用 → array 指向新数组 │ │ 3. 旧数组由 GC 回收 │ └──────────────────────────────────────────┘

3.2 基本操作

CopyOnWriteArrayList<String>list=newCopyOnWriteArrayList<>();list.add("a");// 复制+添加list.add(0,"b");// 复制+插入list.set(0,"c");// 复制+替换list.remove("a");// 复制+删除list.get(0);// 无锁读取list.size();// 无锁读取list.contains("a");// 无锁遍历

3.3 适用场景

读远多于写(如 99:1): ├── 配置列表 ├── 监听器列表(Listener) ├── 白名单/黑名单 └── 数据字典 不适合的场景: ├── 写操作频繁 → 每次复制开销巨大 ├── 列表很大 → 复制耗时 + 内存压力 └── 需要强一致性 → 读写可能不一致

3.4 CopyOnWriteArraySet

// 基于 CopyOnWriteArrayList 实现的 Set// add 时遍历检查重复,性能更差CopyOnWriteArraySet<String>set=newCopyOnWriteArraySet<>();set.add("a");set.add("b");set.contains("a");// O(n) 遍历

3.5 注意事项

注意说明
写操作开销大每次写都复制整个数组,O(n)
内存占用写期间同时存在两个数组
最终一致性迭代器使用创建时的快照,不反映后续修改
不支持 listIterator迭代器不支持 add/set/remove

四、BlockingQueue

4.1 核心接口

publicinterfaceBlockingQueue<E>extendsQueue<E>{// 抛异常booleanadd(Ee);// 队列满抛 IllegalStateExceptionEremove();// 队列空抛 NoSuchElementException// 返回布尔booleanoffer(Ee);// 队列满返回 falseEpoll();// 队列空返回 null// 阻塞等待voidput(Ee)throwsInterruptedException;// 队列满则阻塞等待Etake()throwsInterruptedException;// 队列空则阻塞等待// 超时等待booleanoffer(Ee,longtimeout,TimeUnitunit);Epoll(longtimeout,TimeUnitunit);}

4.2 四种操作行为

操作抛异常返回值阻塞超时
插入add()offer()put()offer(timeout)
移除remove()poll()take()poll(timeout)
检查element()peek()--

4.3 七种 BlockingQueue 实现

实现数据结构有界特点
ArrayBlockingQueue数组✅ 有界公平/非公平可选
LinkedBlockingQueue链表可选(默认无界)吞吐量通常高于 Array
PriorityBlockingQueue无界按优先级出队
SynchronousQueue-不存储元素,直接交付
DelayQueue无界元素到期才能出队
LinkedTransferQueue链表无界transfer 直接交付给消费者
LinkedBlockingDeque双向链表可选双端队列

4.4 ArrayBlockingQueue

// 有界阻塞队列,创建时必须指定容量BlockingQueue<Task>queue=newArrayBlockingQueue<>(1000);// 公平模式:等待最久的消费者/生产者优先BlockingQueue<Task>fairQueue=newArrayBlockingQueue<>(1000,true);// 生产者queue.put(newTask());// 队列满时阻塞queue.offer(newTask(),5,SECONDS);// 超时返回 false// 消费者Tasktask=queue.take();// 队列空时阻塞Tasktask=queue.poll(5,SECONDS);// 超时返回 null

4.5 生产者-消费者模式

publicclassProducerConsumer{privatefinalBlockingQueue<String>queue=newArrayBlockingQueue<>(100);publicvoidstart(){newThread(this::producer).start();newThread(this::consumer).start();}voidproducer(){try{while(true){Stringdata=fetchData();queue.put(data);// 队列满时自动阻塞}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}voidconsumer(){try{while(true){Stringdata=queue.take();// 队列空时自动阻塞processData(data);}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}

4.6 线程池 + BlockingQueue

// 自定义线程池使用有界队列ThreadPoolExecutorexecutor=newThreadPoolExecutor(10,20,60,SECONDS,newArrayBlockingQueue<>(500),// 有界队列newNamedThreadFactory("worker"),newCallerRunsPolicy()// 队列满时由调用者线程执行);

4.7 DelayQueue —— 延迟队列

// 元素必须实现 Delayed 接口publicclassDelayedTaskimplementsDelayed{privatefinalStringname;privatefinallongexecuteTime;publicDelayedTask(Stringname,longdelayMs){this.name=name;this.executeTime=System.currentTimeMillis()+delayMs;}@OverridepubliclonggetDelay(TimeUnitunit){returnunit.convert(executeTime-System.currentTimeMillis(),MILLISECONDS);}@OverridepublicintcompareTo(Delayedo){returnLong.compare(this.executeTime,((DelayedTask)o).executeTime);}}// 使用DelayQueue<DelayedTask>delayQueue=newDelayQueue<>();delayQueue.put(newDelayedTask("task1",5000));// 5秒后执行delayQueue.put(newDelayedTask("task2",3000));// 3秒后执行DelayedTasktask=delayQueue.take();// 阻塞直到最早的任务到期

4.8 SynchronousQueue —— 零容量队列

// 不存储元素,生产者必须等待消费者接收// 适用于直接交付场景(CachedThreadPool 使用)BlockingQueue<String>queue=newSynchronousQueue<>();// 线程1:生产者queue.put("data");// 阻塞,直到有消费者来取// 线程2:消费者Stringdata=queue.take();// 阻塞,直到有生产者放入

五、其他并发容器

5.1 ConcurrentSkipListMap / ConcurrentSkipListSet

// 基于跳表的有序并发 Map/Set// 插入/删除/查找 O(log n)ConcurrentSkipListMap<String,Integer>sortedMap=newConcurrentSkipListMap<>();sortedMap.put("c",3);sortedMap.put("a",1);sortedMap.put("b",2);// 自然排序:a=1, b=2, c=3ConcurrentSkipListSet<String>sortedSet=newConcurrentSkipListSet<>();

5.2 ConcurrentLinkedQueue / ConcurrentLinkedDeque

// 无界非阻塞队列,基于 CAS 实现// 适合高并发场景,但不支持阻塞操作ConcurrentLinkedQueue<String>queue=newConcurrentLinkedQueue<>();queue.offer("a");queue.offer("b");Stringhead=queue.poll();// "a",非阻塞

六、选型指南

场景推荐原因
高并发键值存储ConcurrentHashMap分段锁/CAS,读无锁
缓存计算结果ConcurrentHashMap + computeIfAbsent原子复合操作
读多写少列表CopyOnWriteArrayList写时复制,读无锁
监听器管理CopyOnWriteArrayList注册/注销少,遍历多
生产者-消费者ArrayBlockingQueue有界 + 阻塞
任务调度DelayQueue延迟出队
有序并发 MapConcurrentSkipListMap跳表 + CAS
高并发无阻塞队列ConcurrentLinkedQueueCAS,无锁

总结

容器核心机制读性能写性能一致性
ConcurrentHashMapCAS + synchronized弱一致
CopyOnWriteArrayList写时复制快照一致
ArrayBlockingQueueReentrantLock-强一致
ConcurrentSkipListMapCAS + 跳表弱一致