设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用)

本文主要是介绍设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

一、为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

二、什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

三、生产者消费者模式实战

实例代码1:
package cn.thread;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;/*** 多线程模拟实现生产者/消费者模型*  * @author tkhoon*/
public class BlockingQueueTest2 {/*** * 定义装苹果的篮子* */public class Basket {// 篮子,能够容纳3个苹果BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);// 生产苹果,放入篮子public void produce() throws InterruptedException {// put方法放入一个苹果,若basket满了,等到basket有位置basket.put("An apple");}// 消费苹果,从篮子中取走public String consume() throws InterruptedException {// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)return basket.take();}}// 定义苹果生产者class Producer implements Runnable {private String instance;private Basket basket;public Producer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 生产苹果System.out.println("生产者准备生产苹果:" + instance);basket.produce();System.out.println("!生产者生产苹果完毕:" + instance);// 休眠300msThread.sleep(300);}} catch (InterruptedException ex) {System.out.println("Producer Interrupted");}}}// 定义苹果消费者class Consumer implements Runnable {private String instance;private Basket basket;public Consumer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 消费苹果System.out.println("消费者准备消费苹果:" + instance);System.out.println(basket.consume());System.out.println("!消费者消费苹果完毕:" + instance);// 休眠1000msThread.sleep(1000);}} catch (InterruptedException ex) {System.out.println("Consumer Interrupted");}}}public static void main(String[] args) {BlockingQueueTest2 test = new BlockingQueueTest2();// 建立一个装苹果的篮子Basket basket = test.new Basket();ExecutorService service = Executors.newCachedThreadPool();Producer producer = test.new Producer("生产者001", basket);Producer producer2 = test.new Producer("生产者002", basket);Consumer consumer = test.new Consumer("消费者001", basket);service.submit(producer);service.submit(producer2);service.submit(consumer);// 程序运行5s后,所有任务停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();}}

代码中关于LinkedBlockingQueue的介绍会在稍后进行介绍。


实例代码2:
public class QuickEmailToWikiExtractor extends AbstractExtractor {private ThreadPoolExecutor      threadsPool;private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;public QuickEmailToWikiExtractor() {emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000));}
//执行的主方法
public void extract() {logger.debug("开始" + getExtractorName() + "。。");long start = System.currentTimeMillis();//抽取所有邮件放到队列里(生产者)new ExtractEmailTask().start();// 把队列里的文章插入到Wiki(消费者)insertToWiki();long end = System.currentTimeMillis();double cost = (end - start) / 1000;logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒");}/*** 把队列里的文章插入到Wiki*/private void insertToWiki() {//登录wiki,每间隔一段时间需要登录一次confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);while (true) {//2秒内取不到就退出ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);if (email == null) {break;}threadsPool.submit(new insertToWikiTask(email));}}protected List<Article> extractEmail() {List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();if (allEmails == null) {return null;}for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {emailQueue.offer(exchangeEmailShallowDTO);}return null;}/*** 抽取邮件任务(生产者类)** @author tengfei.fangtf*/public class ExtractEmailTask extends Thread {public void run() {extractEmail();}}
}


四、LinkedBlockingQueue介绍使用
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。

由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

常用API:

offer

将元素插入队列,成功返回true,如果当前没有可用的空间,则返回false

offer(E e, long timeout, TimeUnit unit) 

将元素插入队列,在到达指定的等待时间前等待可用的空间

E poll(long timeout, TimeUnit unit) 

获取并移除队列的头部,在指定的等待时间前等待可用的元素

void put(E e) 

将元素插入队列,将等待可用的空间(堵塞)

take() 

获取并移除队列的头部,在元素变得可用之前一直等待(堵塞)
  1.     //offer方法为非堵塞的  
  2.    //queue.offer(rnd.nextInt(100), 1, TimeUnit.SECONDS); //等待1秒后还不能加入队列则返回失败,放弃加入  
  3.  //queue.offer(rnd.nextInt(100));  
  1.     //poll方法为非堵塞的  
  2.    //Integer value = queue.poll(1, TimeUnit.SECONDS); //等待1秒后还没有数据可取则返回失败,放弃获取  
  3.   //Integer value = queue.poll();  


总结: 在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。

另外他们都是线程安全的,不用考虑线程同步问题。


这篇关于设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/807813

相关文章

使用Python自建轻量级的HTTP调试工具

《使用Python自建轻量级的HTTP调试工具》这篇文章主要为大家详细介绍了如何使用Python自建一个轻量级的HTTP调试工具,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录一、为什么需要自建工具二、核心功能设计三、技术选型四、分步实现五、进阶优化技巧六、使用示例七、性能对比八、扩展方向建

使用Python实现一键隐藏屏幕并锁定输入

《使用Python实现一键隐藏屏幕并锁定输入》本文主要介绍了使用Python编写一个一键隐藏屏幕并锁定输入的黑科技程序,能够在指定热键触发后立即遮挡屏幕,并禁止一切键盘鼠标输入,这样就再也不用担心自己... 目录1. 概述2. 功能亮点3.代码实现4.使用方法5. 展示效果6. 代码优化与拓展7. 总结1.

使用Python开发一个简单的本地图片服务器

《使用Python开发一个简单的本地图片服务器》本文介绍了如何结合wxPython构建的图形用户界面GUI和Python内建的Web服务器功能,在本地网络中搭建一个私人的,即开即用的网页相册,文中的示... 目录项目目标核心技术栈代码深度解析完整代码工作流程主要功能与优势潜在改进与思考运行结果总结你是否曾经

Linux中的计划任务(crontab)使用方式

《Linux中的计划任务(crontab)使用方式》:本文主要介绍Linux中的计划任务(crontab)使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、前言1、linux的起源与发展2、什么是计划任务(crontab)二、crontab基础1、cro

kotlin中const 和val的区别及使用场景分析

《kotlin中const和val的区别及使用场景分析》在Kotlin中,const和val都是用来声明常量的,但它们的使用场景和功能有所不同,下面给大家介绍kotlin中const和val的区别,... 目录kotlin中const 和val的区别1. val:2. const:二 代码示例1 Java

C++变换迭代器使用方法小结

《C++变换迭代器使用方法小结》本文主要介绍了C++变换迭代器使用方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1、源码2、代码解析代码解析:transform_iterator1. transform_iterat

C++中std::distance使用方法示例

《C++中std::distance使用方法示例》std::distance是C++标准库中的一个函数,用于计算两个迭代器之间的距离,本文主要介绍了C++中std::distance使用方法示例,具... 目录语法使用方式解释示例输出:其他说明:总结std::distance&n编程bsp;是 C++ 标准

vue使用docxtemplater导出word

《vue使用docxtemplater导出word》docxtemplater是一种邮件合并工具,以编程方式使用并处理条件、循环,并且可以扩展以插入任何内容,下面我们来看看如何使用docxtempl... 目录docxtemplatervue使用docxtemplater导出word安装常用语法 封装导出方

Linux换行符的使用方法详解

《Linux换行符的使用方法详解》本文介绍了Linux中常用的换行符LF及其在文件中的表示,展示了如何使用sed命令替换换行符,并列举了与换行符处理相关的Linux命令,通过代码讲解的非常详细,需要的... 目录简介检测文件中的换行符使用 cat -A 查看换行符使用 od -c 检查字符换行符格式转换将

使用Jackson进行JSON生成与解析的新手指南

《使用Jackson进行JSON生成与解析的新手指南》这篇文章主要为大家详细介绍了如何使用Jackson进行JSON生成与解析处理,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 核心依赖2. 基础用法2.1 对象转 jsON(序列化)2.2 JSON 转对象(反序列化)3.