线程池作用、用法以及原理

article/2025/9/28 23:24:19

线程池

  • 作用
  • 用法
    • 建议设定大小
    • 快捷构造线程池
    • submit与execute
    • shutdown与shutdownNow
    • Future与FutureTast
    • 代码
  • 状态
  • 底层原理
    • 继承关系
    • 主要参数
    • 工作原理
    • 饱和策略
    • 连接复用

作用

1,降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
2,提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行
3,提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

尽量通过Executor来启动线程,这种方法比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

用法

使用ThreadPoolExecutor构造线程池,将线程(任务)传入

创建线程池时需要设定特殊参数,如核心线程池大小、最大线程池大小、缓冲队列大小等

获取结果也是线程池使用的一大难点,普通的Future接口以及FutureTask、ListenableFuture等都可以实现接受结果
在这里插入图片描述

建议设定大小

1、CPU计算较多的时候,被叫做CPU密集型应用,核心线程数设置为N+1,N为CPU个数
2、IO操作较多时,被叫做IO密集型应用,设置为2*N

那么这个大小到底是如何确定下来的呢?

通过Little’s Law(利特尔法则)确定的,这个法则说一个系统请求数等于请求的到达率与平均每个单独请求花费的时间之乘积,也就是说知道三点即可确定线程数:请求CPU处理速度、一个请求所消耗的时间、CPU数目
在这里插入图片描述
以上过程比较理论化,在实战中会有很多特殊情况发生,比如下午3,4点的流量,和 12 点左右午饭时的流量不一样,而美团给出了动态化配置的解决方案

在源码中有一些诡异的方法,我们一般不会注意到,比如:

    public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0)throw new IllegalArgumentException();int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;if (workerCountOf(ctl.get()) > corePoolSize)interruptIdleWorkers();else if (delta > 0) {// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.int k = Math.min(delta, workQueue.size());while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}

该方法让我们可以在线程池运行时修改核心线程数,JDK不止提供了这个方法,最大线程数等都可以修改。这就叫动态配置

快捷构造线程池

只建议用ThreadPoolExecutor来创建线程池
不建议使用Executors中的以下四种方法创建
前两种队列大小可达INTEGER_VALUE,后两种线程多少可达INTEGER_VALUE,而这两种都会消耗系统资源
在源码中以下方法也只是返回参数固定的ThreadPoolExecutor对象

1,FixedThreadPool:固定线程数的线程池
2,SingleThreadExecutor:只有一个线程的线程池
3,CachedThreadPool:主线程提交任务的速度高于线程处理任务的速度时,会不断创建新的线程
4,ScheduledThreadPoolExecutor:定时执行任务

submit与execute

submit可以提交Callable子类对象并获得一个Future类型的对象,比如FutrueTask作为返回值。可以通过Future的get方法来获取返回值,不过get会阻塞调用该方法的线程,因此是同步的,get方法可以带时间,时间一过就会抛出异常

execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否,同时由于Runnable的run方法没有抛出异常,因此Runnable实现的线程也不会抛出异常

shutdown与shutdownNow

shutdown:关闭线程池,线程池的状态变为SHUTDOWN,不再接受新任务,并执行所有在队列中的任务

shutdownNow:立即关闭线程池,线程池的状态变为STOP,停止执行所有任务,返回在队列中的任务链表

同时,还有isShutdown()方法与isTerminated()方法来判断它是否执行shutdown方法以及是否抛出了所有队列进入了terminated状态

Future与FutureTast

Future是一个接口,里面定义了一些方法:

boolean cancel(boolean mayInterruptIfRunning);如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务
boolean isCancelled();
boolean isDone();
V get();
V get(long timeout, TimeUnit unit);

FutureTask除了实现了Future接口外还实现了Runnable接口(即可以通过Runnable接口实现线程,也可以通过Future取得线程执行完后的结果),因此FutureTask也可以直接提交给Executor执行

代码

public class test4 {private static final int CORE_POOL_SIZE = 6;private static final int MAX_POOL_SIZE = 10;private static final int QUEUE_CAPACITY = 100;private static final Long KEEP_ALIVE_TIME = 1L;public static void main(String[] args) throws InterruptedException {TreadTest t = new TreadTest();ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,new ArrayBlockingQueue<>(QUEUE_CAPACITY),new ThreadPoolExecutor.CallerRunsPolicy());for(int i = 0; i < 30; i++) {
//        	TreadTest t = new TreadTest();executor.submit(t);}executor.shutdown();while (!executor.isTerminated()) {}System.out.println("Finished all threads");}
}class TreadTest implements Callable<Integer> {volatile int j = 0;@Overridepublic Integer call() {for(int i = 0; i < 10; i++) {System.out.println("Thread Name= "+Thread.currentThread().getName() + "number = "+ j++);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}//formatter pattern is changed here by thread, but it won't reflect to other threadsreturn j;}
}

状态

线程池有5种状态

1,RUNNING:线程池一旦被创建,就处于 RUNNING 状态,任务数为0,能够接收新任务,对已排队的任务进行处理。

2,SHUTDOWN:不接收新任务,但能处理已排队的任务。调用线程池的shutdown()方法,线程池由RUNNING转变为SHUTDOWN状态。

3,STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()方法,线程池由RUNNING或SHUTDOWN转变为STOP状态。

4,TIDYING(整理):

SHUTDOWN状态下,任务数为 0,其他所有任务已终止,线程池会变为TIDYING状态,会执行terminated方法。线程池中的terminated()方法是空实现,可以重写该方法进行相应的处理
线程池在SHUTDOWN状态,任务队列为空且执行中任务为空,线程池就会由SHUTDOWN转变为TIDYING状态
线程池在STOP状态,线程池中执行中任务为空时,就会由STOP转变为TIDYING状态

5,TERMINATED(结束):线程池彻底终止。线程池在TIDYING状态执行完terminated()方法就会由TIDYING转变为TERMINATED状态

在这里插入图片描述

底层原理

继承关系

Executor接口 ->
ExecutorServer接口 ->
AbstractExecutorServer接口 ->
ThreadPoolExecutor类
如果加入已计划的线程池,就成了如下结果:
在这里插入图片描述
ScheduledExecutorService主要是用来做定时任务的

以下是ExecutorService接口的所有方法(Execute接口只有一个execute方法)
在这里插入图片描述
可以看到该接口就是表示了线程池的生命周期,这个线程池服务的invokeAll方法是将集合中所有的Callable都执行,invokeAny提交所有但是只返回一个最先完成的结果,其他的主要方法下面会说

以下源码主要来自ThreadPoolExecutor类

主要参数

corePoolSize:核心线程数,线程池有多少线程同时运行

maximumPoolSize:最大线程数,当提交的任务超过队列大小时,当前可以同时运行的线程数量变为最大线程数

workQueue:缓冲队列,指最大可以存放的任务数,注意,线程执行任务时任务还在队列中,可以把缓冲队列设置为0看看效果

handler:饱和策略,如果缓冲队列慢了会怎么处理提交的任务

工作原理

1,核心线程数未满时,提交任务,无可用线程时线程数加一
2,核心线程数已满时,提交任务,到达缓冲队列
3,缓冲队列已满时,提交任务,创建新线程直到到达最大线程数
4,继续提交任务,线程池使用饱和策略,饱和策略可以在构造线程池时设定

以上四步也是ThreadPoolExecutor的execute方法的过程

但是在开始之前需要来一点预备知识

	// ctl 表示了线程的状态以及当前激活的线程数,用一个值表示了两种东西,很离谱对不对private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Integer.SIZE 就是32,COUNT_BITS 就是29private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 这里就是线程池运行时状态private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// 提供读取当前线程数、当前运行状态的方法private static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }

来解释一下,ctl 的低29位用于存放当前的线程数,因此一个线程池在理论上最大的线程数是(2^29)-1;高3位是用于表示当前线程池的状态,其中高三位的值和状态对于如下:

  • 111: RUNNING
  • 000: SHUTDOWN
  • 001: STOP
  • 010: TIDYING
  • 011: TERMINATED

在以后的处理需要多个数据的问题的时候,也可以模仿这种优雅的写法。接下来来看看execute方法

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();// ctl 是个原子类,拿到ctl的值int c = ctl.get();// 如果核心线程数大于现在正在执行的线程数,workerCountOf方法用于获取当前正在执行的线程数if (workerCountOf(c) < corePoolSize) {// 调用addWorker创建一个线程并返回,如果创建失败再获取一个ctlif (addWorker(command, true))return;c = ctl.get();}// 如果当前线程池在跑并且将command成功加入了队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 如果当前线程池没在跑并且将command删除成功了,则执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 如果发生什么事情删除失败了或者当前线程池在运行中,则会判断工作线程是否为0 ,如果过为0 就创建一个非核心线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 尝试创建一个工作线程,如果线程池挂了或者大于最大线程数,执行拒绝策略else if (!addWorker(command, false))reject(command);// 其他情况不做处理}

其中addWorker方法用于创建新线程,它使用HashSet来存放线程,里面放的是Worker,只有持有全局锁mainLock的前提下才能访问此集合。同时这个方法返回true则表示线程创建成功,false表示失败

    private final ReentrantLock mainLock = new ReentrantLock();// 线程池的最大大小private int largestPoolSize;// 工作线程放在这里private final HashSet<Worker> workers = new HashSet<>();private boolean addWorker(Runnable firstTask, boolean core) {//这个retry用于跳出两层循环,两次循环创建失败后再次进行资格判断retry:for (;;) {//条件判断int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//以下for循环主要为了将workcount的数量加1for (;;) {//获取线程池中工作的线程的数量int wc = workerCountOf(c);// core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSizeif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//原子操作将workcount的数量加1if (compareAndIncrementWorkerCount(c))break retry;// 如果线程的状态改变了就再次执行上述操作c = ctl.get();if (runStateOf(c) != rs)continue retry;}}// 标记工作线程是否启动成功boolean workerStarted = false;// 标记工作线程是否创建成功boolean workerAdded = false;Worker w = null;//以下过程尝试去创建工作线程try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//获取线程池状态int rs = runStateOf(ctl.get());//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker// firstTask == null证明只新建线程而不执行任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);//更新当前工作线程的最大容量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 工作线程是否启动成功workerAdded = true;}} finally {mainLock.unlock();}// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例if (workerAdded) {t.start();// 标记线程启动成功workerStarted = true;}}} finally {// 线程启动失败,需要从工作线程中移除对应的Workerif (! workerStarted)addWorkerFailed(w);}return workerStarted;}

饱和策略

1,拒绝执行任务并抛出异常
2,使用调用线程池的线程执行任务
3,丢弃此任务
4,丢弃第一个等待的任务
还有更多处理方法,可以自行百度

连接复用

private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();int wc = workerCountOf(c);// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;// 对于超过核心线程数量的这些线程,需要进行超时控制boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if (timed && timedOut) {// 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,// 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 注意workQueue中的poll()方法与take()方法的区别// poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null// take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

上面的代码实现了连接池的线程复用,以及超过核心线程数的线程如何被销毁,核心线程如何被保存,思路就是把Runnable放进BlockingQueue里,用一定方式调用线程去拿任务

如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程

如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的

所以最大线程与核心线程的不同,就是调用获取任务的方法不同,一个take一个是poll,这两个方法都由BlockingQueue友情提供


http://chatgpt.dhexx.cn/article/eBBbFnDB.shtml

相关文章

线程池-线程池的好处

1.线程池的好处。 线程使应用能够更加充分合理的协调利用cpu 、内存、网络、i/o等系统资源。 线程的创建需要开辟虚拟机栈&#xff0c;本地方法栈、程序计数器等线程私有的内存空间。 在线程的销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源&#xff0…

什么是线程池,线程池的作用

线程池&#xff0c;--其实就是一个 容纳多个线程的容器 &#xff0c;其中的线程可以反复使用&#xff0c;省去了频繁创建线程对象的操作 &#xff0c;--无需反复创建线程而消耗过多资源。 创建销毁线程是一个非常消耗性能的。 我们详细的解释一下为什么要使用线程池&#xff1f…

华为防火墙实战配置教程,太全了

防火墙是位于内部网和外部网之间的屏障&#xff0c;它按照系统管理员预先定义好的规则来控制数据包的进出。防火墙是系统的第一道防线&#xff0c;其作用是防止非法用户的进入。 本期我们一起来总结下防火墙的配置&#xff0c;非常全面&#xff0c;以华为为例。 防火墙的配置…

防火墙配置(命令)

拓扑图&#xff1a; 目的&#xff1a;PC1和PC2相互ping通。 配置命令&#xff1a; FW1: //添加端口IP [SRG-GigabitEthernet0/0/0]interface GigabitEthernet0/0/1 [SRG-GigabitEthernet0/0/1]ip add 192.168.1.1 24 [SRG]inter g0/0/2 [SRG-GigabitEthernet0/0/2]ip add 10.…

防火墙基础配置(二)

拓补图&#xff1a; 方案一&#xff08;子接口的形式&#xff09; 实验目的&#xff1a;解决防火墙上的接口不足以为其他区域服务的问题&#xff0c;比方说防火墙上只有两个接口&#xff0c;但是有三个区域&#xff0c;那这个实验的目的为了解决防火墙上的接口不足以为多区域提…

H3C防火墙基础配置2-配置安全策略

1 安全策略简介 安全策略对报文的控制是通过安全策略规则实现的&#xff0c;规则中可以设置匹配报文的过滤条件&#xff0c;处理报文的动作和对于报文内容进行深度检测等功能。 &#xff08;1&#xff09;规则的名称和编号 安全策略中的每条规则都由唯一的名称和编号标识。名称…

H3C防火墙-安全域配置举例

1. 组网需求 某公司以 Device 作为网络边界安全防护设备&#xff0c;连接公司内部网络和 Internet。公司只对内部提供Web 服务&#xff0c;不对外提供这些服务。现需要在设备上部署安全域&#xff0c;并基于以下安全需求进行域间策略 的配置。 • 与接口 GigabitEthernet1/0/1 …

Firewalld防火墙实例配置

文章目录 环境拓扑需求描述一、环境配置二、防火墙配置1、在网站服务器上配置防火墙2、网关服务器配置防火墙3、企业内网访问外网web服务器4、外网web服务器访问企业内部网站服务器 三、总结问题总结解决方案 环境拓扑 需求描述 1、 网关服务器连接互联网网卡ens33地址为100.1…

【Linux】配置网络和firewall防火墙(超详细介绍+实战)

&#x1f947;&#x1f947;【Liunx学习记录篇】&#x1f947;&#x1f947; 篇一&#xff1a;【Linux】VMware安装unbuntu18.04虚拟机-超详细步骤(附镜像文件&#xff09; 篇二&#xff1a;【Linux】ubuntu18.04系统基础配置及操作 篇三&#xff1a;【Linux】用户与组的操作详…

防火墙配置

防火墙&#xff08;Firewall&#xff09;&#xff0c;也称防护墙。它是一种位于内部网络与外部网络之间的网络安全系统。一项信息安全的防护系统&#xff0c;依照特定的规则&#xff0c;允许或是限制传输的数据通过。防火墙对于我们的网络安全的重要性不言而喻 但是在实际的开发…

H3C防火墙-安全策略典型配置举例

基于 IP 地址的安全策略配置举例 1.组网需求 • 某公司内的各部门之间通过 Device 实现互连&#xff0c;该公司的工作时间为每周工作日的 8 点到 18点。 • 通过配置安全策略规则&#xff0c;允许总裁办在任意时间、财务部在工作时间通过 HTTP 协议访问财务数据库服务器的 Web…

华为防火墙配置教程

01 了解防火墙基本机制 配置防火墙之前请了解防火墙基本工作机制。 1.1 什么是防火墙 防火墙是一种网络安全设备&#xff0c;通常位于网络边界&#xff0c;用于隔离不同安全级别的网络&#xff0c;保护一个网络免受来自另一个网络的攻击和入侵。这种“隔离”不是一刀切&#x…

防火墙基本配置

防火墙 种类 1.包过滤技术 「 静态防火墙 动态防火墙 」 netfilter 真正的配置 位于linux内核的包过滤功能体系 称为linux防火墙的“内核态” iptables 防火墙的配置 工具 主要针对 网络层 针对IP数据包 「体现在对包的IP地址、端口等信息处理」 链表结构 链 ---- 容纳 规则…

防火墙详解(三)华为防火墙基础安全策略配置(命令行配置)

实验要求 根据实验要求配置防火墙&#xff1a; 合理部署防火墙安全策略以及安全区域实现内网用户可以访问外网用户&#xff0c;反之不能访问内网用户和外网用户均可以访问公司服务器 实验配置 步骤一&#xff1a;配置各个终端、防火墙端口IP地址 终端以服务器为例&#x…

防火墙基础配置

状态防火墙 状态检测防火墙&#xff08;Stateful Firewall&#xff09;是一种网络安全设备&#xff0c;它可以检测和过滤网络流量&#xff0c;以保护网络不受未经授权的访问和攻击。 与传统的包过滤防火墙不同&#xff0c;状态检测防火墙可以跟踪网络连接的状态&#xff0c;并…

10分钟教你完全掌握防火墙配置!!!!!

今日提问 1.防火墙支持那些NAT技术&#xff0c;主要应用场景是什么&#xff1f; 2.当内网PC通过公网域名解析访问内网服务器时&#xff0c;会存在什么问题&#xff0c;如何解决&#xff1f;请详细说明 3.防火墙使用VRRP实现双机热备时会遇到什么问题&#xff0c;如何解决&…

H3C简单的防火墙配置

这里写目录标题 实验拓扑实验需求配置过程1.配置ip地址&#xff08;略&#xff09;2.配置去往公网的默认路由3.将端口绑定在信任域和不信任域4.配置ipv4安全模板5.配置ospf将内网的连通性完成6.配置nat &#xff08;easy ip的方式&#xff09;使内网PC可以访问外网 测试 实验拓…

防火墙的基础配置(一)

拓补图&#xff1a; 注意事项&#xff1a; 不要连接着防火墙的g0/0/0口&#xff0c;这个口是防火墙的管理端口一定要将接口划分区域&#xff0c;防火墙有四个区域&#xff0c;分别是local、trust、untrust、dmz&#xff0c;优先级分别是100、85、50、5&#xff0c;优先级越高说…

三、防火墙配置(1)---防火墙常规配置

一、实验内容 1、在GNS中创建如下图所示的网络拓扑结构。 2、给路由器和防火墙按照拓扑图中的规划&#xff0c;配置好IP地址和路由表。给R1、R2、R4、R6开启远程连接。3、验证防火墙默认安全规则&#xff0c;高安全级别接口&#xff08;inside&#xff09;可主动访问低安全级别…

华为防火墙网管配置实例

今天给大家带来华为USG6000防火墙的网管配置实例。本文简单的搭建了一个实验拓扑图&#xff0c;通过配置&#xff0c;实现了对华为防火墙的Telnet管理配置、SSH管理配置和Web管理配置。 一、实验拓扑及目的 实验拓扑如上所示&#xff0c;要求对防火墙FW1进行合适的配置&#x…