可昕之家

可昕之家

张先生

平淡如水,爱护家人,好好工作

57 文章数
0 评论数

Java线程与线程池实战指南:从基础到高并发优化

张清磊
2025-04-07 / 0 评论 / 18 阅读 / 0 点赞

1. 线程基础:快速入门

1.1 线程创建方式

// 方式1:继承Thread类class MyThread extends Thread {    @Override    public void run() {        System.out.println("线程运行: " + Thread.currentThread().getName());    }}// 方式2:实现Runnable接口class MyRunnable implements Runnable {    @Override    public void run() {        System.out.println("Runnable运行: " + Thread.currentThread().getName());    }}public class Main {    public static void main(String[] args) {        // 启动线程        new MyThread().start();  // 输出:线程运行: Thread-0        new Thread(new MyRunnable()).start(); // 输出:Runnable运行: Thread-1                // Java 8 Lambda简化        new Thread(() -> System.out.println("Lambda线程: " + Thread.currentThread().getName()))           .start();    }}

1.2 实际案例:多线程下载器

import java.io.*;import java.net.URL;public class ParallelDownloader {    public static void main(String[] args) {        String[] urls = {            "https://example.com/file1.zip",            "https://example.com/file2.jpg",            "https://example.com/file3.pdf"        };        for (String url : urls) {            new Thread(() -> downloadFile(url)).start();        }    }    private static void downloadFile(String url) {        try (InputStream in = new URL(url).openStream();             FileOutputStream out = new FileOutputStream(                 new File(url.substring(url.lastIndexOf('/') + 1))) {            byte[] buffer = new byte[1024];            int bytesRead;            while ((bytesRead = in.read(buffer)) != -1) {                out.write(buffer, 0, bytesRead);            }            System.out.println("下载完成: " + url);        } catch (IOException e) {            System.err.println("下载失败: " + url + " - " + e.getMessage());        }    }}

2. 线程池核心应用

2.1 Executor框架体系

import java.util.concurrent.*;public class ThreadPoolDemo {    public static void main(String[] args) {        // 创建固定大小线程池        ExecutorService fixedPool = Executors.newFixedThreadPool(3);                // 提交10个任务        for (int i = 1; i <= 10; i++) {            int taskId = i;            fixedPool.execute(() -> {                System.out.println("执行任务" + taskId + " - " + Thread.currentThread().getName());                try {                    Thread.sleep(1000); // 模拟任务耗时                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }                fixedPool.shutdown(); // 优雅关闭    }}

2.2 实际案例:电商秒杀系统

import java.util.concurrent.*;import java.util.concurrent.atomic.*;public class SeckillSystem {    // 商品库存(原子操作保证线程安全)    private static final AtomicInteger stock = new AtomicInteger(100);        public static void main(String[] args) throws InterruptedException {        // 创建线程池(建议根据压测结果调整参数)        ThreadPoolExecutor executor = new ThreadPoolExecutor(            20, 50, 60, TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1000),            new ThreadPoolExecutor.CallerRunsPolicy()        );        // 模拟5000个并发请求        CountDownLatch latch = new CountDownLatch(5000);        for (int i = 0; i < 5000; i++) {            executor.execute(() -> {                try {                    if (stock.decrementAndGet() >= 0) {                        System.out.println(Thread.currentThread().getName() + " 抢购成功");                    } else {                        System.out.println("库存不足");                    }                } finally {                    latch.countDown();                }            });        }                latch.await();        executor.shutdown();        System.out.println("最终库存: " + stock.get());    }}

3. 线程池高级配置

3.1 自定义线程工厂

import java.util.concurrent.*;class CustomThreadFactory implements ThreadFactory {    private final String namePrefix;    private final AtomicInteger counter = new AtomicInteger(1);    CustomThreadFactory(String namePrefix) {        this.namePrefix = namePrefix;    }    @Override    public Thread newThread(Runnable r) {        Thread t = new Thread(r, namePrefix + "-" + counter.getAndIncrement());        t.setDaemon(false);        t.setPriority(Thread.NORM_PRIORITY);        return t;    }}// 使用示例ExecutorService service = new ThreadPoolExecutor(    5, 10, 60, TimeUnit.SECONDS,    new LinkedBlockingQueue<>(100),    new CustomThreadFactory("Order-Processor"));

3.2 监控线程池状态

public class ThreadPoolMonitor implements Runnable {    private final ThreadPoolExecutor executor;        public ThreadPoolMonitor(ThreadPoolExecutor executor) {        this.executor = executor;    }    @Override    public void run() {        while (true) {            System.out.printf(                "监控: 活跃线程=%d, 队列大小=%d, 完成任务=%d%n",                executor.getActiveCount(),                executor.getQueue().size(),                executor.getCompletedTaskCount()            );            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                break;            }        }    }}// 添加到线程池ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();new Thread(new ThreadPoolMonitor(executor)).start();

4. 常见问题解决方案

4.1 线程池参数如何设置?

  • CPU密集型:核心线程数 = CPU核心数 + 1
  • I/O密集型:核心线程数 = CPU核心数 × (1 + 平均等待时间/平均计算时间)
  • 混合型:通过压测动态调整

4.2 避免OOM的拒绝策略

// 推荐策略(根据业务选择):ThreadPoolExecutor executor = new ThreadPoolExecutor(    coreSize, maxSize, 60, TimeUnit.SECONDS,    new LinkedBlockingQueue<>(1000),    // 策略1:由提交线程直接执行(避免任务丢失)    new ThreadPoolExecutor.CallerRunsPolicy()    // 策略2:记录日志后丢弃(适合非关键任务)    // new ThreadPoolExecutor.DiscardPolicy());

4.3 线程池死锁检测

// 使用VisualVM或Arthas工具检测// 或添加超时控制Future<?> future = executor.submit(() -> {    // 可能死锁的操作});try {    future.get(5, TimeUnit.SECONDS); // 设置超时} catch (TimeoutException e) {    future.cancel(true); // 中断任务}

5. 现代Java并发实践

5.1 虚拟线程(Java 19+)

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {    for (int i = 0; i < 10_000; i++) {        executor.submit(() -> {            Thread.sleep(1000); // 自动挂起不阻塞OS线程            System.out.println(Thread.currentThread());        });    }} // 自动等待所有任务完成

5.2 CompletableFuture组合异步任务

CompletableFuture.supplyAsync(() -> queryDatabase(), executor)    .thenApplyAsync(data -> processData(data), executor)    .thenAcceptAsync(result -> sendNotification(result), executor)    .exceptionally(ex -> {        System.err.println("处理失败: " + ex.getMessage());        return null;    });

6. 性能优化对比

测试场景:处理10,000个HTTP请求

方案 耗时(ms) 内存占用(MB)
传统Thread 15,200 1,024
FixedThreadPool(20) 2,300 180
VirtualThreads 1,800 95

7. 总结与最佳实践

  1. 基础原则
    • 永远不要直接new Thread(),使用线程池
    • 根据任务类型选择线程池类型
    • 监控线程池关键指标
  2. 生产环境建议
    // 推荐配置模板ThreadPoolExecutor executor = new ThreadPoolExecutor(    Runtime.getRuntime().availableProcessors(), // 核心线程数    Runtime.getRuntime().availableProcessors() * 2, // 最大线程数    60, TimeUnit.SECONDS, // 空闲线程存活时间    new LinkedBlockingQueue<>(1000), // 有界队列    new CustomThreadFactory("App-Thread"), // 命名线程    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);
  3. 扩展阅读方向
    • ForkJoinPool适合分治任务
    • ScheduledThreadPoolExecutor定时任务
    • 分布式环境考虑消息队列替代线程池

通过合理使用线程和线程池,可以构建出既高效又稳定的Java并发系统。建议在实际项目中结合APM工具(如SkyWalking)持续监控线程池状态。

上一篇 下一篇
评论
最新回复
    暂无内容
光阴似箭
今日已经过去小时
这周已经过去
本月已经过去
今年已经过去个月
文章目录
今日天气