本文主要介绍 Disruptor(二),希望能为大家解决编程问题提供一定的参考价值,需要的开发者们跟随小编一起来学习吧!
/**
 * Mutable bean describing a single trade.
 *
 * <p>All properties are exposed through plain getters and setters; no validation is performed.
 */
public class Trade {

    /** Identifier of the trade. */
    private String id;

    /** Name of the trade. */
    private String name;

    /** Amount of the trade. */
    private double price;

    /** Counter attached to the trade; starts as a fresh {@link AtomicInteger} (value 0). */
    private AtomicInteger count = new AtomicInteger();

    // ---- id ----

    /** @return the identifier, or {@code null} if none has been set */
    public String getId() {
        return this.id;
    }

    /** @param id the identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    // ---- name ----

    /** @return the name, or {@code null} if none has been set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    // ---- price ----

    /** @return the amount; {@code 0.0} until one has been set */
    public double getPrice() {
        return this.price;
    }

    /** @param price the amount to store */
    public void setPrice(double price) {
        this.price = price;
    }

    // ---- count ----

    /** @return the counter instance currently held (the live object, not a copy) */
    public AtomicInteger getCount() {
        return this.count;
    }

    /** @param count the counter instance to hold from now on; stored as-is */
    public void setCount(AtomicInteger count) {
        this.count = count;
    }
}
/**
 * Consumer of {@link Trade} events.
 *
 * <p>Implements both {@code EventHandler} and {@code WorkHandler}, so the same handler class can
 * be plugged into a batch event processor or into a worker pool.
 */
public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {

    /**
     * Three-argument callback; {@code arg1} and {@code arg2} are ignored and the event is
     * forwarded to {@link #onEvent(Trade)}.
     *
     * @param arg0 the trade to consume
     * @param arg1 unused
     * @param arg2 unused
     * @throws Exception declared by the handler interface
     */
    @Override
    public void onEvent(Trade arg0, long arg1, boolean arg2) throws Exception {
        onEvent(arg0);
    }

    /**
     * Assigns a freshly generated random UUID as the trade's id and prints the id together with
     * the trade's price to standard output.
     *
     * @param arg0 the trade to consume; its id is overwritten
     * @throws Exception declared by the handler interface
     */
    @Override
    public void onEvent(Trade arg0) throws Exception {
        // The concrete consumption logic goes here.
        final String generatedId = UUID.randomUUID().toString();
        arg0.setId(generatedId);

        final String line = arg0.getId() + " 价格=" + arg0.getPrice();
        System.out.println(line);
    }
}
/**
 * Demo that wires a {@link RingBuffer} to a single {@link BatchEventProcessor} by hand:
 * one producer task publishes ten {@link Trade} events and one {@link TradeHandler}
 * consumes them.
 */
public class Main {

    /**
     * Publishes ten trades with random prices, gives the consumer one second to drain them,
     * then stops the consumer and the thread pool.
     *
     * <p>The consumer is halted and the pool is shut down in a {@code finally} block, so a
     * failing producer task or an interrupted wait cannot leave the processor thread running
     * (which would otherwise keep the JVM alive, because the pool's threads are non-daemon).
     *
     * @param args unused
     * @throws Exception if the producer task fails or the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        final int bufferSize = 1024;
        final int threadNumbers = 4;

        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {
            @Override
            public Trade newInstance() {
                return new Trade();
            }
        }, bufferSize, new YieldingWaitStrategy());

        // Thread pool that runs both the consumer and the producer task.
        final ExecutorService executor = Executors.newFixedThreadPool(threadNumbers);

        // Create the SequenceBarrier the consumer waits on.
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        // Create the event processor (the consumer).
        final BatchEventProcessor<Trade> transProcessor =
                new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler());

        // Make the consumer's progress known to the producer side.
        ringBuffer.addGatingSequences(transProcessor.getSequence());

        try {
            // Hand the event processor to the thread pool.
            executor.submit(transProcessor);

            final Future<?> future = executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    for (int i = 0; i < 10; i++) {
                        // Claim the next slot, fill it, then make it visible to the consumer.
                        final long seq = ringBuffer.next();
                        ringBuffer.get(seq).setPrice(Math.random() * 9999);
                        ringBuffer.publish(seq);
                    }
                    return null;
                }
            });

            // Wait for the producer to finish; rethrows any failure from the task.
            future.get();
            // Give the consumer time to process what was published.
            Thread.sleep(1000);
        } finally {
            // Always stop the consumer and release the pool, even on failure.
            transProcessor.halt();
            executor.shutdown();
        }
    }
}
/**
 * Demo that consumes {@link Trade} events through a {@link WorkerPool} built on a
 * hand-created {@link RingBuffer}; the main thread acts as the single producer.
 */
public class Main2 {

    /**
     * Publishes eight trades with random prices, gives the worker one second to drain them,
     * then stops the worker pool and the thread pool.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        final int bufferSize = 1024;
        final int threadNumbers = 4;

        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {
            @Override
            public Trade newInstance() {
                return new Trade();
            }
        }, bufferSize, new YieldingWaitStrategy());

        // Create the thread pool that runs the workers.
        final ExecutorService executor = Executors.newFixedThreadPool(threadNumbers);

        // Create the SequenceBarrier the workers wait on.
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        final WorkHandler<Trade> handler = new TradeHandler();
        final WorkerPool<Trade> workerPool =
                new WorkerPool<>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);

        // A WorkerPool built on an existing RingBuffer does not register its workers'
        // sequences itself; they must be added as gating sequences before the pool starts,
        // otherwise the producer is not held back by consumer progress and may overwrite
        // slots that have not been processed yet.
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        try {
            workerPool.start(executor);

            for (int i = 0; i < 8; i++) {
                // Claim the next slot, fill it, then make it visible to the workers.
                final long seq = ringBuffer.next();
                ringBuffer.get(seq).setPrice(Math.random() * 9999);
                ringBuffer.publish(seq);
            }

            // Give the workers time to process what was published.
            Thread.sleep(1000);
        } finally {
            // Always stop the workers and release the pool, even on failure, so no
            // worker thread is left running.
            workerPool.halt();
            executor.shutdown();
        }
    }
}
这篇关于 Disruptor(二)的文章就介绍到这儿,希望我们推荐的文章对开发者们有所帮助!