本文主要是介绍共享内存的数据同步,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、共享内存概述
在后台开发中,经常需要在多进程间进行数据共享,共享内存是一个较常见的选择。其他的IPC方式,包括磁盘文件、信号、套接字、管道、消息队列等,在需要传输大量数据时,性能都逊于共享内存。
共享内存是一段可以被多进程共享的物理内存,各个进程在使用之前,需要将这段物理内存映射到本进程的虚拟地址空间,系统调用是shmget+shmat或者shm_open+mmap。需要注意的是,各进程内的虚拟地址空间可能不一样,并且进程A修改共享内存的内容后,进程B可以立即读取。
二、共享内存的进程间同步
多个进程同时读写共享内存时,由于读写顺序不能保证,可能会导致数据错乱。因此需要引入同步机制,保证读写操作以一定的顺序执行。本节将介绍几种实现同步的方式。
首先想到的就是锁了。操作系统为我们提供了多种类型的锁,诸如互斥锁、读写锁、自旋锁等。互斥锁(pthread_mutex_t)可以保证线程间的互斥锁,通过设置互斥锁的进程间共享属性,可以实现进程间共用互斥锁。相应地,我们可以将读写锁、自旋锁改造为多进程共享的。这种方式有一个隐患:当加锁进程异常退出时,其他进程可能永远无法抢到锁。进程间共享的互斥锁示例代码如下:
// 创建一个进程间共享的互斥锁 // shm_ptr是共享内存映射得来的虚拟地址 int create_process_mutex(void* shm_ptr, pthread_mutex_t*&mutex) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED); mutex = reinterpret_cast<pthread_mutex_t*>(shm_ptr); int ret = pthread_mutex_init(mutex, &attr); pthread_mutexattr_destroy(&attr); return ret; } | |
其次,信号量也是一个常见的多进程同步方式。通过semget或者sem_open打开一个信号量,然后通过PV操作来达到进程间互斥的效果。但是信号量也存在加锁进程异常退出时,不能自动解锁的问题。
第三,文件记录锁可以锁定文件的一部分或者全部。假设我们将文件与共享内存中的数据块对应起来,则可以通过文件锁来同步共享内存的读写操作。值得说明的时,当进程异常退出时,该进程持有的文件锁会自动释放。相对于信号量,文件锁的效率稍差。示例代码如下:
int op_flock(int fid, int cmd, size_t offset, size_t length) { struct flock lock; lock.l_type = cmd; lock.l_start = offset; lock.l_whence = SEEK_SET; lock.l_len = length; return fcntl(fid, F_SETLK, &lock); } // 加文件锁 int lock_flock(int fid, size_t offset, size_t length) { return op_flock(fid, F_WRLCK, offset, length); } // 解文件锁 int unlock_flock(int fid, size_t offset, size_t length) { return op_flock(fid, F_UNLCK, offset, length); } | |
最后,如果对共享内存的操作都足够快,则可以模拟自旋锁的过程,实现同步。自旋锁在抢不到锁时进入自旋等待,直至别的进程\线程释放锁。我们可以通过原子操作来实现这个过程,从4.1.2开始,gcc提供了一系列原子操作操作,实现自旋锁需要CAS和Set。示例代码如下:
|
| // 山寨自旋锁:加锁 void water_spin_lock(void* shm_ptr) { uint8_t kFree = 0x00, kLock = 0xFF; uint8_t* ptr = reinterpret_cast<uint8_t*>(shm_ptr); for (uint32_t count = 1u; !__sync_bool_compare_and_swap(ptr,kFree, kLock); ++count) { if ((count % 10000u) == 0u) { timespec ts = {0, 1000}; nanosleep(&ts, &ts); // 自旋会霸占CPU,稍息 } } ///< while } // 山寨自旋锁:解锁 void water_spin_unlock(void* shm_ptr) { uint8_t* ptr = reinterpret_cast<uint8_t*>(shm_ptr); __sync_lock_test_and_set(ptr, 0x00); } | |
三、使用无锁结构
上一节给出了几种共享内存间同步数据的通用方法,实际上,如果能使用无锁数据结构,则可以避免同步操作,直达目标。比如我们经常用到的生产者-消费者模型中的队列,可以使用无锁队列来实现。
这里分享一个最近工作中用到的无锁实践。模块A提供了发送公众号消息的接口,内部使用队列异步调用Index模块来发消息。为了保护Index,A需要控制自己调用Index的速度,即每秒钟至多调用N次Index。使用漏桶算法很容易速度控制:1)维护一个X的数组,记录最近X秒钟内,每秒对Index的调用数;2)当数组所有元素总和大于X*N时,认为达到速度限制,将请求抛弃或者放到队列尾部重试。在多进程环境下,如何实现这样一个漏桶算法呢?
假设只保存最近一分钟的计数。在共享内存中开辟60*8B的空间,即60个uint64_t,其中每个元素的高32-bit为秒级时间戳,低32-bit表示该时间戳内的计数。使用CAS操作,可以保证多进程安全的操作这块共享内存。核心代码如下:
// 增加计数器 void AddCount(void* shm_ptr, uint32_t steps = 1) { uint32_t stamp = static_cast<uint32_t>(time(NULL)); uint32_t index = stamp % kArraySize; uint64_t* base_ptr = reinterpret_cast<uint64_t*>(shm_ptr); uint64_t* ptr = &base_ptr[index]; for (;;) { uint64_t old_val = *ptr; uint64_t new_val = 0u; if ((old_val >> 32u) == stamp) new_val = ((uint64_t)stamp << 32u) | ((old_val &0x0FFFFFFFF) + steps); else new_val = ((uint64_t)stamp << 32u) | (steps); // 如果等于旧值则交换,否则重试 if (__sync_bool_compare_and_swap(ptr, old_val, new_val)) break; } ///< for } | |
四、总结
共享内存作为交换数据效率较高的IPC方式,在后台开发中经常被使用。我们可以通过多种方式来实现数据同步,包括共享线程锁、文件锁、使用无锁结构。
这篇关于共享内存的数据同步的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!