juc进阶-no.2 completablefuture_社畜阿藏405的博客-爱代码爱编程
文章目录
⭐NO.2 CompletableFuture
一. Future和Callable接口
1.Future
Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果,取消任务的执行,判断任务是否被取消,判断任务执行是否完毕等.
2.Callable
Callable接口中定义了需要有返回的任务需要实现的方法
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
比如主线程让一个子线程去执行任务,子线程比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会儿才去取子任务的执行结果
二. FutureTask的缺点
1.FutureTask::get无参方法,推荐放在最后使用 ->get方法阻塞 -> 不见不散
因为get方法不管是否计算完成都被阻塞,这样就不能算是异步操作了
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
{
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
return 1024;
});
new Thread(futureTask,"t1").start();
System.out.println("我在继续");
//不见不散,只要出现get方法,不管是否计算完成都阻塞等待结果出来再运行
System.out.println(futureTask.get());
}
而高并发时需要克服阻塞,尽量少加锁不要加锁,以及思考什么可以替代锁
2.FutureTask:: get(long timeout, TimeUnit unit) 有参 -> 过时不候
如果子线程运行的时间超过了timeout参数设置的时间则会抛出TimeoutException异常
Exception in thread "main" java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
at com.atguigu.juc.cf.FutureTaskDemo.main
3.FutureTask::isDone() -> 什么可以替代锁 -> 轮询代替阻塞(个人看来是一种自我欺骗)
- 轮询方式会消耗无谓的CPU资源,而且也不见得能及时得到计算结果
- 如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
});
new Thread(futureTask, "t1").start();
// System.out.println(futureTask.get());//不见不散,只要出现get方法,不管是否计算完成都阻塞等待结果出来再运行
// System.out.println(futureTask.get(2L,TimeUnit.SECONDS));//过时不候
System.out.println("我在继续");
//不要阻塞,尽量用轮询替代
while (true) {
if (futureTask.isDone()) {
System.out.println("----result: " + futureTask.get());
break;
} else {
System.out.println("还在计算中,别催,越催越慢,再催熄火");
}
}
}
4.更复杂的任务(下文中Future的改进解决方案)
- 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
- 将两个异步计算合成一个一部计算,这两个异步计算互相独立,同时第二个又依赖第一个结果
- 当Future集合中某个任务最快结束时,返回结果
- 等待Future结合中的所有任务都完成
三. Future的改进 ->为了更复杂的任务
1.CompletableFuture和CompletionStage源码分别介绍
-
什么是CompletableFuture -> 是 Future++
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
-
在Java8中,CompletableFuture 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture 的方法
可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
-
他可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作
-
它实现了Future和compleionStage
-
-
什么是CompletionStage
代表一部计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分割符传参数
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会出发另外一个阶段
- 一个阶段的计算执行可以是一个Function,Consumer或者Runnable
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
2.四个核心静态方法,来创建一个异步操作
-
runAsync无 返回值
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(ASYNC_POOL, runnable); }
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
-
supplyAsync有 返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(ASYNC_POOL, supplier); }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
-
codeDemo1 -> future::get依然阻塞
public class CompletableFutureDemo { public static void main(String[] args) throws Exception { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName() + "\t" + "come.in")); System.out.println(future1.get()); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName() + "\t" + "come.in"), threadPoolExecutor); System.out.println(future2.get()); CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "come.in"); return "supplyAsync1"; }); System.out.println(stringCompletableFuture1.get()); CompletableFuture<String> stringCompletableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "come.in"); return "supplyAsync2"; }, threadPoolExecutor); System.out.println(stringCompletableFuture2.get()); threadPoolExecutor.shutdown(); } }
输出结果
ForkJoinPool.commonPool-worker-1 come.in null pool-1-thread-1 come.in null ForkJoinPool.commonPool-worker-1 come.in supplyAsync1 pool-1-thread-1 come.in supplyAsync2
-
codeDemo2
public static void main(String[] args) throws Exception { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "supplyAsync"; }, threadPoolExecutor) .thenApply(s -> s + 1) .whenComplete((s, throwable) -> { if (throwable == null) { System.out.println("result:" + s); } }).exceptionally(throwable -> throwable.toString()); // 如果没有使用自定义线程池的话,主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭 /* try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); } */ System.out.println("main.thread.over"); threadPoolExecutor.shutdown(); }
3.CompletableFuture的优点
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动会低啊某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
四. 函数式编程已经是主流
1.大厂面试题(百度)
- 怎么理解Java多线程?怎么处理并发?线程池有哪几个核心参数?
- Java加锁有哪几种锁?
- 简单说说lock
- hashmap的实现原理?hash冲突怎么解决?为什么使用红黑树?
- spring里面都使用了哪些设计模式?循环依赖怎么解决?
- 项目中哪个地方用了countdownlanch,怎么使用的?
- JVM项目了解过么?栈里面都放什么东西?
- 都用redis来做什么?aof和rdb都怎么做持久化缓存
- mysql的锁机制?mysql的索引是怎么实现的?
- spring实现事务的几种方式
- zookeeper怎么实现分布式锁
- 算法: 求链表倒数第k个元素?
2.Lambda+Stream+链式调用+Java8函数式编程
-
Runnable
无参数无返回值
@FunctionalInterface public interface Runnable { public abstract void run(); }
-
Function
接受一个参数,并且有返回值
@FunctionalInterface public interface Function<T, R> { R apply(T t); }
-
Consumer
接受一个没有返回值
@FunctionalInterface public interface Consumer<T> { void accept(T t); }
-
Supplier
无参有返回值
@FunctionalInterface public interface Supplier<T> { T get(); }
-
BiConsumer
两个参数无返回值
@FunctionalInterface public interface BiConsumer<T, U> { void accept(T t, U u); }
3.CompletaFuture的get()和join()方法的区别
join和get是一样的,唯一的区别就是join不抛出异常
4.电商比价需求
部分代码demo
public static List<String> getPriceByASync(List<NetMall> list,String productName)
{
return list
.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " is %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
五. 常用方法
1.获得结果和触发计算
-
获取结果
-
不见不散
public T get() throws InterruptedException, ExecutionException
-
过时不候
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
-
在没有完成计算的情况下,给我一个替代结果 -> 立即获得结果不阻塞( 如果计算完成,返会计算完成的结果; 如果没有计算完成,返回设置的valueIfAbsent)
public T getNow(T valueIfAbsent)
-
不见不散不抛出异常
public T join()
-
-
主动触发计算
打断计算 -> 返回值是boolean如果是true说明计算没有完成打断成功这时候返回设置的value,否则是false说明计算已经完成了,此时返回的值就是计算出来的值
public boolean complete(T value)
2.对计算结果进行处理
-
thenApply
计算结果存在依赖关系,这两个线程串行化,由于依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
-
handle
有一场也可以往下一步走,根据带的异常参数可以进一步处理
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
codeDemo
public static void m2() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); System.out.println(CompletableFuture.supplyAsync(() -> { return 1; }).handle((f, e) -> { int i = 10 / 0; System.out.println("-----1" + f); return f + 2; }).handle((f, e) -> { System.out.println("-----2" + f); return f + 3; }).handle((f, e) -> { if (e != null) { f = 10; } System.out.println("-----3" + f); return f + 4; }).whenComplete((v, e) -> { if (e == null) { System.out.println("----result: " + v); } else { System.out.println("----fail: " + v); } }).exceptionally(e -> { System.out.println("----fail.e"); e.printStackTrace(); return null; }).join()); threadPoolExecutor.shutdown(); }
控制台输出
-----2null -----310 ----result: 14 14
-
whenComplete和whenCompleteAsync的区别:
- whenComplete: 是执行当前任务的线程来继续执行whenComplete的任务
- whenCompleteAsync: 是执行把whenCompleteAsync这个任务提交给线程池来执行
3.对计算结果进行消费
-
thenAccept
接受任务的处理结果,并消费处理,五返回结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
-
补充 -> 代码任务之间的顺序执行
-
thenRun
上个任务执行完继续执行,且本任务不需要上个任务结果
public CompletableFuture<Void> thenRun(Runnable action)
-
thenAccept
上个任务执行完继续执行,需要上个任务的结果,但是本任务无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
-
thenApply
上个任务执行完继续执行,需要上个任务的结果,本任务有返回值
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn)
-
-
codeDemo
public static void m3() { CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(f -> { return f + 2; }).thenApply(f -> { return f + 3; }).thenAccept(r -> System.out.println(r)); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join()); }
控制台
6 null null resultA resultB
4.对计算速度进行选用(业务在大家找茬,跑得快出现的多)
public <U> CompletableFuture<U> applyToEither
那个快哪个就输出哪个的结果(思考是否可以用到抢购上,以及如何实行)
codeDemo
/**
* 对计算速度进行选用
*/
public static void m4() {
System.out.println(CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}), r -> {
return r;
}).join());
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
控制台
1
5.对计算结果进行合并
-
thenCombine
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理,先完成的CompletionStage先等着,等待其他分支任务完成,之后进行处理
-
thenCompose
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)
thenApply和thenCompose的区别
- thenapply: 是返回的是非CompletableFuture类型
- thenCompose: 用来连接两个CompletableFuture,返回值是新的CompletableFuture