disruptor(二)

2023-12-11 15:58
文章标签 disruptor

本文主要是介绍disruptor(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

public class Trade {

private String id;//id
private String name;
private double price;//金额
private AtomicInteger count = new AtomicInteger();

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}

}


public class TradeHandler implements EventHandler<Trade>,WorkHandler<Trade>{


@Override
public void onEvent(Trade arg0) throws Exception {
//这里做具体的消费逻辑
arg0.setId(UUID.randomUUID().toString());
System.out.println(arg0.getId()+" 价格="+arg0.getPrice());
}


@Override
public void onEvent(Trade arg0, long arg1, boolean arg2) throws Exception {
this.onEvent(arg0);
}


}


public class Main {


public static void main(String[] args) throws Exception, ExecutionException {
int BUFFER_SIZE = 1024;
int THREAD_NUMBERS = 4;

final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {


@Override
public Trade newInstance() {
return new Trade();
}
}, BUFFER_SIZE,new YieldingWaitStrategy());

//创建线程池
ExecutorService executos = Executors.newFixedThreadPool(THREAD_NUMBERS);

//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

//创建消息处理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler());

//把消费者信息引入到生产端
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executos.submit(transProcessor);

Future<?> future = executos.submit(new Callable<Void>() {


@Override
public Void call() throws Exception {
// TODO Auto-generated method stub
long seq;
for(int i=0;i<10;i++){
seq = ringBuffer.next();
ringBuffer.get(seq).setPrice(Math.random()*9999);
ringBuffer.publish(seq);
}
return null;
}
});

future.get();
Thread.sleep(1000);
transProcessor.halt();
executos.shutdown();
}


}


public class Main2 {


public static void main(String[] args) throws Exception {
int BUFFER_SIZE = 1024;
int THREAD_NUMBERS = 4;

final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {


@Override
public Trade newInstance() {
return new Trade();
}
}, BUFFER_SIZE,new YieldingWaitStrategy());

//创建线程池
ExecutorService executos = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

WorkHandler<Trade> handler = new TradeHandler();

WorkerPool<Trade> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(),handler);

workerPool.start(executos);

for(int i=0;i<8;i++){
long seq = ringBuffer.next();
ringBuffer.get(seq).setPrice(Math.random()*9999);
ringBuffer.publish(seq);
}
Thread.sleep(1000);
workerPool.halt();
executos.shutdown();
}
}

这篇关于disruptor(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/481241

相关文章

无锁环形队列框架Disruptor不同策略说明

* <pre>* BlockingWaitStrategy: 这是默认的策略,使用BlockingWaitStrategy和使用BlockingQueue是非常类似的,* 他们都使用锁和条件Condition进行数据的监控和线程的唤醒,因为涉及到线程的切换,BlockingWaitStrategy策略* 是最节省cpu,但是高并发情况下性能表现最差的等待策略.* SleepingW

disruptor(2)-等待策略

在生产者消费者模式中,等待策略对消费者而言,是一个获取消息感知的方式,可以用轮询,事件触发来实现。 对于生产者而言,等待策略表现在队列池已满的情况,如何等待消息被消费,在一般不重要的场景中,我们可能是就直接抛弃了。 如我们自己使用queue作为等待队列,我们消费时一般用poll()这么去等待数据到来,如果直接用while循环,那cpu会消耗的很严重。这时我们常见的解决办法是在while循环中加

记一次Disruptor排坑

Abstract 我们在项目中使用了Disruptor作为事件总线,实现的业务是:用户消费完成成就,完成八个成就之后自动获得第九个成就——获得前面八个成就。 这个项目不是我参与的,当时我自己封装的高性能事件总线(Electrons)已经完全能胜任上述功能,但是由于小伙伴当时对我的这个组件没有特别研究,仍然感觉我的这个就是顺序执行前面几个监听器,就没有用。 这个项目在测试环境中一直没有问题,原

Java并发编程---Disruptor体验

最近在学习中接触到Disruptor这个框架,虽然目前没有能实际运用到项目中,但是做个了解,在面试吹牛逼?的时候还能避免尴尬!学的不深,仅限于简单的使用和特性的认识。 什么是Disruptor Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一

秒级达百万高并发框架Disruptor

1、起源 Disruptor最初由lmax.com开发,2010年在Qcon公开发表,并于2011年开源,企业应用软件专家Martin Fowler专门撰写长文介绍,同年它还获得了Oracle官方的Duke大奖。其官网定义为:“High Performance Inter-Thread Messaging Library”,即:线程间的高性能消息框架。其实JDK已经为我们提供了很多开箱即用的

处理高并发高性能队列-Disruptor

已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的。于是在脑子里, Disruptor 和“闪电”一词关联了起来,然而却一直没有时间去探究一下。       最近正在进行一项对性能有很高要求的产品项目的研究,自然想起了闪电般的 Disruptor ,这必有它的用武之地,于是进行了一番探查,将成果和体会记录在案。 一、什么是 Di

Disruptor系列3:Disruptor样例实战

章节回顾: - Disruptor系列1:初识Disruptor - Disruptor系列2:Disruptor原理剖析 本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次了解Disruptor,可以查看章节 Disruptor系列

Disruptor系列2:Disruptor原理剖析

章节回顾: - Disruptor系列1:初识Disruptor 都说Disruptor是高性能、低延迟的内存队列,每秒可以处理600W的订单,但是它为什么这么快呢?这就需要我们从他的底层设计原理开始剖析。我觉得,学习了他的实现原理,对自身了解Java并发内存结构是有很大的好处的,因为它把如何基于Java内存结构实现高性能的并发操作,解决锁的性能开销问题发挥到了极致。 无锁(Lock-Fre

剖析Disruptor:为什么会这么快?(三)揭秘内存屏障(validate关键词解析)

主题是什么? 我写这个系列的博客主要目的是解析Disruptor是如何工作的,并深入了解下为什么这样工作。理论上,我应该从可能准备使用disruptor的开发人员的角度来写,以便在代码和技术论文[Disruptor-1.0.pdf]之间搭建一座桥梁。这篇文章提及到了内存屏障,我想弄清楚它们到底是什么,以及它们是如何应用于实践中的。 什么是内存屏障? 它是一个CPU指令。没错,又一次,我们在讨

从构建分布式秒杀系统聊聊Disruptor高性能队列

Disruptor学习网站:http://ifeve.com/disruptor-getting-started/   前言 秒杀架构持续优化中,基于自身认知不足之处在所难免,也请大家指正,共同进步。文章标题来自码友<tukangzheng>的建议,希望可以把阻塞队列ArrayBlockingQueue这个队列替换成Disruptor,由于之前曾接触过这个东西,听说很不错,正好借此机会整合进