当前位置:首页 > Java技术 > java多线程之ScheduleThreadPoolExecutor

java多线程之ScheduleThreadPoolExecutor

2022年11月08日 09:35:05Java技术18

ScheduledThreadPoolExecutor 介绍

  ScheduledThreadPoolExecutor 是一个可以实现定时任务的 ThreadPoolExecutor(线程池)。比 timer 更加灵活,效率更高!

  ScheduledThreadPoolExecutor结果如下图所示。

java多线程之ScheduleThreadPoolExecutor _ JavaClub全栈架构师技术笔记

我们,ThreadPoolExecutor的execute和submit方法继承于AbstractExecutorService。而ScheduleExecutorService是一个接口,里面并没有execute和submit方法,ScheduleThreadPoolExecutor里面重写了execute和submit方法。

ScheduledThreadPoolExecutor的四个构造方法如下:

/**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

  ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor ,从其构造方法可以看出,此线程池的线程也不会空闲超时(keepAliveTime = 0),同时使用队列是无边界的DelayedWorkQueue;要注意是,虽然此类继承自 ThreadPoolExecutor,但是有几个继承的调整方法对此类并无作用,特别是在此类中设置 maximumPoolSize 是没有意义的,因为ScheduleThreadPoolExecutor 使用了无边界的任务队列,所以根本不需要创建多于 corePoolsize 数量的线程。

ScheduleThreadPoolExecutor 主要的方法介绍

1. 零延时的 execute()、submit() 方法

   execute()、submit() 方法都被重写了,本质上调用的还是 schedule() 方法;从下面的源码可以看出,这两个方法提交的任务都是延时为0的 “实时任务”;

/**
     * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
     * Note that inspections of the queue and of the list returned by
     * {@code shutdownNow} will access the zero-delayed
     * {@link ScheduledFuture}, not the {@code command} itself.
     *
     * <p>A consequence of the use of {@code ScheduledFuture} objects is
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
     * called with a null second {@code Throwable} argument, even if the
     * {@code command} terminated abruptly.  Instead, the {@code Throwable}
     * thrown by such a task can be obtained via {@link Future#get}.
     *
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution because the
     *         executor has been shut down
     * @throws NullPointerException {@inheritDoc}
     */
    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

    // Override AbstractExecutorService methods

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

2. 提交一个延时任务的 schedule() 方法

方法描述:

     /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

创建并执行在给定延迟后启用的 ScheduledFuture。

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

建并执行在给定延迟后启用的一次性操作。

3、 提交周期性的任务 scheduleAtFixedRate() 和 scheduleWithFixedDelay()

方法描述:

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

initialDelay 是此周期任务的开始执行时的延时时间(即只在第一次开始执行时延时,此后周期性地执行这个任务)。

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

指定了首次执行前的初始延时时间,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

两者的区别:

scheduleAtFixedRate: 固定的周期时间。此方法的 period 参数所指的间隔时间是 从上一周期的任务开始时间到当前周期的任务开始时间的间隔。当上一周期任务执行结束了,如果任务的执行时间大于 指定的周期时间period ,那么便可以开始此周期任务的下一周期的执行。否则,便是间隔时间还没有达到一周期的时间,还需要继续等待,直到周期时间到来;总的来说,可以分为以下两种情况:

  • 任务的执行时间 > period参数:那么周期运行的时间便是 任务的执行时间。
  • 任务的执行时间 < period参数:那么周期运行的时间便是 period参数。

scheduleWithFixedDelay: 固定的间隔时间。此方法的 delay 参数所指的间隔时间是 从上一周期的任务的执行结束时间到当前周期的任务开始时间的间隔,是指定任务的固定的运行间隔,与任务的执行时间无关。

@ Example1 scheduleAtFixedRate 测试

简单起见,下面创建了只有一个线程 ScheduledThreadPoolExecutor 对象,也只提交一个周期任务。 下面的例子中,任务的执行时间大于 period 参数。

public class ScheduledThreadPoolExecutorTest {

    public static void main(String[] args) {
        //池中只有一个线程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //作为一个周期任务提交,period 为1000ms,任务执行时间为2000ms
        schedulePool.scheduleAtFixedRate(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
    }

    static class MyRunnable implements Runnable {

        int period = 1;

        @Override
        public void run() {
            //为周期任务捕获异常,避免异常影响下一周期的任务执行
            try {
                System.out.println("---------------第 " + period + " 周期-------------");
                System.out.println("begin = " + System.currentTimeMillis() / 1000);////任务执行时间
                Thread.sleep(2000);
                System.out.println("end =   " + System.currentTimeMillis() / 1000);
                period++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

运行结果:

---------------第 1 周期-------------
begin = 1563007610
end =   1563007612
---------------第 2 周期-------------
begin = 1563007612
end =   1563007614
---------------第 3 周期-------------
begin = 1563007614
end =   1563007616
---------------第 4 周期-------------
begin = 1563007616
end =   1563007618
---------------第 5 周期-------------
begin = 1563007618
end =   1563007620

从结果可以看出,任务的周期执行是连着的,没有间隔时间。这是因为任务的运行时间大于周期执行时间,即当任务还没结束时,周期时间已经到了,所以任务刚结束,就可以进行下一周期的执行。

@ Example2 scheduleWithFixedDelay 测试

同样也是上面的例子,将周期方法换成 scheduleWithFixedDelay( )

public static void main(String[] args) {
        //池中只有一个线程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //作为一个周期任务提交,delay 为1000ms
        schedulePool.scheduleWithFixedDelay(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);

    }

运行结果:

---------------第 1 周期-------------
begin = 1563007901
end =   1563007903
---------------第 2 周期-------------
begin = 1563007904
end =   1563007906
---------------第 3 周期-------------
begin = 1563007907
end =   1563007909
---------------第 4 周期-------------
begin = 1563007910
end =   1563007912

上面的scheduleWithFixedDelay例子的任务是间隔一个固定的时间执行的,无论任务的执行时间是否大于周期时间。

4. 线程池关闭

两个关闭线程池的方法,一旦线程池被关闭,就会拒绝以后提交的所有任务

void shutdown():

在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。线程池中的周期任务、延时任务,根据下面的两个策略来判断是否继续正常运行,还是停止运行。

List<Runnable> shutdownNow():

尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。对于正在运行,尝试通过中断该线程来结束线程。对于尚未运行的任务,则都不再执行。

线程池关闭(shutdown())下的两个策略的描述

  • void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value):
    在调用线程池调用了 shutdown()方法后,是否继续执行现有延时任务(就是通过 schedule()方法提交的延时任务 )的策略;默认值为false;在以下两种种的情况下,延时任务将会被终止:

  • void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
    在调用线程池调用了 shutdown()方法后,是否继续执行现有周期任务(通过 scheduleAtFixedRate、scheduleWithFixedDelay 提交的周期任务)的策略;默认值为false;在以下两种的情况下,周期任务将会被终止:

获取这个两个策略的设置值:

boolean getContinueExistingPeriodicTasksAfterShutdownPolicy():

取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。

boolean getExecuteExistingDelayedTasksAfterShutdownPolicy():

获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略

@ Example3 shoutdown下的周期任务测试

还是基于上面的例子进行改造,main线程休眠10秒后,shutdown线程池。在默认的情况下(策略为false),因为间隔为1s,任务执行时间为2s,所以 shutdown 后,最多能执行4个周期;但是下面的例子,将策略的值设置为true,shutdown后,周期任务也可以正常运行下去。

public static void main(String[] args) throws InterruptedException{

        //池中只有一个线程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //shutdown时,周期任务的策略
        schedulePool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
        //作为周期任务提交
        ScheduledFuture future = schedulePool.scheduleWithFixedDelay(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);

        Thread.sleep(10*1000);

        schedulePool.shutdown();

    }

运行结果:

---------------第 1 周期-------------
begin = 1563008226
end =   1563008228
---------------第 2 周期-------------
begin = 1563008229
end =   1563008231
---------------第 3 周期-------------
begin = 1563008232
end =   1563008234
---------------第 4 周期-------------
begin = 1563008235
end =   1563008237
---------------第 5 周期-------------
begin = 1563008238
end =   1563008240
---------------第 6 周期-------------
begin = 1563008241
end =   1563008243
---------------第 7 周期-------------
begin = 1563008244

5. 移除任务、取消任务

BlockingQueue getQueue():
返回此执行程序使用的任务队列。此队列中的每个元素都是一个 ScheduledFuture,包括用 execute 所提交的那些任务
boolean remove(Runnable task):
从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
void setRemoveOnCancelPolicy(boolean value):
此方法是在1.7引入的,是用于对调用cancel()的任务的处理策略:是否马上移除出队列;默认为false;
周期任务也可以通过 ScheduledFuture的 cancel()取消运行;

Executors 提供了两个常用的ScheduledThreadPoolExecutor

  这两个常用的ScheduledThreadPoolExecutor:SingleThreadScheduledExecutor(单线程的线程池)、ScheduledThreadPool(线程数量固定的线程池),下面是 Executors 对应的源代码。

/**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.  (Note
     * however that if this single thread terminates due to a failure
     * during execution prior to shutdown, a new one will take its
     * place if needed to execute subsequent tasks.)  Tasks are
     * guaranteed to execute sequentially, and no more than one task
     * will be active at any given time. Unlike the otherwise
     * equivalent {@code newScheduledThreadPool(1, threadFactory)}
     * the returned executor is guaranteed not to be reconfigurable to
     * use additional threads.
     * @param threadFactory the factory to use when creating new
     * threads
     * @return a newly created scheduled executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

 

作者:何其小静
来源链接:https://www.cnblogs.com/heqiyoujing/p/11181287.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/java/68463.html

分享给朋友:

“java多线程之ScheduleThreadPoolExecutor” 的相关文章

Java 日志框架详解

Java 日志框架详解

1. JUL学习 JUL全称Java util Logging是java原生的日志框架,使用时不需要另外引用第三方类库,相对其他日志框 架使用方便,学习简单,能够在小型应用中灵活使用。 1.1 架构介绍 Loggers...

Java实现素数的判断

素数的定义只能被1和它本身整除,不包括1 例 2.3.5.7.11.13 实现代码 Scanner in=new Scanner(System.in); int n ; n=in.nextInt(); for(int n1=2;n1&l...

Java实现1到n的倒数的累加和

Java实现1到n的倒数的累加和

从键盘读入一个数,然后进行运算 实现代码: public static void main(String[] args) { Scanner in=new Scanner(System.in); int n ; n=in....

Java开发手册精华总结

Java开发手册精华总结

阿里 Java 开发手册的思考总结 一个优秀的工程师和一个普通的工程师的区别,不是满天飞的架构图,他的功底体现在所写的每一行代码上。 -- 毕玄 1. 命名风格 【书摘】类名用 UpperCamelCase 风格,比如 DO/BO/VO...

我对java String的理解 及 源码浅析

我对java String的理解 及 源码浅析

摘要: 摘要: 原创出处: http://www.cnblogs.com/Alandre/ 泥沙砖瓦浆木匠 希望转载,保留摘要,谢谢! 每天起床告诉自己,自己的目标是 ”技术 + 英语 还有生活“! -泥沙砖瓦浆木匠 一...

JAVA UUID 生成唯一标识

Writer:BYSocket(泥沙砖瓦浆木匠) 微博:BYSocket 豆瓣:BYSocket Reprint it anywhere u want 需求     项目在设计表的时候,要处理并发多...

深入浅出: Java回调机制(异步)

Writer      :BYSocket(泥沙砖瓦浆木匠) 什么是回调?今天傻傻地截了张图问了下,然后被陈大牛回答道“就一个回调…”。此时千万个草泥马飞奔而过(逃 哈哈,看着源码,享受着这种回调在代码上的作用,...

java泛型通配符详解

java泛型通配符详解

前言 泛型带来的好处 泛型中通配符 常用的 T,E,K,V,? ?无界通配符 上界通配符 < ? extends E> 下界通配符 < ? super E>...

JAVA IO 以及 NIO 理解

JAVA IO 以及 NIO 理解

由于Netty,了解了一些异步IO的知识,JAVA里面NIO就是原来的IO的一个补充,本文主要记录下在JAVA中IO的底层实现原理,以及对Zerocopy技术介绍。 IO,其实意味着:数据不停地搬入搬出缓冲区而已(使用了缓冲区)。比如,用户程序发起读操作,导致“ syscall...

Java回顾之Spring基础

Java回顾之Spring基础

这是针对Java进行回顾的一系列文章,这篇主要是和Spring基础相关。   第一篇:Java回顾之I/O   第二篇:Java回顾之网络通信   第三篇:Java回顾之多线程   第四篇:Java回顾之多线程同步   第五篇:Java回顾之集...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。