本文主要是介绍「实验记录」MIT 6.S081 Lab8 locks,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
#Lab8: locks
- I. Source
- II. My Code
- III. Motivation
- IV. Memory allocator (moderate)
- i.Motivation
- ii. Solution
- iii. Result
- V. Buffer cache (hard)
- i. Motivation
- ii. Solution
- S1 - 定义 bcache 结构体
- S2 - bcache 的相关操作
- S3 - 何时 pin() 和 unpin()
- S4 - 设计 bget()
- iii. Result
- VI. Reference
I. Source
- MIT-6.S081 2020 课程官网
- Lab8: locks 实验主页
- MIT-6.S081 xv6 book Chapter 6 Scheduling 个人笔记
- B站 - Lec13: Sleep & Wakeup
II. My Code
- Lab8: locks 的 Gitee
- xv6-labs-2020 的 Gitee 总目录
III. Motivation
Lab8: locks 主要是想让我们熟悉一些提高并行效率的高级技巧,用了 xv6 已有的两个内存管理机制作为 demo,让我们对其进行优化。类似于分布式锁原理,可以说是技巧相同,师出同门。如果说 Lab7: multithreading 是牛刀小试的话, Lab8: locks 完全称得上是进阶修炼
提高并行效率的根本方法,就是减小锁的颗粒度。废话不多说,看下面的两个子实验吧
IV. Memory allocator (moderate)
i.Motivation
xv6 目前有一个差不多能用的 Memory allocator ,它的责任就是为进程分配内存,诸如 pagetable or 用户层的 malloc 等大大小小的空间,无论是分配上层的还是底层的内存,都离不开 Memory allocator 。可以说它是 xv6 中非常基础的组件了
现有的 Memory allocator 是这样管理空闲 page 的,将 xv6 中所有空闲 page 都收集起来,然后将其用单链表串联。每次新来一个 kalloc()
请求后,Memory allocator 会从 freelist 中选取一个空闲 page 进行分配;当用完 page 之后,用户层调用 kfree()
释放内存,此时 Memory allocator 会将该 page 挂回 freelist 。这期间,需要上锁放锁,我就不再赘述了(与多线程共享同一变量类似)
就是这么简单的操作和流程,为什么采用这么简单的方法?难道没有更高级的数据结构了嘛?可以用哈希表 or B+树管理 memory !答案恰恰相反,OS 不比应用程序,它不追求正式管理,不讲究套路。OS 讲究的是效率,是效率与复杂程度的博弈。用单链表这个复杂程度较低的数据结构换取更高的执行效率,这才是 OS 想要的速度
现有的 Memory allocator 看样子很好,没什么大的问题!且慢,这是 1~2 个 CPU 的情况,Memory allocator 能够正常且高效的工作,如果是多个 CPU 呢?现实世界可是这样啊
多个 CPU 都要共享一个 freelist ,每个 CPU 申请 or 释放 page ,都需对整个 freelist 上锁,那么势必带来很严重的排队问题,队排的越长,效率越低。这队的长短取决于进程的个数,现实世界中,队很长,但出口不能只有一个,不然处理的太慢。目前 xv6 的出口只有一个,对应只有一个 freelist
现在的问题,就是能不能多创建几个出口,也就是把唯一的 freelist 拆分成多份。Lab: Memory alloctor 的目的就是让我们贴近现实,因为 xv6 有多(8)个 CPU ,能不能把一张大的 freelist 拆成 8 小份,分给每个 CPU 。这样的话,再有进程申请 or 释放 page ,就不会再影响整体了(不像从前,动不动就对整张 freelist 上锁),会关上特定 CPU 的小门,自己处理,不会影响运行在其他 CPU 上的进程
ii. Solution
Lab: Memory alloctor 的主要战场在 kernel/kalloc.c
中,基于这个目标,我们将原有的 kmem
拉伸成 kmems
,让每个 CPU 都拥有 freelist 可供管理。结构定义改成,
struct {struct spinlock lock;struct run *freelist;
} kmems[NCPU];
定义好数据结构后,就可以正式开始流程了。首先,调整 kinit()
,以前是一个 lock ,现在是 NCPU
个 lock ,当然要全部初始化,
void
kinit()
{for(int i=0; i<NCPU; i++)initlock(&kmems[i].lock, "kmem");freerange(end, (void*)PHYSTOP);
}
当进程有申请 page 的请求后,会调用 kalloc()
。在此过程中,我们需要知道,当前进程是在哪个 CPU 上运行的,Lab8: locks 实验主页 指明在获取 cpuid 时需要关中断,原话是这样的,
The function
cpuid
returns the current core number, but it’s only safe to call it and use its result when interrupts are turned off. You should usepush_off()
andpop_off()
to turn interrupts off and on.
为什么要获取 cpuid 呢?因为只有知道当前进程在哪个 CPU 上运行,才能知道去哪个 CPU 的 freelist 中拿取空闲 page 。先看一下代码,
// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{struct run *r;/** 获取 cpuid 时要关中断,完事之后再把中断开下来 */push_off();int id = cpuid();pop_off();/** 尝试获取剩余的空闲 page */acquire(&kmems[id].lock);r = kmems[id].freelist;if(r) {kmems[id].freelist = r->next;} else {for(int i=0; i<NCPU; i++) {if(i == id)continue;/** 尝试偷一个其他 CPU 的空闲 page */acquire(&kmems[i].lock);if(!kmems[i].freelist) {release(&kmems[i].lock);continue;}r = kmems[i].freelist;kmems[i].freelist = r->next;release(&kmems[i].lock);break;}}release(&kmems[id].lock);/** 有一种可能:第id个CPU没有空闲 page ,也没偷到 */if(r)memset((char*)r, 5, PGSIZE); // fill with junkreturn (void*)r;
}
主要流程,就是去第 id 号 CPU 中拿取空闲 page 。如果该 CPU 有空闲 page ,则直接分配;反之,就去偷别的 CPU 的 ,去看看别的 CPU 是否有空闲 page ;如果偷也没偷到,那么就向上层返回空指针
上锁放锁的问题,很简单,注意配套就行。上锁了,别忘记放锁!
kfree()
也一样,有进程来归还 page ,就将 page 挂载到相应的 CPU 的 freelist 上,流程比 kalloc()
简单。代码如下,
// Free the page of physical memory pointed at by v,
// which normally should have been returned by a
// call to kalloc(). (The exception is when
// initializing the allocator; see kinit above.)
void
kfree(void *pa)
{struct run *r;if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)panic("kfree");// Fill with junk to catch dangling refs.memset(pa, 1, PGSIZE);r = (struct run*)pa;/** 获取 cpuid 时要关中断,完事之后再把中断开下来 */push_off();int id = cpuid();pop_off();/** 将空闲的 page 归还给第 id 个 CPU */acquire(&kmems[id].lock);r->next = kmems[id].freelist;kmems[id].freelist = r;release(&kmems[id].lock);
}
至此,已完成 Memory allocator 的再造工作
iii. Result
手动进入 qemu
make qemu
$kalloctest
$usertests sbrkmuch
$usertests
V. Buffer cache (hard)
i. Motivation
在介绍为什么要改进 xv6 现有的 Buffer cache 机制之前,先要清楚一个常识:在现代 OS 中,所有 I/O 操作都不会直接与 disk 打交道,一般都是将 disk 中的数据拷贝到 memory 中,进程再从 memory 中进行读写。memory 中用来暂存 disk 数据的地方,叫做 Buffer cache
好,知道了 Buffer cache 为何物后,还要搞明白 xv6 是如何运作这套 I/O 流程的
现有的 Buffer cache 机制是这样的,xv6 有一个非常大的数组,代表着空闲 buf 的集合。然后,利用双链表手段将这些空闲 buf 串联起来,方便分配和回收。为什么使用双链表手段?就是冲着效率较高的最近最少使用算法来的( least recently used,LRU ),算法我就不赘述了,力扣上有类似的题目可供熟悉。简而言之,Buffer cache 就是一块空闲内存,用来暂存数据,作 disk 和进程之间的缓冲地带
串联好空闲的 buf 之后,开始正式的 I/O 流程。因为 xv6 是刚启动,所以 Buffer cache 没有从 disk 中拷贝过数据。此时来了进程 A ,带来了读 disk block0 的请求。进程 A 发出读请求后,就将自己切入至休眠状态了(主动让出 CPU )。xv6 接收到读请求后,将 disk block0 中的数据拷贝到 Buffer cache 中,然后再通知正在休眠的进程 A (其需要的 I/O 条件已满足,可以继续运行)
上面提到的情形,是只有一个进程读 disk block0。现实中,肯定远远不止有一个进程,可能同时有一百个进程都想读 or 写 disk block0。我们知道,多个线程同时操纵一块内存时,需要通过锁机制来确保读写的有序进行,不然会出错。同样,在处理 Buffer cache 问题上也是利用这种手段
xv6 帮我们实现好了一个简易的版本,Buffer cache 只是一块很大的内存空间,为了确保多进程之间有序的读写,xv6 为 Buffer cache 配备了一个 lock 。这已经可以实现有序操作了,但还是存在问题的,比如进程 A 只想修改 disk block0 的那一小块内容,但却要对整块 Buffer cache 上锁(假设 Buffer cache 可以装下100块 block ),这太得不偿失了
试想,若此时还有一个进程 B ,它仅仅是想读 disk block2 的数据。本来无需考虑除 disk block2 外的情况,结果却因为进程 A 对整个 Buffer cache 上锁了,被迫需要等待,等进程 A 放锁之后才能读取。这就是问题所在!专业术语,就是锁的颗粒度很大,需要优化
所以,这个子实验的目的很明确,就是解决上述的问题。针对进程 A 的请求可以只对 disk block0 在 Buffer cache 的映射区域上锁,而不是选择对一整块 Buffer cache 上锁,这样的话,也不会影响进程 B 的 disk block2 的请求
说的专业一点,就是将一块非常大的 Buffer cache 分而治之,划分成众多小区域,针对小区域进行上锁,而不是动不动就一整块上锁。这样可以明显提高并行效率,这种手段术语叫减小锁的颗粒度
ii. Solution
S1 - 定义 bcache 结构体
在明确 Lab: Buffer cache 的目的之后,我们开始动手优化 xv6 原有的 Buffer cache 机制。这次我们的主要战场是 kernel/bio.c
,主要就是优化 bget()
中的代码,我们首先会看到 bcache
的结构体定义,
struct {struct spinlock lock;struct buf buf[NBUF];// Linked list of all buffers, through prev/next.// Sorted by how recently the buffer was used.// head.next is most recent, head.prev is least.struct buf head;
} bcache;
很明确,bcache
有一个很大的数组,即是空闲 buf
的集合;然后就是 head
,它是为了支持 LRU 算法而特意设计的数据结构(双链表的表头);当然,还有一个 lock
,用来控制众多进程访问 bcache
的顺序
我们在 I. Motivation 中讲的很清楚,并行效率的瓶颈,就是 bcache
作为一个整体而言它太大了,导致每一次进程的访问都会对整块区域上锁,我们希望的是,能不能缩小上锁的区域,尽量只影响一小块区域的工作
这种设想是可以实现的,它与分布式锁的思想类似。就是分而治之,将整块 Buffer cache 划分成许多小块。为了让 Lab: Buffer cache 主题鲜明,就不在分而治之这块技术上多做文章了,免得有些喧宾夺主了。采用较为简单的哈希分桶策略,将众多空闲 buf 分别挂载到不同(由 buf
的 blockno
的哈希值而定)的哈希桶上,在这里简单定义一下,
struct {struct buf buf[NBUF];// Linked list of all buffers, through prev/next.// Sorted by how recently the buffer was used.// head.next is most recent, head.prev is least.struct buf buckets[NBUCKET];struct spinlock lks[NBUCKET];
} bcache;
其中的 NBUCKET
在 kernel/param.h
中有声明,
#define NBUCKET 13 /** 哈希表桶数 */
为什么讲 NBUCKET
定义成奇数,Lab8: locks 实验主页 是这样讲的,
It is OK to use a fixed number of buckets and not resize the hash table dynamically. Use a prime number of buckets (e.g., 13) to reduce the likelihood of hashing conflicts.
S2 - bcache 的相关操作
在定义好最重要的 bcache
数据结构之后,就可以对 bget()
等核心操作动手了
首先,要调整 binit()
,与版本0不同(因为数据结构不同了),版本1要多初始化每个哈希桶的小锁,
void
binit(void)
{struct buf *b;for(int i=0; i<NBUCKET; i++) initlock(&bcache.lks[i], "bcache");// Create linked list of buffersfor(int i=0; i<NBUCKET; i++) {bcache.buckets[i].prev = &bcache.buckets[i];bcache.buckets[i].next = &bcache.buckets[i];}for(b = bcache.buf; b < bcache.buf+NBUF; b++){b->next = bcache.buckets[0].next;b->prev = &bcache.buckets[0];initsleeplock(&b->lock, "buffer");bcache.buckets[0].next->prev = b;bcache.buckets[0].next = b;}
}
和版本0相同的地方是,也要保持每个哈希桶是双链表的特性,所以将每个哈希桶的 prev
和 next
都指向了自己;另外,为了满足之后的 “挖墙脚策略” ,在这里我们将所有空闲 buf
都挂载到第 0 号哈希桶上,方便后续接济其他哈希桶,因为存在一种情况:有的哈希桶中可能没有空闲 buf
了,此时它会去别的哈希桶中偷(这个故事,后面再细说)
bread()
和 bwrite()
无需我们修改,但我还是提一嘴的好。bread()
是提供给上层的接口,当 kernel 要从 Buffer cache 中读取数据时,就会调用 bread()
。bread()
又会去调用 bget()
,尝试在 Buffer cache 中定位到所需的数据。若 bget()
顺利定位,那么 bread()
直接将结果返回给 kernel ;反之,bread()
都需要去 disk 中读取新鲜的数据,再将其返回。看下代码,
// Return a locked buf with the contents of the indicated block.
struct buf*
bread(uint dev, uint blockno)
{struct buf *b;b = bget(dev, blockno);/** 如果buf中的数据过时了,那么需要重新读取 */if(!b->valid) {virtio_disk_rw(b, 0);b->valid = 1;}return b;
}
bwrite()
是负责 Buffer cache 与 disk 之间的交互工作,它更为简单,就是将 Buffer cache 中的数据写回至 disk 中,代码如下,
// Write b's contents to disk. Must be locked.
void
bwrite(struct buf *b)
{if(!holdingsleep(&b->lock))panic("bwrite");virtio_disk_rw(b, 1);
}
讲到这,有必要展示一下 Buffer cache 与 disk 的层次模型,如下图,
disk 是硬件,处于最底层;而 Buffer cache 属于软件范畴,管理着 disk 的映像
S3 - 何时 pin() 和 unpin()
谈完了对内外的接口,开始调整几个辅助函数。首先是 brelse()
,它是 b-release 的缩写,xv6-book 提到过,许多 Unix 的接口都是采用类似风格的命名法则,可以学习一下。书中说了何时调用 brelse()
,即是在每次用完 buf 之后,其意就是,每次读 or 写 buf 之后,都应该更新 buf 的元数据,重要的是调用 releasesleep(&b->lock)
,唤醒等待该 I/O 完成的进程。先看代码,
1 void
2 brelse(struct buf *b)
3 {
4 if(!holdingsleep(&b->lock))
5 panic("brelse");6 releasesleep(&b->lock);7 int id = myhash(b->blockno);
8 acquire(&bcache.lks[id]);
9 b->refcnt--;
10 if(b->refcnt == 0)
11 b->lastuse = ticks;
12 release(&bcache.lks[id]);
13 }
可能会有疑问,为什么会增添第11行,记录下 ticks
?Lab8: locks 实验主页 是这样讲的,
Remove the list of all buffers (
bcache.head
etc.) and instead time-stamp buffers using the time of their last use (i.e., usingticks
in kernel/trap.c). With this changebrelse
doesn’t need to acquire the bcache lock, andbget
can select the least-recently used block based on the time-stamps.
我大概翻译一下,就是不再采用原先淘汰表头元素的 LRU 算法了,取而代之的是基于时间戳的 LRU 算法。这里的时间戳,就是 kernel/trap.c:ticks
。其中的 myhash()
定义如下,
static int
myhash(int x)
{return x%NBUCKET;
}
就是很简单的取余求哈希值。另外,还需要在 struct buf
类型中添加 lastuse
字段,用来记录最近一次被使用的时间节点,
struct buf {...uint lastuse; /** 最近被使用的时间戳 */
};
bpin()
和 bunpin()
相对而言就简单很多了,
void
bpin(struct buf *b)
{int id = myhash(b->blockno);acquire(&bcache.lks[id]);b->refcnt++;release(&bcache.lks[id]);
}void
bunpin(struct buf *b)
{int id = myhash(b->blockno);acquire(&bcache.lks[id]);b->refcnt--;release(&bcache.lks[id]);
}
S4 - 设计 bget()
完成了上面一系列的琐碎小事之后,进入了 Lab: Buffer cache 最重要的环节:设计 bget()
bget()
的任务,就是去 blockno 对应的哈希桶中找到编号为 dev 的 buf 。如果能在哈希桶中顺利找到,那么皆大欢喜;但是很多情况,跟生活一样,都会事与愿违。所以要认真考虑一下扑空的情况,分为两种,
- 虽然在第 id 号哈希桶中没有找到对应的 buf ,但是第 id 号哈希桶中有可供淘汰的空闲 buf 。那么
bcache
毫不犹豫,立马腾出空间,用来保存 blockno 的数据 - 接着第一种情况,在第 id 号哈希桶内也没有找到可供淘汰的空闲 buf ,那么此时就需要去挖别的哈希桶的墙角了,具体表现为向有空闲 buf 的哈希桶发出请求,过继一个 buf
如果经历过上述两种情况后,还没有找到可用的空闲 buf ,那么直接 panic ,宣布 buf 已用完。看一下代码(较长),
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;int id = myhash(blockno);acquire(&bcache.lks[id]);// Is the block already cached?for(b = bcache.buckets[id].next; b != &bcache.buckets[id]; b = b->next){if(b->dev == dev && b->blockno == blockno){b->refcnt++;release(&bcache.lks[id]);acquiresleep(&b->lock);return b;}}/*** 检查没有发现cached的 buf,那么就根据 ticks LRU 策略* 选择第 id 号哈希桶中 ticks 最小的淘汰,ticks 最小意味着* 该 buf 在众多未被使用到( b->refcnt==0 )的 buf 中是距今最远的* */// Not cached.// Recycle the least recently used (LRU) unused buffer.struct buf *victm = 0;uint minticks = ticks;for(b = bcache.buckets[id].next; b != &bcache.buckets[id]; b = b->next){if(b->refcnt==0 && b->lastuse<=minticks) {minticks = b->lastuse;victm = b;}}if(!victm) goto steal;/** * 直接覆盖待淘汰的buf,无需再将其中的旧内容写回至disk中 * 标记位valid置0,为了保证能够读取到最新的数据 * */bufinit(victm, dev, blockno);release(&bcache.lks[id]);acquiresleep(&victm->lock);return victm;steal:/** 到别的哈希桶挖 buf */for(int i=0; i<NBUCKET; i++) {if(i == id)continue;acquire(&bcache.lks[i]);minticks = ticks;for(b = bcache.buckets[i].next; b != &bcache.buckets[i]; b = b->next){if(b->refcnt==0 && b->lastuse<=minticks) {minticks = b->lastuse;victm = b;}}if(!victm) {release(&bcache.lks[i]);continue;}bufinit(victm, dev, blockno);/** 将 victm 从第 i 号哈希桶中取出来 */victm->next->prev = victm->prev;victm->prev->next = victm->next;release(&bcache.lks[i]);/** 将 victm 接入第 id 号中 */victm->next = bcache.buckets[id].next;bcache.buckets[id].next->prev = victm;bcache.buckets[id].next = victm;victm->prev = &bcache.buckets[id];release(&bcache.lks[id]);acquiresleep(&victm->lock);return victm;}release(&bcache.lks[id]);panic("bget: no buf");
}
其中的 bufinit()
是我自己追加的辅助函数,定义如下,
static void
bufinit(struct buf* b, uint dev, uint blockno)
{b->dev = dev;b->blockno = blockno;b->valid = 0;b->refcnt = 1;
}
主要就是设置 buf 的元数据。另外该函数声明为 static ,这算是一个小技巧了,这说明 bufinit()
只在 kernel/bio.c
这个文件内可见,与项目中其他文件内的同名函数并不冲突,可以理解成类似于 class 的封装手段
回到 bget()
上来,流程前面已详细交代过,这里就不赘述了。讲一个注意点,是在 steal 标签里的。在去别的哈希桶挖 buf 时,定位到合适的 buf 后,要先将其从原先(代码中的第 i 号)的哈希桶链表中取出来,然后再接入第 id 号哈希桶中。无论是取还是接,操作的本质都是对链表的插入和删除,这要记住要点:先建后拆!
另外,我选用 goto 处理 if-else 问题,我认为很正确很简洁
最后,我还想提一嘴关于锁的问题。Lab8: locks 实验主页 中说还可以用一个大锁来控制并行顺序问题,说是这样做的话较为简单,但我感觉并不如此,原话是这样的,
Some debugging tips: implement bucket locks but leave the global bcache.lock acquire/release at the beginning/end of bget to serialize the code. Once you are sure it is correct without race conditions, remove the global locks and deal with concurrency issues. You can also run
make CPUS=1 qemu
to test with one core.
不必这样调试,干脆一步到位,分布式锁没那么复杂的。以及 sleep lock 的问题,可以查阅 xv6-book 的第五章 Locks ,我这里就按照我自己的理解,大致讲一下 lock 和 sleep lock 的主要区别
- lock 主要是用来协调进程之间的顺序问题,比如进程 A 想对 buf1 进行写操作,进程 B 也想对 buf1 进行读操作。xv6 先后接收到了俩进程的请求:先写后读(这是用户的本意),那么进程 A 先抢到 buf1 的 lock 并上锁,上锁之后进程 B 只能干瞪眼等待。只有进程 A 写完后释放锁,进程 B 才有机会去读 buf1。这就很好的控制了进程对同一内存的访问顺序,另外 buf1 是在 memory 中的,所以进程 AB 访问其,是无需等待的,不需要去休眠
- sleep lock 主要是为了应对一些有 I/O 需求的情况,我们都知道跟 CPU 的速度比起来,I/O 是很慢的。一旦进程发出 I/O 请求后,xv6 都会让其去休眠,待 I/O 好了之后再唤醒它。这一套动作,就是用 sleep lock 来保证的
进程执行acquiresleep()
后,会主动让出 CPU ,然后自己去休眠。当有其他进程执行releasesleep()
之后,会被唤醒,从而继续工作
至此,已经完成了设计并优化 Buffer cache 机制的大致工作
iii. Result
手动进入 qemu
make qemu
$bcachetest
$usertests
和 Lab8: locks 实验主页 给出的结果相同就意味着没太大问题,其中 tot 量要小于 500,我这个是 0 ,实验指导说 0 最好,接近 0 也行,自己把握
VI. Reference
- CSDN - MIT 6.S081 Lab8: locks
这篇关于「实验记录」MIT 6.S081 Lab8 locks的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!