// 方式1:继承Thread类class MyThread extends Thread { @Override public void run() { System.out.println("线程运行: " + Thread.currentThread().getName()); }}// 方式2:实现Runnable接口class MyRunnable implements Runnable { @Override public void run() { System.out.println("Runnable运行: " + Thread.currentThread().getName()); }}public class Main { public static void main(String[] args) { // 启动线程 new MyThread().start(); // 输出:线程运行: Thread-0 new Thread(new MyRunnable()).start(); // 输出:Runnable运行: Thread-1 // Java 8 Lambda简化 new Thread(() -> System.out.println("Lambda线程: " + Thread.currentThread().getName())) .start(); }}
import java.io.*;import java.net.URL;public class ParallelDownloader { public static void main(String[] args) { String[] urls = { "https://example.com/file1.zip", "https://example.com/file2.jpg", "https://example.com/file3.pdf" }; for (String url : urls) { new Thread(() -> downloadFile(url)).start(); } } private static void downloadFile(String url) { try (InputStream in = new URL(url).openStream(); FileOutputStream out = new FileOutputStream( new File(url.substring(url.lastIndexOf('/') + 1))) { byte[] buffer = new byte[1024]; int bytesRead; while ((bytesRead = in.read(buffer)) != -1) { out.write(buffer, 0, bytesRead); } System.out.println("下载完成: " + url); } catch (IOException e) { System.err.println("下载失败: " + url + " - " + e.getMessage()); } }}
import java.util.concurrent.*;public class ThreadPoolDemo { public static void main(String[] args) { // 创建固定大小线程池 ExecutorService fixedPool = Executors.newFixedThreadPool(3); // 提交10个任务 for (int i = 1; i <= 10; i++) { int taskId = i; fixedPool.execute(() -> { System.out.println("执行任务" + taskId + " - " + Thread.currentThread().getName()); try { Thread.sleep(1000); // 模拟任务耗时 } catch (InterruptedException e) { e.printStackTrace(); } }); } fixedPool.shutdown(); // 优雅关闭 }}
import java.util.concurrent.*;import java.util.concurrent.atomic.*;public class SeckillSystem { // 商品库存(原子操作保证线程安全) private static final AtomicInteger stock = new AtomicInteger(100); public static void main(String[] args) throws InterruptedException { // 创建线程池(建议根据压测结果调整参数) ThreadPoolExecutor executor = new ThreadPoolExecutor( 20, 50, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() ); // 模拟5000个并发请求 CountDownLatch latch = new CountDownLatch(5000); for (int i = 0; i < 5000; i++) { executor.execute(() -> { try { if (stock.decrementAndGet() >= 0) { System.out.println(Thread.currentThread().getName() + " 抢购成功"); } else { System.out.println("库存不足"); } } finally { latch.countDown(); } }); } latch.await(); executor.shutdown(); System.out.println("最终库存: " + stock.get()); }}
import java.util.concurrent.*;class CustomThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger counter = new AtomicInteger(1); CustomThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, namePrefix + "-" + counter.getAndIncrement()); t.setDaemon(false); t.setPriority(Thread.NORM_PRIORITY); return t; }}// 使用示例ExecutorService service = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new CustomThreadFactory("Order-Processor"));
public class ThreadPoolMonitor implements Runnable { private final ThreadPoolExecutor executor; public ThreadPoolMonitor(ThreadPoolExecutor executor) { this.executor = executor; } @Override public void run() { while (true) { System.out.printf( "监控: 活跃线程=%d, 队列大小=%d, 完成任务=%d%n", executor.getActiveCount(), executor.getQueue().size(), executor.getCompletedTaskCount() ); try { Thread.sleep(3000); } catch (InterruptedException e) { break; } } }}// 添加到线程池ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();new Thread(new ThreadPoolMonitor(executor)).start();
// 推荐策略(根据业务选择):ThreadPoolExecutor executor = new ThreadPoolExecutor( coreSize, maxSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), // 策略1:由提交线程直接执行(避免任务丢失) new ThreadPoolExecutor.CallerRunsPolicy() // 策略2:记录日志后丢弃(适合非关键任务) // new ThreadPoolExecutor.DiscardPolicy());
// 使用VisualVM或Arthas工具检测// 或添加超时控制Future<?> future = executor.submit(() -> { // 可能死锁的操作});try { future.get(5, TimeUnit.SECONDS); // 设置超时} catch (TimeoutException e) { future.cancel(true); // 中断任务}
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (int i = 0; i < 10_000; i++) { executor.submit(() -> { Thread.sleep(1000); // 自动挂起不阻塞OS线程 System.out.println(Thread.currentThread()); }); }} // 自动等待所有任务完成
CompletableFuture.supplyAsync(() -> queryDatabase(), executor) .thenApplyAsync(data -> processData(data), executor) .thenAcceptAsync(result -> sendNotification(result), executor) .exceptionally(ex -> { System.err.println("处理失败: " + ex.getMessage()); return null; });
测试场景:处理10,000个HTTP请求
方案 | 耗时(ms) | 内存占用(MB) |
---|---|---|
传统Thread | 15,200 | 1,024 |
FixedThreadPool(20) | 2,300 | 180 |
VirtualThreads | 1,800 | 95 |
new Thread()
,使用线程池// 推荐配置模板ThreadPoolExecutor executor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), // 核心线程数 Runtime.getRuntime().availableProcessors() * 2, // 最大线程数 60, TimeUnit.SECONDS, // 空闲线程存活时间 new LinkedBlockingQueue<>(1000), // 有界队列 new CustomThreadFactory("App-Thread"), // 命名线程 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略);
通过合理使用线程和线程池,可以构建出既高效又稳定的Java并发系统。建议在实际项目中结合APM工具(如SkyWalking)持续监控线程池状态。