并行框架的使用背景
经常会有这样的调用场景:app(或web前端)调用后台的一个接口,该接口接到该请求后,需要调用其他多个微服务来获取数据,最终汇总一个最终结果返回给用户。
整如用户请求 我的订单”,后台在收到请求后,就需要去调用用户详情RPC、商品详情RPC、库存RPC,优惠券等等很多个服务。有些服务是可以并行去请求的,但有些服务是依赖于某个服务的返回值的(如查库存、优惠券,就依赖于商品详情回复到达后才能去请求)。
CompleteableFuture VS AsyncTool
CompleteableFuture大家都用过,里面有supply、then、combine、allOf等等方法,都可以用来接收一个任务,最终将多个任务汇总成一个结果。
但有一个问题,你supply一个任务后,这个任务就黑盒了。如果你编排了很多个任务,每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后汇总结果。
一个并行框架,它最好是对每一步的执行都能监控。每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。拥有回调的任务,可以监控任务的执行状况,如果执行失败、超时,可以记录异常信息或者处理个性化的默认值。
CompleteableFuture中也有一些回调方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。
AsyncTool方法说明
Worker的定义如下,实现IWorker,ICallback函数式接口
,并重写下面的4个方法。
4个方法的说明如下:
Worker的定义如下,实现IWorker,ICallback函数式接口,并重写下面的4个方法。4个方法的说明如下:
- begin():Worker开始执行前,先回调begin()。
- action():Worker中执行耗时操作的地方,比如RPC接口调用。
- result():action()执行完毕后,回调result方法,可以在此处处理action中的返回值。
- defaultValue():整个Worker执行异常,或者超时,会回调defaultValue(),Worker返回默认值。
如果没有实现ICallback,会默认执行DefaultCallback的回调方法。DefaultCallback是一个空的回调,里面没有任何逻辑。
-
通过执行器类Async的beginWork方法提交任务执行。
-
Timeout:全组任务超时时间设定,如果Worker任务超时,则Worker结果使用defaultValue()默认值。
-
ExecutorService executorService:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池。
-
WorkerWrapper… workerWrapper:起始任务,可以是多个。注意不要提交中间节点的任务,只需要提交起始任务即可,编排的后续任务会自动执行。
-
定义几个测试worker:
package com.itjing.base.concurrency.jdasync;
import cn.hutool.core.date.SystemClock;
import cn.hutool.json.JSONUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
/**
* Worker开始的时候先执行begin
*/
@Override
public void begin() {
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
/**
* Worker中耗时操作在此执行RPC/IO
* @param object object
* @param allWrappers 任务包装
* @return
*/
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 1;
return res;
}
/**
* action执行结果的回调
* @param success
* @param param
* @param workResult
*/
@Override
public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
System.out.println("A - param:" + JSONUtil.toJsonStr(param));
System.out.println("A - result:" + JSONUtil.toJsonStr(workResult));
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
/**
* Worker异常时的回调
* @return
*/
@Override
public Integer defaultValue() {
System.out.println("A - defaultValue");
return 101;
}
}
package com.itjing.base.concurrency.jdasync;
import cn.hutool.core.date.SystemClock;
import cn.hutool.json.JSONUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
/**
* Worker开始的时候先执行begin
*/
@Override
public void begin() {
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
/**
* Worker中耗时操作在此执行RPC/IO
*
* @param object object
* @param allWrappers 任务包装
* @return
*/
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 2;
return res;
}
/**
* action执行结果的回调
*
* @param success
* @param param
* @param workResult
*/
@Override
public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
System.out.println("B - param:" + JSONUtil.toJsonStr(param));
System.out.println("B - result:" + JSONUtil.toJsonStr(workResult));
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
/**
* Worker异常时的回调
*
* @return
*/
@Override
public Integer defaultValue() {
System.out.println("B - defaultValue");
return 102;
}
}
package com.itjing.base.concurrency.jdasync;
import cn.hutool.core.date.SystemClock;
import cn.hutool.json.JSONUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
/**
* Worker开始的时候先执行begin
*/
@Override
public void begin() {
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
/**
* Worker中耗时操作在此执行RPC/IO
* @param object object
* @param allWrappers 任务包装
* @return
*/
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 3;
return res;
}
/**
* action执行结果的回调
* @param success
* @param param
* @param workResult
*/
@Override
public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
System.out.println("C - param:" + JSONUtil.toJsonStr(param));
System.out.println("C - result:" + JSONUtil.toJsonStr(workResult));
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
/**
* Worker异常时的回调
* @return
*/
@Override
public Integer defaultValue() {
System.out.println("C - defaultValue");
return 103;
}
}
开发常见场景
串行场景
next写法:
public static void main(String[] args) {
//引入Worker工作单元
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
//包装Worker,编排串行顺序:C <- B <- A
//C是最后一步,它没有next
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3
.build();
//B的next是C
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2
.next(wrapperC)
.build();
//A的next是B
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)//1+1
.next(wrapperB)
.build();
try {
//Action
Async.beginWork(1000, wrapperA);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
Async.shutDown();
}
depend写法:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//引入Worker工作单元
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
//A没有depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)
.build();
//B的depend是A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)
.depend(wrapperA)
.build();
//C的depend是B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)
.depend(wrapperB)
.build();
//begin
Async.beginWork(1000, wrapperA);
}
并行场景
public static void main(String[] args) {
//引入Worker工作单元
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
/**
* 包装Worker,编排并行顺序
*/
//A
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)//1+1
.build();
//B
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2
.build();
//C
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3
.build();
try {
//3个WorkerWrapper一起begin
Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
阻塞等待 - 先串行,后并行
改造WorkerB、WorkerC的action:
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
WorkerWrapper workerA = allWrappers.get("workerA");
System.out.println("获取workerA的结果:" + JSONUtil.toJsonStr(workerA));
Integer res = 0;
if (Objects.nonNull(workerA)) {
Integer result = (Integer) workerA.getWorkResult().getResult();
res = 2;
res += result;
} else {
res = object + 2;
}
return res;
}
next写法:
public static void nextWork() {
//引入Worker工作单元
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
//C是最后一步,它没有next
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(null)//没有参数,根据A的返回值+3
.build();
//B是最后一步,它没有next
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(null)//没有参数,根据A的返回值+2
.build();
//A的next是B、C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)//1+1
//next是B、C
.next(wrapperB, wrapperC)
.build();
try {
//Action
Async.beginWork(1000, wrapperA);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
depend写法:
//A没有depend,就是开始
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)
.build();
//C depend A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(null)
.depend(wrapperA)
.build();
//B depend A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(null)
.depend(wrapperA)
.build();
阻塞等待 - 先并行,后串行
改造WorkerA的action方法:
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
WorkerWrapper workerB = allWrappers.get("workerB");
System.out.println("获取workerB的结果:" + JSONUtil.toJsonStr(workerB));
WorkerWrapper workerC = allWrappers.get("workerC");
System.out.println("获取workerC的结果:" + JSONUtil.toJsonStr(workerC));
Integer res = null;
if (Objects.nonNull(workerB) && Objects.nonNull(workerC)) {
Integer resultB = (Integer) workerB.getWorkResult().getResult();
Integer resultC = (Integer) workerC.getWorkResult().getResult();
res = resultB + resultC;
} else {
res = object + 1;
}
return res;
}
next写法:
public static void nextWork() {
//引入Worker工作单元
WorkerA workerA = new WorkerA();
WorkerB workerB = new WorkerB();
WorkerC workerC = new WorkerC();
//A是最后一步,没有next
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(null)//参数是null,A = B + C
.build();
//C next A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3 = 6
.next(wrapperA)
.build();
//B next A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2 = 4
.next(wrapperA)
.build();
try {new SynchronousQueue<Runnable>();
//Action
Async.beginWork(4000, wrapperB, wrapperC);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
depend写法:
//C没有depend,是起始节点
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)//3+3 = 6
.build();
//B没有depend,是起始节点
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)//2+2 = 4
.build();
//A depend B,C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(null)//参数是null,A = B + C
.depend(wrapperB, wrapperC)
.build();
评论区