当前位置:首页 > Java技术 > Java之多线程中的Master-Worker模式

Java之多线程中的Master-Worker模式

该模式的好处是,将大任务拆解成若干小任务并并行执行,从而提高系统吞吐量。

定义Worker进程,负责处理实际任务。
/*具体工作对象*/
static abstract class Worker<T, R> implements Runnable {
private static final UtilsLog lg = UtilsLog.getLogger(Worker.class);
protected Queue<T> workQueue;//持有Master的任务队列
protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符

public void setWorkQueue(Queue<T> workQueue) {
this.workQueue = workQueue;
}

public void setResultMap(Map<String, R> resultMap) {
this.resultMap = resultMap;
}

public abstract R handler(T entity);

@Override
public void run() {
while (true) {
T childWork = workQueue.poll();
if (childWork == null) {
lg.e("已经没有任务在队列中等待执行");
break;
}
//处理子任务
R result = handler(childWork);
resultMap.put(Integer.toString(childWork.hashCode()), result);
}
}
}
义Master进程,负责接收和分配任务。Master会在提交任务的同时立即返回结果集,由于此处的 getResultMap属于引用传递,因此属性resultMap的修改会同步至业务层。
public static class Master<T, R> {
private static final UtilsLog lg = UtilsLog.getLogger(Master.class);
protected Queue<T> workQueue;//用于存储任务集
protected Map<String, Thread> threadMap;//存储执行任务的线程集
protected Map<String, R> resultMap;//存储相关结果

@TargetApi(Build.VERSION_CODES.LOLLIPOP)
public Master(Worker<T, R> work, int threadCount) {
workQueue = new ConcurrentLinkedDeque<T>();
threadMap = new HashMap<>();
resultMap = new HashMap<>();

work.setWorkQueue(workQueue);
work.setResultMap(resultMap);
for (int i = 0; i < threadCount; i++) {
threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i)));
}
}

//是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}

public Map<String, R> getResultMap() {
return resultMap;
}

public Master addJob(T job) {
workQueue.add(job);
return this;
}

public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
业务层调用方案如下,
        Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() {
@Override
public Integer handler(Integer entity) {
int max = 50, min = 0;
UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作
lg.e("执行handler程序 with value:" + entity);
return entity * entity;
}
}, 3);
int jobCount = 10;//任务数
for (int i = 0; i < jobCount; i++) {
master.addJob(i);
}
master.execute();

Map<String, Integer> resultMap = master.getResultMap();
while (true) {
int resultMapSize = resultMap.size();
// lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变
if (master.isComplete()) {
break;
}
}
执行结果如下,
Java之多线程中的Master-Worker模式 _ JavaClub全栈架构师技术笔记
另外,也应注意到,在执行 handler方式,到结果集中写入数据是有延时的, 这在开发中需要格外注意,务必使用master. isComplete判断任务完成状况
Java之多线程中的Master-Worker模式 _ JavaClub全栈架构师技术笔记






作者:小轩948
来源链接:https://www.cnblogs.com/linux007/p/5790490.html

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/java/68782.html

分享给朋友: