This article walks through the internals of HBase compaction and region splitting.
HRegionServer issues the compaction request
The main logic is as follows:
// Iterate over every Store, work out which files need compacting, build a
// CompactionRequest for each and submit it to a thread pool.
// throttleCompaction() decides whether the request goes to the
// largeCompactions pool or the smallCompactions pool.
CompactSplitThread#requestCompaction() {
  for (Store s : r.getStores().values()) {
    CompactionRequest cr = s.requestCompaction(priority, request);
    ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
        ? largeCompactions : smallCompactions;
    pool.execute(cr);
    ret.add(cr);
  }
}

// If the total size of the CompactionRequest exceeds
// minFilesToCompact * 2 * memstoreFlushSize, the request is executed in the
// largeCompactions thread pool; otherwise in smallCompactions.
Store#throttleCompaction() {
  long throttlePoint = conf.getLong(
      "hbase.regionserver.thread.compaction.throttle",
      2 * this.minFilesToCompact * this.region.memstoreFlushSize);
  return compactionSize > throttlePoint;
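To make the threshold concrete, here is a self-contained sketch of the routing decision (not actual HBase code; the class and field names merely mirror the snippet above). Assuming the default 128 MB memstore flush size and minFilesToCompact = 3, the throttle point works out to 768 MB:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative sketch of the large/small compaction pool routing.
public class CompactionRouting {
    // throttlePoint = 2 * minFilesToCompact * memstoreFlushSize
    // (assuming minFilesToCompact = 3 and a 128 MB flush size)
    static final long THROTTLE_POINT = 2L * 3 * 128 * 1024 * 1024; // 768 MB

    // Mirrors Store#throttleCompaction(): true means "use largeCompactions"
    static boolean useLargePool(long compactionSize) {
        return compactionSize > THROTTLE_POINT;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor largeCompactions =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        ThreadPoolExecutor smallCompactions =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

        long requestSize = 900L * 1024 * 1024; // a 900 MB compaction request
        ThreadPoolExecutor pool =
                useLargePool(requestSize) ? largeCompactions : smallCompactions;
        pool.execute(() -> System.out.println("compacting " + requestSize + " bytes"));

        largeCompactions.shutdown();
        smallCompactions.shutdown();
    }
}
```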
}

Store#compactSelection() {
  // Select the store files whose TTL has already expired
  if (storefile.maxTimeStamp + store.ttl < now_timestamp) {
    // return the set of expired store files
  }
  // Walk the list from the front; skip (and then drop) any file larger
  // than maxCompactSize, unless it is a reference file
  int pos = 0;
  while (pos < compactSelection.getFilesToCompact().size()
      && compactSelection.getFilesToCompact().get(pos).getReader().length()
          > maxCompactSize
      && !compactSelection.getFilesToCompact().get(pos).isReference()) {
    ++pos;
  }
  if (pos != 0) {
    compactSelection.clearSubList(0, pos);
  }
  if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {
    return;
  }
  // Build the sumSize array (one slot per file in the Store):
  // sumSize[i] = fileSizes[i] + sumSize[i+1] (or 0) - fileSizes[tooFar] (or 0),
  // i.e. a sliding-window sum of up to maxFilesToCompact file sizes
  int countOfFiles = compactSelection.getFilesToCompact().size();
  long[] fileSizes = new long[countOfFiles];
  long[] sumSize = new long[countOfFiles];
  for (int i = countOfFiles - 1; i >= 0; --i) {
    StoreFile file = compactSelection.getFilesToCompact().get(i);
    fileSizes[i] = file.getReader().length();
    // calculate the sum of fileSizes[i, i+maxFilesToCompact-1) for the algo
    int tooFar = i + this.maxFilesToCompact - 1;
    sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
  }
  // While fileSizes[start] > max(minCompactSize, sumSize[start+1] * r),
  // advance start: this filters out oversized files so that they do not
  // dominate the compaction time
  int start = 0;
  while (countOfFiles - start >= this.minFilesToCompact
      && fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * r))) {
    ++start;
  }
  int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
  long totalSize = fileSizes[start]
      + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);
  compactSelection = compactSelection.getSubList(start, end);
  // For a major compaction with too many files, drop the excess
  if (majorcompaction
      && compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
    int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
    compactSelection.getFilesToCompact().subList(0, pastMax).clear();
  }
}
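The selection math above can be exercised in isolation. The sketch below replaces StoreFile objects with a plain long[] of file sizes; the constants reflect commonly cited defaults (minFilesToCompact = 3, maxFilesToCompact = 5, ratio r = 1.2) and the sample sizes are made up:

```java
// Standalone sketch of the compactSelection() file-selection math.
public class CompactSelect {
    static final int MIN_FILES = 3, MAX_FILES = 5;   // assumed defaults
    static final long MIN_COMPACT_SIZE = 0;
    static final double RATIO = 1.2;                 // the "r" above

    // Returns {start, end}: the half-open range of files chosen for compaction
    static int[] select(long[] fileSizes) {
        int n = fileSizes.length;
        long[] sumSize = new long[n];
        for (int i = n - 1; i >= 0; --i) {
            // sliding-window sum of fileSizes[i, i + MAX_FILES - 1)
            int tooFar = i + MAX_FILES - 1;
            sumSize[i] = fileSizes[i]
                    + ((i + 1 < n) ? sumSize[i + 1] : 0)
                    - ((tooFar < n) ? fileSizes[tooFar] : 0);
        }
        int start = 0;
        // Skip files that dwarf the combined size of the files after them
        while (n - start >= MIN_FILES && fileSizes[start]
                > Math.max(MIN_COMPACT_SIZE, (long) (sumSize[start + 1] * RATIO))) {
            ++start;
        }
        int end = Math.min(n, start + MAX_FILES);
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // 251 and 100 are too large relative to what follows; {23, 12, 12} win
        int[] range = select(new long[] {251, 100, 23, 12, 12});
        System.out.println(range[0] + ".." + range[1]); // prints "2..5"
    }
}
```

With {251, 100, 23, 12, 12} the two big files are skipped because each exceeds 1.2 times the sum of the smaller files behind it; with {100, 50, 23, 12, 12} nothing is skipped and all five files are selected.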
The CompactionRequest thread (runs both major and minor compactions)
The class diagram of the compaction-related classes is as follows:
The execution paths for major and minor compactions differ very little. If the total size of the files selected for compaction exceeds 2 * minFilesToCompact * memstoreFlushSize, the request is run in the large-compaction thread pool; otherwise it goes to the small one.
Separately, when the StoreScanner is constructed, the ScanType indicates whether this is a major or a minor compaction; ScanQueryMatcher then uses the ScanType (there are three kinds: user scan, minor compact, and major compact) to decide what to return for each cell.
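The real retention rules live in ScanQueryMatcher and are considerably richer than this, but the toy enum below (all names illustrative, not HBase's) captures one well-known difference between the scan types: delete markers can be purged during a major compaction, while minor compactions and user scans must keep them.

```java
// Toy illustration of the ScanType distinction; not the actual
// ScanQueryMatcher logic, which handles versions, TTLs and more.
public class ScanTypeDemo {
    enum ScanType { USER_SCAN, MINOR_COMPACT, MAJOR_COMPACT }

    // Only a major compaction rewrites every file in the Store, so only
    // it can safely discard delete markers.
    static boolean keepDeleteMarkers(ScanType type) {
        return type != ScanType.MAJOR_COMPACT;
    }

    public static void main(String[] args) {
        for (ScanType t : ScanType.values()) {
            System.out.println(t + " keeps delete markers: " + keepDeleteMarkers(t));
        }
    }
}
```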
The main logic is as follows:
// The compaction itself runs in a separate thread
CompactionRequest#run() {
  boolean completed = HRegion.compact(this);
  if (completed) {
    if (s.getCompactPriority() <= 0) {
      server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue", null);
    } else {
      // see if the compaction has caused us to exceed max region size
      server.getCompactSplitThread().requestSplit(r);
    }
  }
}

// This calls into the Store to perform the actual compaction
HRegion#compact() {
  Preconditions.checkArgument(cr.getHRegion().equals(this));
  lock.readLock().lock();
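The follow-up decision in CompactionRequest#run() can be sketched as a single branch. Reading the compact priority as roughly "blocking file count minus current store-file count" is an assumption here, and the method name nextAction is invented for illustration:

```java
// Sketch of the decision made after a completed compaction: a non-positive
// compact priority (the store still has too many files; an assumption, see
// the lead-in) re-enqueues the store, otherwise a split check is requested.
public class PostCompaction {
    static String nextAction(int compactPriority) {
        return (compactPriority <= 0) ? "requestCompaction" : "requestSplit";
    }

    public static void main(String[] args) {
        System.out.println(nextAction(-1)); // store still over the file limit
        System.out.println(nextAction(3));  // healthy store: check region size
    }
}
```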