本文主要是介绍16 Master-Worker模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
6.3 Master-Worker模式
Master-Worker模式是常用的并行计算模式,它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Woker负责处理子任务。当各个Worker子进程处理完后,会将结果返回给Master,由Master做归纳和总结。其好处是能将一个大任务分解成若干小任务,并行执行,从而提高系统的吞吐量。
Master-Worker模式执行原理图:
代码示例:
/** * Client操作,建立任务进行提交执行 * @author Vision_TXG * */ public class MainTest {
public static void main(String[] args) { Random r = new Random(); Master master = new Master(new Worker(),10); System.out.println(Runtime.getRuntime().availableProcessors()); //Runtime.getRuntiem().availableProcessors(); 当前可使用的线程数 for(int i = 1;i<=100;i++) { Task t = new Task(); t.setId(i); t.setName("任务" + i); t.setPrice(r.nextInt(1000)); master.submit(t); } master.execute(); long start = System.currentTimeMillis(); /* * 循环判断所有线程都执行完毕 */ while(true) { if(master.isComplete()) { long end = System.currentTimeMillis()-start; int ret = master.getResult(); System.out.println(ret + " 任务结束。。。耗时:"+end); break; } } }
}
/** * 如原理图中Master * @author Vision_TXG * */ public class Master { //承装任务的集合 private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>(); //使用HashMap承装所有的worker对象 private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //使用一个容器承装每一个worker并非执行任务的结果集 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>(); //构造方法 public Master(Worker worker,int workerCount) { //每一个worker对象都要有一个Master的引用 ,workerQueue用于任务的领用,resultMap用于任务的提交 worker.setWorkerQueue(this.workerQueue); worker.setResultMap(this.resultMap); for(int i = 0;i< workerCount;i++) { //key为每一个worker的名字,value表示线程执行对象 workers.put("子节点"+Integer.toString(i), new Thread(worker)); } } //提交方法 public void submit(Task task) { this.workerQueue.add(task); } //一个执行的方法用于启动应用程序 public void execute() { for(Map.Entry<String,Thread> me :workers.entrySet() ) { me.getValue().start(); } } //判断线程是否执行完毕 public boolean isComplete() { // TODO Auto-generated method stub for(Map.Entry<String, Thread> me : workers.entrySet()) { if(me.getValue().getState() != Thread.State.TERMINATED/*线程停止*/) { return false; } } return true; } //获取业务结果 public int getResult() { // TODO Auto-generated method stub int ret = 0; for(Map.Entry<String, Object> me : resultMap.entrySet()) { ret += (Integer)me.getValue(); } return ret; } }
/** * 执行线程 * @author Vision_TXG * */ public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> workerQueue; private ConcurrentHashMap<String, Object> resultMap;
public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) { // TODO Auto-generated method stub this.workerQueue = workerQueue; }
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { // TODO Auto-generated method stub this.resultMap = resultMap; }
@Override public void run() { // TODO Auto-generated method stub while(true) { Task input = this.workerQueue.poll(); if(input==null) { break; } //真正的做业务处理 Object ouput = handle(input); this.resultMap.put(Integer.toString(input.getId()), ouput); } }
private Object handle(Task input) { Object output = null; try { //表示处理task任务的耗时 Thread.sleep(500); output = input.getPrice(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return output; // TODO Auto-generated method stub } }
/** * 实现类 * @author Vision_TXG * */ public class Task {
private int id; private String name; private int price; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } } |
这篇关于16 Master-Worker模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!