java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamDemo {
public static void main(String[] args) {
// 1. 准备数据:1~10 的整数列表
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 2. 使用 parallelStream 并行处理
List<Integer> squaredNumbers = numbers.parallelStream()
.map(number -> {
// 模拟耗时操作(如:复杂计算或远程调用)
try {
Thread.sleep(100); // 增加延迟观察并行效果
} catch (InterruptedException e) {
e.printStackTrace();
}
return number * number;
})
.collect(Collectors.toList()); // 3. 收集结果
// 4. 输出结果
System.out.println("原始数据: " + numbers);
System.out.println("计算结果: " + squaredNumbers);
}
}
Collectors.toList()
collect(Collectors.toList())
将并行处理后的结果汇总到新的 List
中。Collectors.toList()
内部使用线程安全的容器,无需额外同步。collect(Collectors.toCollection(ConcurrentLinkedQueue::new))
等线程安全容器。collect()
方法会阻塞当前线程,直到所有并行任务完成并返回最终结果集合。collect()
后直接输出结果(如示例代码),可观察到所有计算结果均已完整收集。Collectors.toList()
会按照原始数据顺序存储结果(1→1
, 2→4
, 3→9
,依此类推)。复制
原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
计算结果: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
避免副作用
不要在并行流中修改共享变量(如全局计数器),应使用纯函数式操作(如 map
、filter
)。
任务粒度控制
并行流适合处理耗时较长的任务,若单个任务极快(如简单算术),可能因线程调度开销反而降低性能。
自定义线程池
默认使用公共的 ForkJoinPool
,若需隔离资源,可通过自定义线程池执行:
java
复制
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
list.parallelStream().map(...).collect(...);
}).get();
通过上述案例,可以安全地利用 parallelStream
实现并行处理并正确获取结果。