本文主要是介绍disruptor(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
public class Trade {private String id;//id private String name;private double price;//金额 private AtomicInteger count = new AtomicInteger();public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;}}
public class TradePublisher implements Runnable {Disruptor<Trade> disruptor;private CountDownLatch latch;private static int LOOP=1;public TradePublisher(Disruptor<Trade> disruptor, CountDownLatch latch) {this.disruptor = disruptor;this.latch = latch;}@Override public void run() {TradeEventTranslator tradeEventTranslator = new TradeEventTranslator();for(int i=0;i<LOOP;i++){disruptor.publishEvent(tradeEventTranslator);}latch.countDown();}} class TradeEventTranslator implements EventTranslator<Trade>{private Random random = new Random();@Override public void translateTo(Trade trade, long l) {this.generateTrade(trade);}private Trade generateTrade(Trade event){event.setPrice(random.nextDouble()*9999);return event;} }public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade>{@Override public void onEvent(Trade trade) throws Exception {System.out.println("handler1:set Name");trade.setName("h1");Thread.sleep(1000);}@Override public void onEvent(Trade trade, long l, boolean b) throws Exception {this.onEvent(trade);} }public class Handler2 implements EventHandler<Trade>,WorkHandler<Trade> {@Override public void onEvent(Trade trade, long l, boolean b) throws Exception {System.out.println("handler2:set price");trade.setPrice(17.0);Thread.sleep(1000);}@Override public void onEvent(Trade trade) throws Exception {this.onEvent(trade);} }public class Handler3 implements EventHandler<Trade>,WorkHandler<Trade> {@Override public void onEvent(Trade trade, long l, boolean b) throws Exception {System.out.println("handler3:name: "+trade.getName()+", price:"+trade.getPrice());}@Override public void onEvent(Trade trade) throws Exception {this.onEvent(trade);} }public class Main {public static void main(String[] args) throws InterruptedException {long beginTime = System.currentTimeMillis();int bufferSize = 1024;ExecutorService executro = Executors.newFixedThreadPool(8);Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {@Override public Trade newInstance() {return new Trade();}},bufferSize,executro, ProducerType.SINGLE,new YieldingWaitStrategy());EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(),new Handler2());handlerGroup.then(new Handler3());disruptor.start();CountDownLatch latch = new CountDownLatch(1);executro.submit(new TradePublisher(disruptor,latch));latch.await();disruptor.shutdown();executro.shutdown();System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));} }
这篇关于disruptor(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!