本文主要是介绍Java 并行程序设计模式 (Master-Worker模式),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
并行设计模式属于设计优化的一部分,它是对一些常用的多线程结构的总结和抽象。与串行程序相比,并行程序的结构通常更为复杂。因此,合理的使用并行模式在多线程开发中,更具有积极意义。并行程序设计模式主要有 Future模式 、Master-Worker模式、Guarded Suspension模式、不变模式和生产者-消费者模式,本文主要讲解 Master-Worker模式
Master-Worker模式是常用的并行模式之一,核心思想是,系统由两类进程协作:Master进程和Worker进程。Master进程负责接收和分配任务,worker进程负责子任务。当各个Worker进程将子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总,从而得到系统的最终结果。
示意图如下:
Master-Worker的代码实现
Worker实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | import java.util.Map; import java.util.Queue; public class Worker implements Runnable { //任务队列,用于取得子任务 protected Queue<Object> workQueue; //子任务处理结果集 protected Map<String, Object> resultMap; public void setWorkQueue(Queue<Object> workQueue) { this .workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this .resultMap = resultMap; } //子任务处理的逻辑,在子类中实现具体逻辑 public Object handle(Object input){ return input; } public void run() { while ( true ){ Object input = workQueue.poll(); if (input == null ) break ; //处理子任务 Object result = handle(input); System.out.println(input.hashCode()); //将处理结果写入结果集 resultMap.put(Integer.toString(input.hashCode()), result); } } } |
Master实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //任务队列 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); //Worker进程队列 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); //子任务处理结果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //是否所有子任务都结束了 public boolean isComplete(){ for (Map.Entry<String, Thread> entry:threadMap.entrySet()){ if (entry.getValue().getState() != Thread.State.TERMINATED) { return false ; } } return true ; } //Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量 public Master(Worker worker, int countWorker){ worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for (int i=0; i < countWorker; i++){ threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i))); } } //提交一个任务 public void submit(Object job){ workQueue.add(job); } //返回结果集 public Map<String, Object> getResultMap(){ return resultMap; } //开始运行所有的worker进程,进行处理 public void execute(){ for (Map.Entry<String, Thread> entry:threadMap.entrySet()){ entry.getValue().start(); } } } |
运用这个小框架计算1——100的立方和
PlusWorker的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import java.util.concurrent.TimeUnit; public class PlusWorker extends Worker { @Override public Object handle(Object input) { Integer i = (Integer) input; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return i * i * i; } } |
Test Main方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | import java.util.Map; import java.util.Set; /** * 计算1^3 + 2^3 + 3^3 +......+ 100^3 * @author tanlk * @date 2017年7月21日下午2:30:17 */ public class Test { public static void main(String[] args) { Master master = new Master( new PlusWorker(), 5); //使用5个worker //添加worker到workQueue, workQueue是ConcurrentLinkedQueue,5个PlusWorker进程同时抢workQueue里面的数据 for (int i = 1; i<= 100; i++){ master.submit(i); } master.execute(); int result = 0; Map<String, Object> resultMap = master.getResultMap(); //不需要全部结果执行完就可以返回结果 while (resultMap.size() > 0 || !master.isComplete()){ Set<String> keys = resultMap.keySet(); String key = null ; for (String str : keys) { key = str; break ; } Integer i = null ; if (key != null ) { i = (Integer) resultMap.get(key); } if (i != null ) { result = result + i; } if (key != null ) { resultMap.remove(key); //删除已经取出的数据 } } System.out.println(result); } } |
总结
Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。
类似的框架Fork/Join
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork/Join架构的主要类
RecursiveAction供不需要返回值的任务继续。
RecursiveTask通过泛型参数设置计算的返回值类型。
ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
doSubmit(task);
return task;
}
Fork/Join实现并行计算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; /** * Fork/Join来优化并行计算 * 计算1^3 + 2^3 + 3^3 +......+ 100^3 * * 继承RecursiveTask,重写compute * @author tanlk * @date 2017年7月21日下午3:24:57 */ public class Calculate extends RecursiveTask<Integer> { /** * */ private static final long serialVersionUID = -3363693028643602343L; final static int THRESHOLD = 4; private Integer start; private Integer length; public Calculate(Integer start, Integer length) { this .length = length; this .start = start; } @Override protected Integer compute() { System.out.println( "Calculate.compute() start:" +start+ ",length:" +length ); int result = 0; if (length < THRESHOLD) { // 小于临界值,直接计算 for (int i = start; i < start + length; i++) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } result = result + (i * i * i); } } else { // 分而治之 int split = length / 2; Calculate c1 = new Calculate(start, split); Calculate c2 = new Calculate(start + split, length - split); c1.fork(); c2.fork(); //fork拆分子任务 result = c1.join() + c2.join(); //join合并子任务结果 } return result; } public static void main(String[] args) throws InterruptedException, ExecutionException { Calculate calculate = new Calculate(1,100); long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(calculate); Integer result = calculate.get(); System.out.println( "结果:" +result + ",耗时:" +(System.currentTimeMillis() - start)); } } |
关于fork/join和Master-Worker模式的区别,欢迎大家留言讨论
我认为Master-Worker 这种方式对于大小相同,且任务大小适中可控的任务来说是不错的。但是当任务大小不一致的时候就会遇到问题。就是说,一个worker可能被缠在冗长的任务中,然后其他的worker闲着没事做。
Fork join并不是预先拆分所有任务,而是在执行时动态的决定拆分
这篇关于Java 并行程序设计模式 (Master-Worker模式)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!