SynchronousQueue 的联想

2023-10-19 02:48
文章标签 联想 synchronousqueue

本文主要是介绍SynchronousQueue 的联想,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SynchronousQueue介绍

SynchronousQueue是一种阻塞队列,该队列没有任务的容量。内部实现采用了一种性能更好的无锁算法
代码实现里的Dual Queue,其中每一个put对应一个take方法。

简单测试代码

public class SynchronousQueueExample {public static void main(String args[]) {final SynchronousQueue queue = new SynchronousQueue();new Thread(new QueueProducer(queue)).start();new Thread(new QueueConsumer(queue)).start();}
}public class QueueProducer implements Runnable {private SynchronousQueue queue;public QueueProducer(SynchronousQueue queue) {this.queue = queue;}@Overridepublic void run() {String event = "FIRST_EVENT";String another_event = "SECOND_EVENT";try {queue.put(event);System.out.printf("[%s] producer event : %s %n", Thread.currentThread().getName(), event);queue.put(another_event);System.out.printf("[%s] producer event : %s %n", Thread.currentThread().getName(), another_event);} catch (InterruptedException e) {e.printStackTrace();}}
}public class QueueConsumer implements Runnable {private SynchronousQueue queue;public QueueConsumer(SynchronousQueue queue) {this.queue = queue;}@Overridepublic void run() {try {String event = (String) queue.take();// thread will block hereSystem.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), event);} catch (InterruptedException e) {e.printStackTrace();}}
}--------------------------
[Thread-0] producer event : FIRST_EVENT 
[Thread-1] consumed event : FIRST_EVENT 
--------------------------

生产者每生产一个,如果没有消费者消费那就发生阻塞上面例子中。结果只打印了FIRST_EVENT ,因为SECOND_EVENT没有调用 queue.take()方法 ,所以没有打印。

绑定 put和take方法
  /*** Puts or takes an item.*/Object transfer(Object e, boolean timed, long nanos) {/** Basic algorithm is to loop trying one of three actions:** 1. If apparently empty or already containing nodes of same*    mode, try to push node on stack and wait for a match,*    returning it, or null if cancelled.** 2. If apparently containing node of complementary mode,*    try to push a fulfilling node on to stack, match*    with corresponding waiting node, pop both from*    stack, and return matched item. The matching or*    unlinking might not actually be necessary because of*    other threads performing action 3:** 3. If top of stack already holds another fulfilling node,*    help it out by doing its match and/or pop*    operations, and then continue. The code for helping*    is essentially the same as for fulfilling, except*    that it doesn't return the item.*/SNode s = null; // constructed/reused as neededint mode = (e == null)? REQUEST : DATA;for (;;) {SNode h = head;if (h == null || h.mode == mode) {  // empty or same-modeif (timed && nanos <= 0) {      // can't waitif (h != null && h.isCancelled())casHead(h, h.next);     // pop cancelled nodeelsereturn null;} else if (casHead(h, s = snode(s, e, h, mode))) {SNode m = awaitFulfill(s, timed, nanos);if (m == s) {               // wait was cancelledclean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfillerreturn mode == REQUEST? m.item : s.item;}} else if (!isFulfilling(h.mode)) { // try to fulfillif (h.isCancelled())            // already cancelledcasHead(h, h.next);         // pop and retryelse if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappearSNode m = s.next;       // m is s's matchif (m == null) {        // all waiters are gonecasHead(s, null);   // pop fulfill nodes = null;           // use new node next timebreak;              // restart main loop}SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and mreturn (mode == REQUEST)? m.item : s.item;} else                  // lost matchs.casNext(m, mn);   // help unlink}}} else {                            // help a fulfillerSNode m = h.next;               // m is h's matchif (m == null)                  // waiter is gonecasHead(h, null);           // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h))          // help matchcasHead(h, mn);         // pop both h and melse                        // lost matchh.casNext(m, mn);       // help unlink}}}}

说到SynchronousQueue不由的想到LinkedBlockingQueue,ArrayBlockingQueue,PriorityBlockingQueue

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

####根据不同的需要BlockingQueue有4种具体实现:

  • (1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
  • (2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制, 若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO(先入先出)顺序排序的。LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样, 导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue。
  • (3)PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
  • (4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

####ThreadPoolExecutor

 /*** Creates a new <tt>ThreadPoolExecutor</tt> with the given initial* parameters and default thread factory.** @param corePoolSize the number of threads to keep in the* pool, even if they are idle.* @param maximumPoolSize the maximum number of threads to allow in the* pool.* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the keepAliveTime* argument.* @param workQueue the queue to use for holding tasks before they* are executed. This queue will hold only the <tt>Runnable</tt>* tasks submitted by the <tt>execute</tt> method.* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached.* @throws IllegalArgumentException if corePoolSize or* keepAliveTime less than zero, or if maximumPoolSize less than or* equal to zero, or if corePoolSize greater than maximumPoolSize.* @throws NullPointerException if <tt>workQueue</tt>* or <tt>handler</tt> are null.*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}

上面的每一个参数很详细的介绍了ThreadPoolExecutor的用法,保持线程的数量,最大化线程的数量,调度时间的间隔,用到的线程队列等。

主要的execute方法。

  /*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current <tt>RejectedExecutionHandler</tt>.** @param command the task to execute* @throws RejectedExecutionException at discretion of* <tt>RejectedExecutionHandler</tt>, if task cannot be accepted* for execution* @throws NullPointerException if command is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}else if (!addIfUnderMaximumPoolSize(command))reject(command); // is shutdown or saturated}}

在线程池中每一个任务被包装成Runnable 类型,传入到execute方法中 , 该方法中会判断是否超过最大线程,是否有空余线程,当调用停止或者达到最大容量会调用RejectedExecutionHandler

/*** Rechecks state after queuing a task. Called from execute when* pool state has been observed to change after queuing a task. If* the task was queued concurrently with a call to shutdownNow,* and is still present in the queue, this task must be removed* and rejected to preserve shutdownNow guarantees.  Otherwise,* this method ensures (unless addThread fails) that there is at* least one live thread to handle this task* @param command the task*/private void ensureQueuedTaskHandled(Runnable command) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();boolean reject = false;Thread t = null;try {int state = runState;if (state != RUNNING && workQueue.remove(command))reject = true;else if (state < STOP &&poolSize < Math.max(corePoolSize, 1) &&!workQueue.isEmpty())t = addThread(null);} finally {mainLock.unlock();}if (reject)reject(command);else if (t != null)t.start();}/*** Invokes the rejected execution handler for the given command.*/void reject(Runnable command) {handler.rejectedExecution(command, this);}
网上的一个测试
public class Test {static ExecutorService e = Executors.newFixedThreadPool(2);static int N = 1000000;public static void main(String[] args) throws Exception {    for (int i = 0; i < 10; i++) {int length = (i == 0) ? 1 : i * 5;System.out.print(length + "\t");System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");System.out.print(doTest(new SynchronousQueue<Integer>(), N));System.out.println();}e.shutdown();}private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {long t = System.nanoTime();e.submit(new Runnable() {public void run() {for (int i = 0; i < n; i++)try { q.put(i); } catch (InterruptedException ex) {}}});    Long r = e.submit(new Callable<Long>() {public Long call() {long sum = 0;for (int i = 0; i < n; i++)try { sum += q.take(); } catch (InterruptedException ex) {}return sum;}}).get();t = System.nanoTime() - t;return (long)(1000000000.0 * N / t); // Throughput, items/sec}
}    


参考

具体使用那个一个消息队列要看使用场景,多个生产者一个消费者,多个生产者多个消费者以及并发量的大小。

这篇关于SynchronousQueue 的联想的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/236774

相关文章

用了虚拟机后,本机摄像头打不开了(联想电脑thinkpad)

虚拟机有摄像头,我断开了连接,现在本机的摄像头打开就是一个锁 我先把虚拟机的摄像头关了 然后把本机的vm usb关闭了 Win+R),输入services.msc,找到VMware USB Arbitration Service,确保其状态为“关闭 然后打开桌面助手 开启 参考: 联想知识库

联想凌拓「零信任」安全防护数据管理解决方案

联想凌拓「零信任」安全防护数据管理解决方案 据《2023 年数据泄露成本报告》指出,2023 年全球数据泄露的平均成本创下历史新高。 这意味着企业在遭遇了数据泄露以后,平均需要花费 277 天来识别并控制一个活跃的数据泄露 频繁的勒索攻击已成为影响企业持续发展的严重威胁,并给企业带来诸多伤害: 敏感客户信息/商业机密等数据丢失或泄露、业务中断、声誉受损、数据泄露法律责任、金融损失、数据恢

输入框联想功能查询

链接: demo链接 密码: 2wmc 现有一需求,输入框(非搜索框)需要具有联想查询功能:如下效果图 这里分享得是一个测试得demo,开发中可以根据自己情况修改。

怎么使用联想?

怎么使用联想? 1.以万物为师:人、狗、树木2.去掉所有的"不" 1.什么是以万物为师呢? 1.植物:狗尾草–》背包上的粘贴块 2.那联想需要注意哪些呢? 1.要跨界:同一行的联想可能会变成抄袭2.符合基本的逻辑:缺少可行性的联想也不能实现3.总结出点子,第二天再评估,不要提前评估4.选择大创意,不要选小创意:黑巧克力是小创意,热情巧克力是大创意 3.如何联想? 1.相关联想2.随机

联想笔记本bios设置u盘启动

联想笔记本bios设置u盘启动 一:快捷启动     大众化:F12        新款联想:FN + F12 二:进bios改u盘启动     大众化:DEL或F12   新款联想:FN + DEL 进入bios: 1:移动到Security选项---找到Security Boot     改Secure Boot 为 disabled(关闭) --ok 目的:找到Secure Boot并关闭

CES2012台式也强势 联想三大新品电脑

都说如今台式电脑走进了夕阳产业,但联想在CES2012展会上却有三大系列台式新品展出,涉及有定位游戏人群的ideacentre K430,小巧易用省空间的家悦s5以及高度便携型的Q180,一起来看。 ● 联想ideaCentre Q180,号称全球最小的主机 联想ideaCentre Q180小到可能被“忽视” 联想ideaCentre Q180真机实物 联想i

通过SynchronousQueue方式实现线程间数据传递

通过SynchronousQueue方式实现线程间数据传递 线程 A 与线程 B 共同持有一个 SynchronousQueue 的引用,线程 B 调用 take 方法,阻塞以等待; 线程 A 运行 后计算出结果,将结果 put 到 queue 中。 package com.example.synchronousqueuedemo;import java.util.concurrent.Sy

联想y7000p安装配置ubuntu笔记

七月底购买了一个联想y7000p的顶配笔记本,属于预售,刚刚发布,拿到手很兴奋,但是发现安装ubuntu的过程中很是困扰,现在终于成功,所以写一个方便自己与大家。 问题1: 制作好的启动盘进不去安装界面的问题,我是使用rufus制作的启动盘,经过其他电脑测试没有问题,问题发生时间为选择安装ubuntu后,进入一个加载界面,然后会提示各种失败,各种问题,进不去安装界面。 解决方案:在刚进入加载

联想小新 Pro 16:AI启航版,定义笔记本性能新高度

联想小新 Pro 16 AI元启版笔记本以其搭载的英特尔酷睿Ultra 9-185H处理器,16核心22线程的豪华配置,成为市场上的性能巨擘。搭配32GB LPDDR5x RAM和1TB PCIe 4.0 SSD,这款笔记本在处理高负载任务时展现出无与伦比的流畅性,无论是图形设计、视频编辑还是3D渲染,都能轻松应对。 视觉盛宴:16英寸120Hz高刷新率屏幕 屏幕作为用户与笔记本交互的第一

联想笔记本安装ubuntu16.04没WiFi

http://www.linuxidc.com/Linux/2015-04/116077.htm