This article walks through the source code behind Producer send and Consumer receive in Yahoo Pulsar (part 1).
1. Sending a message synchronously
MessageId send(byte[] message) throws PulsarClientException;
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
msgIdGeneratorUpdater generates the request ID. Every send gets a different ID, which is what distinguishes one message from another.
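The field-updater pattern above can be tried in isolation. The following is a minimal sketch, not the Pulsar implementation: `SequenceIdDemo` and `nextId()` are hypothetical names, and it assumes the ID is obtained with `getAndIncrement`, which the excerpt does not show. The target field has to be a `volatile long` for `AtomicLongFieldUpdater` to accept it.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical stand-in for ProducerImpl; only the ID generator is modeled.
class SequenceIdDemo {
    // AtomicLongFieldUpdater requires the target field to be a volatile long.
    private volatile long msgIdGenerator = 0;

    // One static updater is shared by all instances, so each instance only
    // pays for a plain long field instead of a separate AtomicLong object.
    private static final AtomicLongFieldUpdater<SequenceIdDemo> msgIdGeneratorUpdater =
            AtomicLongFieldUpdater.newUpdater(SequenceIdDemo.class, "msgIdGenerator");

    // Assumption: IDs are handed out with getAndIncrement, starting at 0.
    long nextId() {
        return msgIdGeneratorUpdater.getAndIncrement(this);
    }
}
```

Each call to `nextId()` atomically returns the current value and advances the counter, so concurrent senders never receive the same ID.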
private final BlockingQueue<OpSendMsg> pendingMessages = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages());
pendingMessages is a bounded blocking queue that buffers the messages waiting to be sent.
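How a bounded queue limits the number of in-flight messages can be sketched with the JDK's `ArrayBlockingQueue` directly. This is an illustration under assumptions: `PendingQueueDemo`, `tryEnqueue`, `ack` and the capacity of 2 are hypothetical, and the sketch uses the non-blocking `offer`, whereas the excerpt does not show whether the real producer blocks or fails when the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a producer's pending-message buffer.
class PendingQueueDemo {
    // Hypothetical capacity standing in for conf.getMaxPendingMessages().
    static final int MAX_PENDING = 2;

    final BlockingQueue<String> pendingMessages = new ArrayBlockingQueue<>(MAX_PENDING);

    // Non-blocking enqueue: returns false once MAX_PENDING messages are waiting.
    boolean tryEnqueue(String op) {
        return pendingMessages.offer(op);
    }

    // Models an acknowledgement: the oldest pending message leaves the queue.
    String ack() {
        return pendingMessages.poll();
    }
}
```

Once the queue is full, new sends are rejected until an acknowledgement frees a slot, which is how the capacity acts as backpressure on the sender.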
The send request is executed on the thread that owns the Channel, obtained through eventLoop(), which avoids a thread switch.
// Read the connection before validating if it's still connected, so that we avoid reading a null
// value
ClientCnx cnx = cnx();
if (isConnected()) {
// If we do have a connection, the message is sent immediately, otherwise we'll try again once a
// new connection is established
cmd.retain();
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
}
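The idea of funnelling every write through one thread can be sketched without Netty. In the example below a single-threaded `ExecutorService` stands in for the channel's event loop; this is a swapped-in stand-in, not Netty's `EventLoop`, and `EventLoopDemo`, `run` and the thread name `event-loop` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a single-threaded executor plays the role of the event loop.
class EventLoopDemo {
    // Submits `count` write tasks and returns the name of the thread that ran each one.
    static List<String> run(int count) {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        List<String> threads = new CopyOnWriteArrayList<>();
        for (int i = 0; i < count; i++) {
            // The caller only enqueues the task; the "write" happens on the loop thread.
            eventLoop.execute(() -> threads.add(Thread.currentThread().getName()));
        }
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threads;
    }
}
```

Whichever thread calls `run`, every task reports the same executing thread, so the write path itself never needs a lock.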
private static final class WriteInEventLoopCallback implements Runnable {
    private ProducerImpl producer;
    private ClientCnx cnx;
    private OpSendMsg op;

    static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {
        WriteInEventLoopCallback c = RECYCLER.get();
        c.producer = producer;
        c.cnx = cnx;
        c.op = op;
        return c;
    }

    @Override
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,
                    op.sequenceId);
        }

        try {
            cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise());
        } finally {
            recycle();
        }
    }

    private void recycle() {
        producer = null;
        cnx = null;
        op = null;
        RECYCLER.recycle(this, recyclerHandle);
    }

    private final Handle recyclerHandle;

    private WriteInEventLoopCallback(Handle recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>() {
        @Override
        protected WriteInEventLoopCallback newObject(Handle handle) {
            return new WriteInEventLoopCallback(handle);
        }
    };
}
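The callback class above is pooled: `create` takes an instance from a recycler, and `run` hands it back in a `finally` block, so sending a message does not allocate a new `Runnable` each time. The sketch below imitates that life cycle with a plain `ArrayDeque`. It is a deliberately simplified, single-threaded stand-in and does not reproduce Netty's `Recycler`; `TaskPool`, `Task`, `allocations` and `sent` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded object pool illustrating create/run/recycle.
class TaskPool {
    private final ArrayDeque<Task> free = new ArrayDeque<>();
    int allocations = 0;                        // how many Task objects were really created
    final List<String> sent = new ArrayList<>(); // stands in for the network write

    final class Task implements Runnable {
        private String payload;

        @Override
        public void run() {
            try {
                sent.add(payload);
            } finally {
                // Clear references and return the object to the pool, even on failure.
                payload = null;
                free.push(this);
            }
        }
    }

    // Reuses a pooled Task when one is available, otherwise allocates a new one.
    Task create(String payload) {
        Task t = free.poll();
        if (t == null) {
            t = new Task();
            allocations++;
        }
        t.payload = payload;
        return t;
    }
}
```

Running two tasks one after the other reuses the same object, so `allocations` stays at 1 while both payloads are delivered.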
After the message is sent, the response is received:
@Over