当前位置: 首页 >Java技术 > java多线程-CompletableFuture

java多线程-CompletableFuture

java8比较实用的一个多线程api

  • 开启一个异步任务supplyAsync
public static void main(String[] args) {ExecutorService executor = Executors.newCachedThreadPool();Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(2000);retu new ArrayList() {{add("1");}};};//supplyAsync方法入参(Supplier,Executor)//第一个参数是需要执行的任务,返回的类型和task的泛型一致//第二个参数是执行任务的线程池,这是个重载的方法,不传则使用默认线程池CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(task1,executor);info(completableFuture.join());//join阻塞式获取结果,与get方法样info("主线程结束");executor.shutdown();}private static void sleep(int i) {//模拟任务执行时间try {Thread.sleep(i);} catch (InterruptedException e) {}}private static void info(List<String> info) {System.out.println(Thread.currentThread().getName() + "------" + info);}private static void info(String info) {System.out.println(Thread.currentThread().getName() + "------" + info);}

 类似方法:runAsync开启没有返回值的任务

 

  • 合并两个任务thenCombine
public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Supplier<List<String>> task2 = () -> {info("task2 begin");sleep(2000);retu new ArrayList() {{ add("2"); }};};Supplier<List<String>> task3 = () -> {info("task3 begin");sleep(3000);retu new ArrayList() {{ add("3"); }};};BiFunction<List<String>,List<String>,List<String>> combineFunc = (task1Result,task2Result)->{info("combine begin");task1Result.addAll(task2Result);retu task1Result;};//thenCombine(CompletableFuture,BiFunction)合并另一个任务,第二个参数处理2个任务的结果,这个处理结果BiFunction由这2个任务中结束慢的线程执行(例中1,2任务一起跑,由2任务的线程执行合并结果的BiFunction)//thenCombineAsync(CompletableFuture,BiFunction,Executor)重载方法,合并结果的BiFunction由线程池中的线程异步执行,不传第三个线程池则使用默认线程池执行合并结果CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(task1,executor).thenCombine(CompletableFuture.supplyAsync(task2,executor),combineFunc).thenCombine(CompletableFuture.supplyAsync(task3,executor),combineFunc);info(completableFuture.join());info("主线程结束");executor.shutdown();}

  类似方法:thenAcceptBoth接收两个任务的结果但没有返回值,thenAfterBoth不关心两个任务的结果且没有返回值

 

  • 顺序执行的任务

1.thenApply

public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Function<List<String>,List<String>> applyFunc = (task1Result->{info("task2 begin");sleep(2000);task1Result.add("2");retu task1Result;});//thenApplyAsync 和 thenApply区别是,去掉async则把task2和task1合并在一个线程中执行CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(task1,executor).thenApplyAsync(applyFunc,executor);info(completableFuture.join());info("主线程结束");executor.shutdown();}

类似方法:thenRun不接收task1结果且没有返回值,thenAccept接收task1结果且没有返回值

2.thenCompose

   public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Function<List<String>,CompletableFuture<List<String>>> composeFunc = (task1Result->{info("compose task begin");sleep(2000);task1Result.add("com");retu CompletableFuture.supplyAsync(() -> {info("task2 begin");sleep(2000);task1Result.add("2");retu task1Result;},executor);});//composeFunc中有2段代码块,需要执行,一段是compose task;二段是task2;//task1执行完成后,执行compose task,最后执行task2,顺序执行(每段执行完后执行下一段)//thenComposeAsync和thenCompose的区别是:compose task的执行线程,thenCompose使用task1的线程执行,thenComposeAsync则使用第二个参数中线程池(不传入executor则使用默认线程池)执行CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(task1,executor).thenComposeAsync(composeFunc,executor);info(completableFuture.join());info("主线程结束");executor.shutdown();}

 

  • 对两个任务中最先执行完成的结果进行处理applyToEither
public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Supplier<List<String>> task2 = () -> {info("task2 begin");sleep(2000);retu new ArrayList() {{ add("2"); }};};Function<List<String>,List<String>> eitherFunc = (fasterTask->{info("fasterTask");fasterTask.add("f");retu fasterTask;});//applyToEither接收一个任务CompletableFuture和一个处理结果的Function,两个任务优先执行完的任务将结果传入Function处理//applyToEither 和 applyToEitherAsync区别是,applyToEither再较快task执行完成后继续用该线程执行eitherFunc,Async将eitherFunc放入新的线程执行CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(task1,executor).applyToEitherAsync(CompletableFuture.supplyAsync(task2,executor),eitherFunc,executor);info(completableFuture.join());info("主线程结束");executor.shutdown();}

类似方法:acceptEither接收最快任务的结果但没有返回值,runAfterEither不关心最快任务的结果也没有返回值

 

  • 同时执行多个任务

1.allOf

public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Supplier<List<String>> task2 = () -> {info("task2 begin");sleep(2000);retu new ArrayList() {{ add("2"); }};};Supplier<List<String>> task3 = () -> {info("task3 begin");sleep(5000);retu new ArrayList() {{ add("3"); }};};//同时执行task123,没有返回值CompletableFuture<Void> completableFuture =CompletableFuture.allOf(  CompletableFuture.supplyAsync(task1,executor),CompletableFuture.supplyAsync(task2,executor),CompletableFuture.supplyAsync(task3,executor));info("主线程继续");//join方法在这里阻塞至所有任务执行完成completableFuture.join();info("主线程结束");executor.shutdown();}

2.anyOf

public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Supplier<List<String>> task2 = () -> {info("task2 begin");sleep(2000);retu new ArrayList() {{ add("2"); }};};Supplier<List<String>> task3 = () -> {info("task3 begin");sleep(5000);retu new ArrayList() {{ add("3"); }};};//同时执行task123,返回最先执行完成任务的结果//由于多个任务返回值可以不一致,用object接收CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(CompletableFuture.supplyAsync(task1, executor), CompletableFuture.supplyAsync(task2, executor), CompletableFuture.supplyAsync(task3, executor));info("主线程继续");//阻塞至最快一个任务执行完成info((List<String>) completableFuture.join());info("主线程结束");executor.shutdown();}

 

  • 异常处理
public static void main(String[] args) {ExecutorService executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());Supplier<List<String>> task1 = () -> {info("task1 begin");sleep(1000);retu new ArrayList() {{ add("1"); }};};Supplier<List<String>> task2 = () -> {info("task2 begin");int i = 1/0;sleep(1000);retu new ArrayList() {{ add("2"); }};};Supplier<List<String>> task3 = () -> {info("task3 begin");sleep(1000);retu new ArrayList() {{ add("3"); }};};BiFunction<List<String>,List<String>,List<String>> combineFunc = (task1Result, task2Result)->{info("combine begin");task1Result.addAll(task2Result);retu task1Result;};//exceptionally可以处理CompletableFuture任务中抛出的异常,可以放在这个链式处理中的任意地方,相当于try catch的范围不一样//放在thenCombine(CompletableFuture.supplyAsync(task2,executor).exceptionally(throwable -> {retu new ArrayList<>();}),主线程结果main------[1, 3]//放在thenCombine(CompletableFuture.supplyAsync(task2,executor),combineFunc).exceptionally(throwable -> {retu new ArrayList<>();}),主线程结果main------[3]//放在thenCombine(CompletableFuture.supplyAsync(task3,executor),combineFunc).exceptionally(throwable -> {retu new ArrayList<>();}),主线程结果main------[]CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(task1,executor).thenCombine(CompletableFuture.supplyAsync(task2,executor),combineFunc).thenCombine(CompletableFuture.supplyAsync(task3,executor),combineFunc).exceptionally(throwable -> {retu new ArrayList<>();});info(completableFuture.join());info("主线程结束");executor.shutdown();}

 

作者:扶不起的刘阿斗
来源链接:https://www.cnblogs.com/liuboyuan/p/16052633.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/java/117197.html

分享给朋友:

“java多线程-CompletableFuture” 的相关文章

yum安装步骤(网络下载安装) 2022年05月15日 10:23:29
SpringBoot整合Redis缓存 2022年05月15日 21:59:14
浅谈分布式与集群的概念 2022年05月15日 22:00:23
Centos 6.4最小化安装后的优化(2) 2022年05月16日 19:50:28
网络协议栈基本知识 2022年05月16日 20:33:52
java高级 2022年05月30日 21:31:04
深入理解Java类加载 2022年05月31日 21:40:17