《ACE程序员指南》中领导者与跟随者模式示例代码中的bug和解决方案

本文主要是介绍《ACE程序员指南》中领导者与跟随者模式示例代码中的bug和解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《ACE程序员指南》中线程池一章中提到了两个模式:

 

半同步半异步模式和领导者和跟随者模式,对于后者

 

书中给出了一个示例程序,为了方便说明,我把示例代码贴在这里:

 

#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)

#include "ace/OS_NS_string.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Task.h"
#include "ace/Containers.h"
#include "ace/Synch.h"

// Listing 4 code/ch16
class Follower
{
public:
  Follower (ACE_Thread_Mutex &leader_lock)
    : cond_(leader_lock)
  {
    owner_ = ACE_Thread::self ();
  }

  //FUZZ: disable check_for_lack_ACE_OS
  int wait (void)
  {
    return this->cond_.wait ();
  }

  int signal (void)
  {
    return this->cond_.signal ();
  }
  //FUZZ: enable check_for_lack_ACE_OS

  ACE_thread_t owner (void)
  {
    return this->owner_;
  }

private:
  ACE_Condition<ACE_Thread_Mutex> cond_;
  ACE_thread_t owner_;
};
// Listing 4
// Listing 1 code/ch16
class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  LF_ThreadPool () : shutdown_(0), current_leader_(0)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));
  }

  virtual int svc (void);

  void shut_down (void)
  {
    shutdown_ = 1;
  }

private:
  int become_leader (void);

  Follower *make_follower (void);

  int elect_new_leader (void);

  int leader_active (void)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    return this->current_leader_ != 0;
  }

  void leader_active (ACE_thread_t leader)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    this->current_leader_ = leader;
  }

  void process_message (ACE_Message_Block *mb);

  int done (void)
  {
    return (shutdown_ == 1);
  }

private:
  int shutdown_;
  ACE_thread_t current_leader_;
  ACE_Thread_Mutex leader_lock_;
  ACE_Unbounded_Queue<Follower*> followers_;
  ACE_Thread_Mutex followers_lock_;
  static long LONG_TIME;
};
// Listing 1
// Listing 2 code/ch16
int
LF_ThreadPool::svc (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));
  while (!done ())
    {
      become_leader ();  // Block until this thread is the leader.

      ACE_Message_Block *mb = 0;
      ACE_Time_Value tv (LONG_TIME);
      tv += ACE_OS::gettimeofday ();

      // Get a message, elect new leader, then process message.
      if (this->getq (mb, &tv) < 0)
        {
          if (elect_new_leader () == 0)
            break;
          continue;
        }

      elect_new_leader ();
      process_message (mb);
    }

  return 0;
}
// Listing 2
// Listing 3 code/ch16
int
LF_ThreadPool::become_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  if (leader_active ())
    {
      Follower *fw = make_follower ();
      {
        // Wait until told to do so.
        while (leader_active ())
          fw->wait ();
      }

      delete fw;
    }

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader/n")));

  // Mark yourself as the active leader.
  leader_active (ACE_Thread::self ());
  return 0;
}

Follower*
LF_ThreadPool::make_follower (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);
  Follower *fw;
  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);
  this->followers_.enqueue_tail (fw);
  return fw;
}
// Listing 3
// Listing 5 code/ch16
int
LF_ThreadPool::elect_new_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  leader_active (0);

  // Wake up a follower
  if (!followers_.is_empty ())
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex,
                        follower_mon,
                        this->followers_lock_,
                        -1);
      // Get the old follower.
      Follower *fw;
      if (this->followers_.dequeue_head (fw) != 0)
        return -1;
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) Resigning and Electing %d/n"),
                  fw->owner ()));
      return (fw->signal () == 0) ? 0 : -1;
    }
  else
    {
      ACE_DEBUG
        ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left/n")));
      return -1;
    }
}
// Listing 5

void
LF_ThreadPool::process_message (ACE_Message_Block *mb)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));
  int msgId;
  ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
  mb->release ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Started processing message:%d/n"),
              msgId));
  ACE_OS::sleep (1);
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Finished processing message:%d/n"),
              msgId));
}

long LF_ThreadPool::LONG_TIME = 5L;

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  LF_ThreadPool tp;
  tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);

  // Wait for a few seconds...
  ACE_OS::sleep (2);
  ACE_Time_Value tv (1L);

  ACE_Message_Block *mb;
  for (int i = 0; i < 30; i++)
    {
      ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
      ACE_OS::sleep (tv);

      // Add a new work item.
      tp.putq (mb);
    }

  ACE_Thread_Manager::instance ()->wait ();

  ACE_OS::sleep (10);

  return 0;
}

#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}

#endif /* ACE_HAS_THREADS */

 

我简单解释一下程序的思路:首先创建一个线程池,这个线程池中包括5个线程,每个线程都试图成为领导者,

 

但领导者只能有一个,因此一旦某个线程竞争成为领导者,其他线程只能把自己定位为跟随者,

 

并把与之相应的跟随者对象放在一个跟随者队列中,然后等待机会成为领导者,等待时跟随者线程处于挂起状态。

 

那个竞争中成为领导者的线程,试图从消息队列中抓起消息,如果消息队列中没有消息,该线程也会处于挂起状态。

 

一旦客户把一个消息放入队列(相当于添加了一个任务),领导者线程会被激活,并从消息队列中获取一条消息,

 

然后它做的第一件事情是找一个新领导者,寻找算法就是简单的从跟随者队列的头部取出跟随者,

 

指定它为新领导者,并激活该跟随者线程,当完成这些事情后,再去处理从队列中获取的消息,去执行相应的任务。

 

等它完成任务处理后,它再去努力成为领导者,如果发现目前已经有领导者,只好把自己定位为跟随者,并把与之相

 

对应的跟随者对象放入跟随者队列,等待机会成为领导者。

 

我在Windows平台上测试上述代码,确定是有问题的,最开始处理几条消息,领导者和跟随者的切换还是对的,

 

但到后来,有几个线程就永远也无法成为领导者了,最后这有两个线程在轮流坐庄,这个显然不是我们期望的结果。

 

我仔细的研究了一下线程运行的中间过程,找到了问题的原因。

 

elect_new_leader 函数在选择完跟随者线程作为新领导者后,并没有在函数中直接修改current_leader_变量为


新领导者的值,而是把该变量设置为0,修改current_leader_变量为新领导者的值事情延迟到新领导者线程被激活后

 

再去在该线程中执行,这样中间就有一个时间差,这就给了老领导者一个机会偷窃领导者的位置。

 

假设老领者已经完成了任务,这时新领导者的线程刚刚被操作系统激活,还没有来得及把current_leader_标记为

 

自己的Threadid,老领导者发现领导者的位置还没有人,直接把自己设置为领导者,而新领导者本来作为正统的被

 

选出来的领导者,在做设置领导者前,发现位置上已经有领导了,只好又继续等待,但因为elect_new_leader 函数

 

已经把它的跟随者对象从队列中移除,这样它永远没有机会成为领导者,而只能一直挂起等待了。

 

找到问题的原因后,修改bug并不复杂,只要在elect_new_leader 函数中把current_leader_改成新领导者的线程id

 

就不会出问题了,修改后的代码如下:

 

#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)

#include "ace/OS_NS_string.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Task.h"
#include "ace/Containers.h"
#include "ace/Synch.h"
#include <sstream>

using namespace std;

// Listing 4 code/ch16
class Follower
{
public:
  Follower (ACE_Thread_Mutex &leader_lock)
    : cond_(leader_lock)
  {
    owner_ = ACE_Thread::self ();
  }

  //FUZZ: disable check_for_lack_ACE_OS
  int wait (void)
  {
    return this->cond_.wait ();
  }

  int signal (void)
  {
    return this->cond_.signal ();
  }
  //FUZZ: enable check_for_lack_ACE_OS

  ACE_thread_t owner (void)
  {
    return this->owner_;
  }

private:
  ACE_Condition<ACE_Thread_Mutex> cond_;
  ACE_thread_t owner_;
};
// Listing 4
// Listing 1 code/ch16
class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  LF_ThreadPool () : shutdown_(0), current_leader_(0)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));
  }

  virtual int svc (void);

  void shut_down (void)
  {
    shutdown_ = 1;
  }

private:
  int become_leader (void);

  Follower *make_follower (void);

  int elect_new_leader (void);

  int leader_active (void)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    return this->current_leader_ != 0;
  }

  void leader_active (ACE_thread_t leader)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    this->current_leader_ = leader;
  }

  void process_message (ACE_Message_Block *mb);

  int done (void)
  {
    return (shutdown_ == 1);
  }

private:
  int shutdown_;
  ACE_thread_t current_leader_;
  ACE_Thread_Mutex leader_lock_;
  ACE_Unbounded_Queue<Follower*> followers_;
  ACE_Thread_Mutex followers_lock_;
  static long LONG_TIME;
};
// Listing 1
// Listing 2 code/ch16
int
LF_ThreadPool::svc (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));
  while (!done ())
    {
      become_leader ();  // Block until this thread is the leader.

      ACE_Message_Block *mb = 0;
      ACE_Time_Value tv (LONG_TIME);
      tv += ACE_OS::gettimeofday ();

      // Get a message, elect new leader, then process message.
      if (this->getq (mb, &tv) < 0)
        {
          elect_new_leader ();
          break;
        }

      elect_new_leader ();
      process_message (mb);
    }

  return 0;
}
// Listing 2
// Listing 3 code/ch16
int
LF_ThreadPool::become_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  if (leader_active ())
    {
      Follower *fw = make_follower ();
      {
        // Wait until told to do so.
        while (leader_active ())
        {
          fw->wait ();
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) thread activate!/n")));
          break;
        }
      }

      delete fw;
    }

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader/n")));

  // Mark yourself as the active leader.
  leader_active (ACE_Thread::self ());
  return 0;
}

Follower*
LF_ThreadPool::make_follower (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);
  Follower *fw;
  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);
  this->followers_.enqueue_tail (fw);
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) thread follower enter queue/n")));
  return fw;
}
// Listing 3
// Listing 5 code/ch16
int
LF_ThreadPool::elect_new_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
  leader_active (0);

  // Wake up a follower
  if (!followers_.is_empty ())
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex,
                        follower_mon,
                        this->followers_lock_,
                        -1);
      // Get the old follower.
      Follower *fw;
      if (this->followers_.dequeue_head (fw) != 0)
        return -1;
      leader_active (fw->owner());
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) Resigning and Electing %d/n"),
                  fw->owner ()));
      return (fw->signal () == 0) ? 0 : -1;
    }
  else
    {
      ACE_DEBUG
        ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left/n")));
      return -1;
    }
}
// Listing 5

void
LF_ThreadPool::process_message (ACE_Message_Block *mb)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));
  int msgId;
  ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
  mb->release ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Started processing message:%d/n"),
              msgId));
  ACE_OS::sleep (1);
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Finished processing message:%d/n"),
              msgId));
}

long LF_ThreadPool::LONG_TIME = 5L;

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  LF_ThreadPool tp;
  tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);

  // Wait for a few seconds...
  ACE_OS::sleep (2);
  ACE_Time_Value tv (1L);

  ACE_Message_Block *mb;
  for (int i = 0; i < 30; i++)
    {
      ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
      ACE_OS::sleep (tv);

      // Add a new work item.
      tp.putq (mb);
    }

  ACE_Thread_Manager::instance ()->wait ();

  ACE_OS::sleep (10);

  return 0;
}

#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_OS::puts (ACE_TEXT ("This example requires threads."));
  return 0;
}

#endif /* ACE_HAS_THREADS */

 

 

这篇关于《ACE程序员指南》中领导者与跟随者模式示例代码中的bug和解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/764264

相关文章

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1

SpringBoot整合OpenFeign的完整指南

《SpringBoot整合OpenFeign的完整指南》OpenFeign是由Netflix开发的一个声明式Web服务客户端,它使得编写HTTP客户端变得更加简单,本文为大家介绍了SpringBoot... 目录什么是OpenFeign环境准备创建 Spring Boot 项目添加依赖启用 OpenFeig

pandas中位数填充空值的实现示例

《pandas中位数填充空值的实现示例》中位数填充是一种简单而有效的方法,用于填充数据集中缺失的值,本文就来介绍一下pandas中位数填充空值的实现,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是中位数填充?为什么选择中位数填充?示例数据结果分析完整代码总结在数据分析和机器学习过程中,处理缺失数

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

SpringBoot请求参数接收控制指南分享

《SpringBoot请求参数接收控制指南分享》:本文主要介绍SpringBoot请求参数接收控制指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring Boot 请求参数接收控制指南1. 概述2. 有注解时参数接收方式对比3. 无注解时接收参数默认位置

利用Python调试串口的示例代码

《利用Python调试串口的示例代码》在嵌入式开发、物联网设备调试过程中,串口通信是最基础的调试手段本文将带你用Python+ttkbootstrap打造一款高颜值、多功能的串口调试助手,需要的可以了... 目录概述:为什么需要专业的串口调试工具项目架构设计1.1 技术栈选型1.2 关键类说明1.3 线程模

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Nginx location匹配模式与规则详解

《Nginxlocation匹配模式与规则详解》:本文主要介绍Nginxlocation匹配模式与规则,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、环境二、匹配模式1. 精准模式2. 前缀模式(不继续匹配正则)3. 前缀模式(继续匹配正则)4. 正则模式(大

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组