本文主要是介绍java线程深度解析(四)——并发模型(Master-Worker),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
http://blog.csdn.net/daybreak1209/article/details/51372929
二、Master-worker ——分而治之
Master-worker常用的并行模式之一,核心思想是由两个进程协作工作,master负责接收和分配任务,worker负责处理任务,并把处理结果返回给Master进程,由Master进行汇总,返回给客户端。
它的好处在于能把一个大任务分解成若干个小任务,并行执行,提高系统吞吐量。而对于客户端而言,一旦提交任务,mater进程立刻返回一个处理结果,并非等待系统处理完毕再返回。
下面利用Master-Worker模型实现一个计算1-100立方和,思路如下:
1、将计算任务分配成100个子任务,每个子任务用于计算单独数字的立方和
2、master产生固定个数的worker用于处理这个子任务
3、worker开始计算,并把结果写入resultMap中
4、master负责汇总map中的数据,求和后将最终结果返回给客户端。
Worker类的实现
- public class Worker implements Runnable{
- //任务队列,用于每个子任务
- protected Queue<Object> workQueue;
- //子任务处理结果集
- protected Map<String,Object> resultMap;
-
- public void setWorkQueue(Queue<Object> workQueue)
- {
- this.workQueue=workQueue;
- }
-
- public void setResultMap(Map<String, Object> resultMap) {
- this.resultMap = resultMap;
- }
-
- //子任务处理逻辑,在子类中具体实现
- public Object handle(Object input)
- {
- return input;
- }
-
- @Override
- public void run() {
- while(true)
- {
- //获取子任务
- Object input =workQueue.poll();//remove the head of queue
- if(input==null) break;
- //处理子任务
- Object re=handle(input);
- //将处理结果写入结果集
- resultMap.put(Integer.toString(input.hashCode()),re);
- }
- }
- }
Worker子类实现:单个数字立方计算,重写worker的handle方法
- public class SubWorker extends Worker{
- public Object handle(Object input)
- {
- Integer i=(Integer)input;
- return i*i*i;
- }
- }
Master类的实现
- public class Master {
- //任务队列
- protected Queue<Object> workQueue=new ConcurrentLinkedQueue<Object>();
- //work进程队列
- protected Map<String,Thread> threadMap=new HashMap<String,Thread>();
- //子任务处理结果集
- protected Map<String,Object> resultMap=new ConcurrentHashMap<String,Object>();
-
- //是否所有的子任务都结束了
- public boolean isComplete()
- {
- for(Map.Entry<String, Thread> entry:threadMap.entrySet())
- {
- if(entry.getValue().getState()!=Thread.State.TERMINATED)
- {
- return false;
- }
- }
- return true;
- }
-
- //master的构造,需要一个worker线程和worker的进程书香
- public Master(Worker worker,int countWorker)
- {
- worker.setWorkQueue(workQueue);
- worker.setResultMap(resultMap);
- for(int i=0;i<countWorker;i++)
- {
- threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
- }
- }
-
- //提交任务-放入进程队列中
- public void submit(Object job)
- {
- workQueue.add(job);
- System.out.println("任务队列size:"+workQueue.size());
- }
-
- //返回子任务结果集
- public Map<String,Object> getResultMap()
- {
- return resultMap;
-
- }
-
- //开始运行所有的worker进程
- public void execute()
- {
- for(Map.Entry<String, Thread> entry:threadMap.entrySet())
- {
- entry.getValue().start();//调用子线程 worker.run
- System.out.println(entry.getValue());
- }
- }
- }
客户端实现
- public class Client {
- public static void main(String[] args) {
- Master master=new Master(new SubWorker(), 5);//指定5个
- for(int i=0;i<100;i++)
- master.submit(i);
- master.execute();
- int re=0;
- Map<String,Object> resultMap=master.getResultMap();
- while(resultMap.size()>0 || !master.isComplete())
- {
- //不需要等待所有的worker执行完就可以计算结果
- Set<String> keys=resultMap.keySet();
- String key=null;
- for(String k:keys)
- {
- key=k;
- break;
- }
- Integer i=null;
- if(key!=null)
- i=(Integer)resultMap.get(key);
- if(i!=null)
- re+=i;//最终计算结果
-
- if(key!=null)
- resultMap.remove(key);
- }
- System.out.println(re); //打印最后计算结果
- }
- }
最终结果:
任务队列大小
size:1-100
五个线程数:
Thread[0,5,main]
Thread[1,5,main]
Thread[2,5,main]
Thread[3,5,main]
Thread[4,5,main]
最终计算结果:
24502500
在整个计算中,master和worker 的执行完全是异步的,master不必等到每所有worker完成,就可以进行求和操作。在获得部分子任务结果时,就已经可以对结果进行计算,从而提高并发度和吞吐量。
这篇关于java线程深度解析(四)——并发模型(Master-Worker)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!