本文主要是介绍基于ACE_Message_Queue的生产者消费者模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1.生产者尽可能生产;
2.消费者按照自身需求执行消费行为;code:class CMediaSendBlock : public ACE_Message_Block
{
public:CMediaSendBlock( const char* pData, const unsigned long ulDataLen): ACE_Message_Block((size_t)ulDataLen, MB_DATA, 0, 0){if (NULL != pData){(void)this->copy(pData, (size_t) ulDataLen);}}virtual ~CMediaSendBlock() {}
};class CMediaSendQueue : public ACE_Message_Queue<ACE_MT_SYNCH>
{
public:CMediaSendQueue() {}virtual ~CMediaSendQueue() {}int enqueue_tail_ex(CMediaSendBlock* pBlock){ACE_Time_Value timeValue = ACE_OS::gettimeofday() + ACE_Time_Value(0, 10);int nRetVal = ACE_Message_Queue<ACE_MT_SYNCH>::enqueue_tail(pBlock, &timeValue);if (0 >= nRetVal){return -1;}return nRetVal;}
};CMediaSendQueue m_sendQueue;
bool m_bRunning = true;//生产者
//生产者尽可能快的生产
void* produce(void *arg)
{static int iThreadIndex = -1;++iThreadIndex;std::cout << "this is produce thread num " << iThreadIndex << std::endl;int iSize = 1024 * 1024;char* pBuff = new char[iSize];memset(pBuff, 0x0, iSize);while(m_bRunning){CMediaSendBlock* pBlock = new CMediaSendBlock( pBuff, iSize);int iRet = m_sendQueue.enqueue_tail(pBlock, NULL); //等待到达低水位if (0 >= iRet){delete pBlock;pBlock = NULL;std::cout << "center is full...." << std::endl;}else {std::cout << "push success...." << std::endl;}}std::cout << "produce task finished...." << std::endl;return NULL;
}//消费者
//消费者按照自身需要的速度进行消费
void* consume(void *arg)
{static int iThreadIndex = -1;++iThreadIndex;std::cout << "this is consume thread num " << iThreadIndex << std::endl;while(m_bRunning){ACE_Message_Block* pBlock = NULL;if (-1 == m_sendQueue.dequeue_head(pBlock))continue;//发送CMediaSendBlock* pSendBlock = dynamic_cast<CMediaSendBlock*>(pBlock);if(NULL == pSendBlock)continue;std::cout << "I am consuming.... " << std::endl;//释放blockdelete pSendBlock;pSendBlock = NULL;std::cout << "message_bytes" << std::dec << m_sendQueue.message_bytes() << std::endl;std::cout << "message_length" << std::dec << m_sendQueue.message_length() << std::endl;ACE_OS::sleep(2);}std::cout << "consume over......" << std::endl;return NULL;
}int main(int argc, char* argv[])
{ACE::init();m_sendQueue.high_water_mark(10 * 1024 * 1024);m_sendQueue.low_water_mark(2 * 1024 * 1024);m_sendQueue.activate();//m个生产者,n个消费者//产生生产者线程ACE_Thread_Manager::instance()->spawn_n(5,(ACE_THR_FUNC) produce);ACE_OS::sleep(2); //让生产者填满仓库产生消费者线程ACE_Thread_Manager::instance()->spawn_n(2,(ACE_THR_FUNC) consume);int iData;std::cin >> iData;//closem_bRunning = false;m_sendQueue.deactivate();//waitACE_OS::sleep(2);ACE::fini();return 0;
}
这篇关于基于ACE_Message_Queue的生产者消费者模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!