本文主要介绍 Disruptor(一)的基本用法,希望能为大家解决编程问题提供一定的参考,需要的开发者可以跟着本文一起学习。
参考文档:文档1 文档2
/**
 * Mutable event object that carries a single {@code long} payload.
 */
public class LongEvent {

    /** Payload currently held by this event. */
    private long value;

    /**
     * Returns the payload currently held by this event.
     *
     * @return the stored value
     */
    public long getValue() {
        return this.value;
    }

    /**
     * Replaces the payload held by this event.
     *
     * @param value the new value to store
     */
    public void setValue(long value) {
        this.value = value;
    }
}
/**
 * Factory that supplies fresh {@link LongEvent} instances.
 *
 * <p>The interface is parameterized with {@link LongEvent} (instead of being used as a raw
 * type) so that passing this factory to a {@code Disruptor<LongEvent>} needs no unchecked
 * conversion.
 */
public class LongEventFactory implements EventFactory<LongEvent> {

    /**
     * Creates a new event with its default (zero) value.
     *
     * @return a newly constructed {@link LongEvent}
     */
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
/**
 * Consumer-side handler that prints the value carried by each {@link LongEvent}.
 */
public class LongEventHandler implements EventHandler<LongEvent> {

    /**
     * Prints the value of the received event to standard output.
     *
     * @param event      the event to consume
     * @param sequence   second callback argument; not used by this handler
     * @param endOfBatch third callback argument; not used by this handler
     * @throws Exception declared by the overridden signature; nothing is thrown explicitly here
     */
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        final long payload = event.getValue();
        System.out.println("消费者进行处理:" + payload);
    }
}
/**
 * Producer that publishes {@link LongEvent}s by claiming a ring buffer sequence,
 * filling the event at that sequence, and then publishing it.
 */
public class LongEventProducer {

    /** Ring buffer that events are written to. */
    private final RingBuffer<LongEvent> ringBuffer;

    /**
     * Creates a producer bound to the given ring buffer.
     *
     * @param ringBuffer the ring buffer to publish into
     */
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Publishes one event whose value is the {@code long} read at index 0 of {@code bb}.
     *
     * @param bb buffer holding the value to publish; read with an absolute get at index 0
     */
    public void onData(java.nio.ByteBuffer bb) {
        final long claimed = ringBuffer.next();
        try {
            ringBuffer.get(claimed).setValue(bb.getLong(0));
        } finally {
            // Publish in finally so the claimed sequence is always published,
            // even when filling the event throws.
            ringBuffer.publish(claimed);
        }
    }
}
/**
 * Producer that publishes {@link LongEvent}s through an event translator, delegating the
 * publishing to {@code RingBuffer.publishEvent}.
 */
public class LongEventProducerWithTranslator {

    /**
     * Translator that copies the {@code long} at index 0 of the supplied buffer into the event.
     */
    public static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent event, long sequence, ByteBuffer source) {
                    final long payload = source.getLong(0);
                    event.setValue(payload);
                }
            };

    /** Ring buffer that events are written to. */
    private final RingBuffer<LongEvent> ringBuffer;

    /**
     * Creates a producer bound to the given ring buffer.
     *
     * @param ringBuffer the ring buffer to publish into
     */
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Publishes one event populated from {@code buffer} via {@link #TRANSLATOR}.
     *
     * @param buffer buffer holding the value to publish; read with an absolute get at index 0
     */
    public void onData(ByteBuffer buffer) {
        ringBuffer.publishEvent(TRANSLATOR, buffer);
    }
}
/**
 * Demo entry point: wires a {@code Disruptor<LongEvent>} to a {@link LongEventHandler},
 * publishes the values 0..99 through a {@link LongEventProducerWithTranslator}, and shuts
 * everything down.
 */
public class LongEventMain {

    /**
     * Runs the demo.
     *
     * <p>Both shutdown calls sit in {@code finally} blocks so that a failure while
     * publishing cannot leave the handler thread running and keep the JVM alive.
     *
     * @param args command-line arguments; ignored
     */
    public static void main(String[] args) {
        // Thread pool handed to the Disruptor for running its event handler.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Factory that creates the events for the ring buffer.
            LongEventFactory factory = new LongEventFactory();
            int ringBufferSize = 1024 * 1024;

            // Constructor arguments: event factory, ring buffer size, executor,
            // producer type (single producer) and the consumer wait strategy.
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
                    factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

            // Register the consumer, then start the Disruptor.
            disruptor.handleEventsWith(new LongEventHandler());
            disruptor.start();
            try {
                // Publish events. LongEventProducer could be used here instead; it offers
                // the same onData(ByteBuffer) method but claims and publishes manually.
                RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
                LongEventProducerWithTranslator producer =
                        new LongEventProducerWithTranslator(ringBuffer);

                // One 8-byte buffer is reused; each iteration overwrites the long at index 0.
                ByteBuffer byteBuffer = ByteBuffer.allocate(8);
                for (long i = 0; i < 100; i++) {
                    byteBuffer.putLong(0, i);
                    producer.onData(byteBuffer);
                }
            } finally {
                disruptor.shutdown();
            }
        } finally {
            executor.shutdown();
        }
    }
}
这篇关于 Disruptor(一)的文章就介绍到这里,希望对各位开发者有所帮助!