This post walks through 6.830 / 6.814 (Syllabus 2021), MIT Lab 2: SimpleDB Operators.
Contents
- 1. Reference links
- 2. SimpleDB Architecture and Implementation Guide
- 2.1 Filter and Join
- 2.2 Aggregates
- 2.3 HeapFile Mutability
- 2.4 Insertion and deletion
- 2.5 Page eviction
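In this lab you write a set of operators for SimpleDB that implement table modifications (for example, inserting and deleting records), selections, joins, and aggregates. They build on the foundation written in Lab 1 and give you a database system that can run simple queries over multiple tables.
In addition, Lab 1 ignored buffer pool management: it did not deal with what happens when more pages are referenced than fit in memory. In Lab 2 you design an eviction policy that flushes stale pages from the buffer pool.
You do not need to implement transactions or locking in this lab.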
1. Reference links
1.1 Course page: http://db.lcs.mit.edu/6.830/assign.php
1.2 Lab 2 handout: https://github.com/MIT-DB-Class/simple-db-hw-2021/blob/master/lab2.md
1.3 HearmingBear's Lab 2 write-up: https://blog.csdn.net/hjw199666/article/details/103590963
1.4 My repository: https://gitee.com/zhou-zhiqiang/db-class
2. SimpleDB Architecture and Implementation Guide
2.1 Filter and Join
Filter: This operator only returns tuples that satisfy a Predicate that is specified as part of its constructor. Hence, it filters out any tuples that do not match the predicate.
Join: This operator joins tuples from its two children according to a JoinPredicate that is passed in as part of its constructor.
Exercise 1.
src/java/simpledb/execution/Predicate.java
src/java/simpledb/execution/JoinPredicate.java
src/java/simpledb/execution/Filter.java
src/java/simpledb/execution/Join.java
src/java/simpledb/execution/Predicate.java

    package simpledb.execution;

    import simpledb.storage.Field;
    import simpledb.storage.Tuple;

    import java.io.Serializable;

    /**
     * Predicate compares tuples to a specified Field value.
     */
    public class Predicate implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Constants used for return codes in Field.compare */
        public enum Op implements Serializable {
            EQUALS, GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQ, GREATER_THAN_OR_EQ, LIKE, NOT_EQUALS;

            /**
             * Interface to access operations by integer value for command-line
             * convenience.
             *
             * @param i a valid integer Op index
             */
            public static Op getOp(int i) {
                return values()[i];
            }

            public String toString() {
                if (this == EQUALS) return "=";
                if (this == GREATER_THAN) return ">";
                if (this == LESS_THAN) return "<";
                if (this == LESS_THAN_OR_EQ) return "<=";
                if (this == GREATER_THAN_OR_EQ) return ">=";
                if (this == LIKE) return "LIKE";
                if (this == NOT_EQUALS) return "<>";
                throw new IllegalStateException("impossible to reach here");
            }
        }

        private final int field;
        private final Op op;
        private final Field operand;

        /**
         * Constructor.
         *
         * @param field   field number of passed in tuples to compare against.
         * @param op      operation to use for comparison
         * @param operand field value to compare passed in tuples to
         */
        public Predicate(int field, Op op, Field operand) {
            this.field = field;
            this.op = op;
            this.operand = operand;
        }

        /** @return the field number */
        public int getField() {
            return field;
        }

        /** @return the operator */
        public Op getOp() {
            return op;
        }

        /** @return the operand */
        public Field getOperand() {
            return operand;
        }

        /**
         * Compares the field number of t specified in the constructor to the
         * operand field specified in the constructor using the operator specified
         * in the constructor. The comparison can be made through Field's compare
         * method.
         *
         * @param t The tuple to compare against
         * @return true if the comparison is true, false otherwise.
         */
        public boolean filter(Tuple t) {
            return t.getField(field).compare(op, operand);
        }

        /**
         * Returns something useful, like "f = field_id op = op_string operand =
         * operand_string"
         */
        public String toString() {
            // Fixed: the earlier version built this string but returned "".
            return String.format("f = %d op = %s operand = %s", field, op, operand);
        }
    }

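Fields and their meaning:

| type | name | description |
|---|---|---|
| int | field | index of the tuple field to compare |
| Op | op | comparison operator |
| Field | operand | value the field is compared against |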
Op:
An enum that implements the Serializable interface. The second row shows what toString returns for each constant.

| EQUALS | GREATER_THAN | LESS_THAN | LESS_THAN_OR_EQ | GREATER_THAN_OR_EQ | LIKE | NOT_EQUALS |
| :----: | :----------: | :-------: | :-------------: | :----------------: | :--: | :--------: |
| = | > | < | <= | >= | LIKE | <> |

It provides the getOp and toString methods.
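Predicate:
The constructor takes the field index, the operator, and the operand to compare against.
filter:
Reads the field at index field from the given tuple and compares it, using op, with the operand passed to the constructor.
getField, getOp, getOperand, toString:
Plain accessors, omitted here.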
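To make the filter step concrete, here is a minimal, self-contained sketch of the same idea on plain int arrays instead of SimpleDB's Tuple and Field classes. MiniPredicate, its reduced Op enum, and the int[] row representation are hypothetical stand-ins chosen for illustration; they are not part of the SimpleDB code base.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for simpledb.execution.Predicate: a row is an int[],
// and the operand is a plain int rather than a Field.
class MiniPredicate {
    enum Op { EQUALS, GREATER_THAN, LESS_THAN, NOT_EQUALS }

    private final int field;   // index of the column to test
    private final Op op;       // comparison operator
    private final int operand; // constant to compare against

    MiniPredicate(int field, Op op, int operand) {
        this.field = field;
        this.op = op;
        this.operand = operand;
    }

    // Returns true when row[field] <op> operand holds.
    boolean filter(int[] row) {
        int value = row[field];
        switch (op) {
            case EQUALS:       return value == operand;
            case GREATER_THAN: return value > operand;
            case LESS_THAN:    return value < operand;
            case NOT_EQUALS:   return value != operand;
            default: throw new IllegalStateException("unknown op " + op);
        }
    }

    // Keeps only the rows that satisfy the predicate, preserving their order.
    static List<int[]> select(List<int[]> rows, MiniPredicate p) {
        List<int[]> out = new ArrayList<>();
        for (int[] row : rows) {
            if (p.filter(row)) {
                out.add(row);
            }
        }
        return out;
    }
}
```

In SimpleDB the comparison itself is delegated to Field.compare, so Predicate does not need a switch of its own; the switch here only replaces that delegation.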
src/java/simpledb/execution/JoinPredicate.java

    package simpledb.execution;

    import simpledb.storage.Tuple;

    import java.io.Serializable;

    /**
     * JoinPredicate compares fields of two tuples using a predicate. JoinPredicate
     * is most likely used by the Join operator.
     */
    public class JoinPredicate implements Serializable {

        private static final long serialVersionUID = 1L;

        private final int field1;
        private final int field2;
        private final Predicate.Op op;

        /**
         * Constructor -- create a new predicate over two fields of two tuples.
         *
         * @param field1 The field index into the first tuple in the predicate
         * @param field2 The field index into the second tuple in the predicate
         * @param op     The operation to apply (as defined in Predicate.Op); either
         *               Predicate.Op.GREATER_THAN, Predicate.Op.LESS_THAN,
         *               Predicate.Op.EQUALS, Predicate.Op.GREATER_THAN_OR_EQ, or
         *               Predicate.Op.LESS_THAN_OR_EQ
         * @see Predicate
         */
        public JoinPredicate(int field1, Predicate.Op op, int field2) {
            this.field1 = field1;
            this.op = op;
            this.field2 = field2;
        }

        /**
         * Apply the predicate to the two specified tuples. The comparison can be
         * made through Field's compare method.
         *
         * @return true if the tuples satisfy the predicate.
         */
        public boolean filter(Tuple t1, Tuple t2) {
            return t1.getField(field1).compare(op, t2.getField(field2));
        }

        public int getField1() {
            return field1;
        }

        public int getField2() {
            return field2;
        }

        public Predicate.Op getOperator() {
            return op;
        }
    }

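Fields:
field1 is the field index in the first tuple and field2 is the field index in the second tuple. op compares the two fields, and tuple pairs that fail the comparison are filtered out.
Key methods:
filter: returns true only when the pair of tuples satisfies the predicate.
This plays the same role as the ON condition in a MySQL JOIN … ON clause.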
src/java/simpledb/execution/Filter.java

    package simpledb.execution;

    import simpledb.transaction.TransactionAbortedException;
    import simpledb.common.DbException;
    import simpledb.storage.Tuple;
    import simpledb.storage.TupleDesc;

    import java.util.*;

    /**
     * Filter is an operator that implements a relational select.
     */
    public class Filter extends Operator {

        private static final long serialVersionUID = 1L;

        private final Predicate p;
        private OpIterator child;

        /**
         * Constructor accepts a predicate to apply and a child operator to read
         * tuples to filter from.
         *
         * @param p     The predicate to filter tuples with
         * @param child The child operator
         */
        public Filter(Predicate p, OpIterator child) {
            this.p = p;
            this.child = child;
        }

        public Predicate getPredicate() {
            return p;
        }

        public TupleDesc getTupleDesc() {
            return child.getTupleDesc();
        }

        public void open() throws DbException, NoSuchElementException,
                TransactionAbortedException {
            child.open();
            super.open();
        }

        public void close() {
            super.close();
            child.close();
        }

        public void rewind() throws DbException, TransactionAbortedException {
            child.rewind();
        }

        /**
         * Operator.fetchNext implementation. Iterates over tuples from the
         * child operator, applying the predicate to them and returning those that
         * pass the predicate (i.e. for which Predicate.filter() returns true).
         *
         * @return The next tuple that passes the filter, or null if there are no
         *         more tuples
         * @see Predicate#filter
         */
        protected Tuple fetchNext() throws NoSuchElementException,
                TransactionAbortedException, DbException {
            while (child.hasNext()) {
                Tuple t = child.next();
                if (p.filter(t)) {
                    return t;
                }
            }
            return null;
        }

        @Override
        public OpIterator[] getChildren() {
            return new OpIterator[] {this.child};
        }

        @Override
        public void setChildren(OpIterator[] children) {
            child = children[0];
        }
    }

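Fields:
p is the Predicate used to filter tuples, and child is the iterator that supplies the tuples to be filtered.
Key methods:
fetchNext: returns the next tuple from child that satisfies the predicate, or null when child has no more matching tuples.
For example, the tuples that pass the filter can then feed a join.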
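The fetchNext loop described above can be tried outside SimpleDB with an ordinary java.util.Iterator. The sketch below is a hypothetical stand-in: FilteringIterator is not a SimpleDB class, and the one-element look-ahead in hasNext/next is an assumption of this sketch about how a "return the next match or null" method is usually wrapped into an iterator. It also assumes the child never yields null elements.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical stand-in for Filter: wraps a child iterator and yields only
// the elements that pass the test.
class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> child;
    private final Predicate<T> test;
    private T buffered; // look-ahead element, null when nothing is buffered

    FilteringIterator(Iterator<T> child, Predicate<T> test) {
        this.child = child;
        this.test = test;
    }

    // Same shape as Filter.fetchNext: scan the child until a match appears,
    // and return null once the child is exhausted.
    private T fetchNext() {
        while (child.hasNext()) {
            T candidate = child.next();
            if (test.test(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    @Override
    public boolean hasNext() {
        if (buffered == null) {
            buffered = fetchNext();
        }
        return buffered != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = buffered;
        buffered = null;
        return result;
    }
}
```

Because the iterator pulls from its child only on demand, several such operators can be stacked without materializing intermediate results.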
src/java/simpledb/execution/Join.java

    package simpledb.execution;

    import simpledb.transaction.TransactionAbortedException;
    import simpledb.common.DbException;
    import simpledb.storage.Tuple;
    import simpledb.storage.TupleDesc;

    import java.util.*;

    /**
     * The Join operator implements the relational join operation.
     */
    public class Join extends Operator {

        private static final long serialVersionUID = 1L;

        private JoinPredicate p;
        private OpIterator child1;
        private OpIterator child2;
        private Tuple t; // current outer (left) tuple, kept between fetchNext calls

        /**
         * Constructor. Accepts two children to join and the predicate to join them
         * on
         *
         * @param p      The predicate to use to join the children
         * @param child1 Iterator for the left (outer) relation to join
         * @param child2 Iterator for the right (inner) relation to join
         */
        public Join(JoinPredicate p, OpIterator child1, OpIterator child2) {
            this.p = p;
            this.child1 = child1;
            this.child2 = child2;
            t = null;
        }

        public JoinPredicate getJoinPredicate() {
            return p;
        }

        /**
         * @return the field name of join field1. Should be quantified by
         *         alias or table name.
         */
        public String getJoinField1Name() {
            return child1.getTupleDesc().getFieldName(p.getField1());
        }

        /**
         * @return the field name of join field2. Should be quantified by
         *         alias or table name.
         */
        public String getJoinField2Name() {
            return child2.getTupleDesc().getFieldName(p.getField2());
        }

        /**
         * @see TupleDesc#merge(TupleDesc, TupleDesc) for possible
         *      implementation logic.
         */
        public TupleDesc getTupleDesc() {
            return TupleDesc.merge(child1.getTupleDesc(), child2.getTupleDesc());
        }

        public void open() throws DbException, NoSuchElementException,
                TransactionAbortedException {
            child1.open();
            child2.open();
            super.open();
        }

        public void close() {
            super.close();
            child2.close();
            child1.close();
        }

        public void rewind() throws DbException, TransactionAbortedException {
            child1.rewind();
            child2.rewind();
            // Added: drop the cached outer tuple so the join restarts cleanly.
            t = null;
        }

        /**
         * Returns the next tuple generated by the join, or null if there are no
         * more tuples. Logically, this is the next tuple in r1 cross r2 that
         * satisfies the join predicate. There are many possible implementations;
         * the simplest is a nested loops join.
         * <p>
         * Note that the tuples returned from this particular implementation of Join
         * are simply the concatenation of joining tuples from the left and right
         * relation. Therefore, if an equality predicate is used there will be two
         * copies of the join attribute in the results. (Removing such duplicate
         * columns can be done with an additional projection operator if needed.)
         * <p>
         * For example, if one tuple is {1,2,3} and the other tuple is {1,5,6},
         * joined on equality of the first column, then this returns {1,2,3,1,5,6}.
         *
         * @return The next matching tuple.
         * @see JoinPredicate#filter
         */
        protected Tuple fetchNext() throws TransactionAbortedException, DbException {
            while (this.child1.hasNext() || this.t != null) {
                // Advance the outer relation only when no outer tuple is pending.
                if (this.child1.hasNext() && this.t == null) {
                    t = child1.next();
                }
                while (child2.hasNext()) {
                    Tuple t2 = child2.next();
                    if (p.filter(t, t2)) {
                        TupleDesc td1 = t.getTupleDesc();
                        TupleDesc td2 = t2.getTupleDesc();
                        TupleDesc newTd = TupleDesc.merge(td1, td2);
                        Tuple newTuple = new Tuple(newTd);
                        newTuple.setRecordId(t.getRecordId());
                        int i = 0;
                        for (; i < td1.numFields(); i++) {
                            newTuple.setField(i, t.getField(i));
                        }
                        for (int j = 0; j < td2.numFields(); j++) {
                            newTuple.setField(i + j, t2.getField(j));
                        }
                        // Inner scan finished on a match: reset it for the next outer tuple.
                        if (!child2.hasNext()) {
                            child2.rewind();
                            t = null;
                        }
                        return newTuple;
                    }
                }
                // Inner scan finished without a further match: move to the next outer tuple.
                child2.rewind();
                t = null;
            }
            return null;
        }

        @Override
        public OpIterator[] getChildren() {
            return new OpIterator[] {child1, child2};
        }

        @Override
        public void setChildren(OpIterator[] children) {
            child1 = children[0];
            child2 = children[1];
        }
    }

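Fields:
p is the JoinPredicate used to match tuples, child1 is the iterator over the first (outer) table, child2 is the iterator over the second (inner) table, and the temporary variable t holds the current tuple from the first table.
Key methods:
fetchNext: for the current outer tuple, scans the tuples of the second table, and when a pair satisfies the filter condition it concatenates the two into a new tuple and returns it. When the second table is exhausted, child2 is rewound to its start and the next outer tuple is taken.
This is a nested loops join, similar to what MySQL does.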
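As a rough illustration of the nested loops strategy, the sketch below joins two in-memory lists of int[] rows on equality and concatenates matching rows. NestedLoopJoin and its join method are hypothetical names used only for this example, and the predicate is hard-wired to equality rather than taken from a JoinPredicate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Join: rows are int[] and the join condition is
// equality between left[leftField] and right[rightField].
class NestedLoopJoin {
    static List<int[]> join(List<int[]> left, int leftField,
                            List<int[]> right, int rightField) {
        List<int[]> out = new ArrayList<>();
        for (int[] l : left) {        // outer relation
            for (int[] r : right) {   // inner relation, rescanned for every outer row
                if (l[leftField] == r[rightField]) {
                    // Concatenate the two rows; the join column appears twice.
                    int[] merged = new int[l.length + r.length];
                    System.arraycopy(l, 0, merged, 0, l.length);
                    System.arraycopy(r, 0, merged, l.length, r.length);
                    out.add(merged);
                }
            }
        }
        return out;
    }
}
```

Join.fetchNext produces the same pairs, but one at a time: it keeps the current outer tuple in t between calls and rewinds child2 whenever the inner scan is exhausted.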
ant runtest -Dtest=PredicateTest
ant runtest -Dtest=JoinPredicateTest
ant runtest -Dtest=FilterTest
ant runtest -Dtest=JoinTest
All of these unit tests finish with BUILD SUCCESSFUL.
ant runsystest -Dtest=FilterTest
ant runsystest -Dtest=JoinTest
Both system tests finish with BUILD SUCCESSFUL.
2.2 Aggregates
src/java/simpledb/execution/IntegerAggregator.java
src/java/simpledb/execution/StringAggregator.java
src/java/simpledb/execution/Aggregate.java
src/java/simpledb/execution/IntegerAggregator.java

    package simpledb.execution;

    import simpledb.common.DbException;
    import simpledb.common.Type;
    import simpledb.storage.Field;
    import simpledb.storage.IntField;
    import simpledb.storage.Tuple;
    import simpledb.storage.TupleDesc;
    import simpledb.transaction.TransactionAbortedException;

    import java.util.*;

    /**
     * Knows how to compute some aggregate over a set of IntFields.
     */
    public class IntegerAggregator implements Aggregator {

        private static final long serialVersionUID = 1L;

        private int gbfield;      // group-by field index
        private Type gbfieldtype; // type of the group-by field
        private int afield;       // aggregate field index
        private Op what;          // aggregation operator

        private Map<Field, Integer> groupMap;     // MIN, MAX, SUM, COUNT, and the sum for SC_AVG
        private Map<Field, Integer> countMap;     // per-group count for SC_AVG
        private Map<Field, List<Integer>> avgMap; // raw values for AVG and SUM_COUNT

        /**
         * Aggregate constructor
         *
         * @param gbfield     the 0-based index of the group-by field in the tuple, or
         *                    NO_GROUPING if there is no grouping
         * @param gbfieldtype the type of the group by field (e.g., Type.INT_TYPE), or null
         *                    if there is no grouping
         * @param afield      the 0-based index of the aggregate field in the tuple
         * @param what        the aggregation operator
         */
        public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
            this.gbfield = gbfield;
            this.gbfieldtype = gbfieldtype;
            this.afield = afield;
            this.what = what;
            this.groupMap = new HashMap<>();
            this.countMap = new HashMap<>();
            this.avgMap = new HashMap<>();
        }

        /**
         * Merge a new tuple into the aggregate, grouping as indicated in the
         * constructor
         *
         * @param tup the Tuple containing an aggregate field and a group-by field
         */
        public void mergeTupleIntoGroup(Tuple tup) {
            IntField aggField = (IntField) tup.getField(this.afield);
            // A null key stands for "no grouping"; HashMap accepts null keys.
            Field groupField = this.gbfield == NO_GROUPING ? null : tup.getField(this.gbfield);
            int newValue = aggField.getValue();
            if (groupField != null && groupField.getType() != this.gbfieldtype) {
                throw new IllegalArgumentException("Given tuple has wrong type");
            }
            switch (this.what) {
                case MIN:
                    this.groupMap.merge(groupField, newValue, Integer::min);
                    break;
                case MAX:
                    this.groupMap.merge(groupField, newValue, Integer::max);
                    break;
                case SUM:
                    this.groupMap.merge(groupField, newValue, Integer::sum);
                    break;
                case COUNT:
                    this.groupMap.merge(groupField, 1, Integer::sum);
                    break;
                case SC_AVG:
                    // The input tuple is expected to carry (sum, count) after the optional group field.
                    IntField countField = (IntField) tup.getField(groupField == null ? 1 : 2);
                    this.groupMap.merge(groupField, newValue, Integer::sum);
                    this.countMap.merge(groupField, countField.getValue(), Integer::sum);
                    break;
                case SUM_COUNT: // added: the iterator reads SUM_COUNT results from avgMap
                case AVG:
                    this.avgMap.computeIfAbsent(groupField, k -> new ArrayList<>()).add(newValue);
                    break;
                default:
                    throw new IllegalArgumentException("Aggregate not supported!");
            }
        }

        /**
         * Create a OpIterator over group aggregate results.
         *
         * @return a OpIterator whose tuples are the pair (groupVal, aggregateVal)
         *         if using group, or a single (aggregateVal) if no grouping. The
         *         aggregateVal is determined by the type of aggregate specified in
         *         the constructor.
         */
        public OpIterator iterator() {
            return new IntAggIterator();
        }

        private class IntAggIterator extends AggregateIterator {

            private Iterator<Map.Entry<Field, List<Integer>>> avgIt;
            private boolean isAvg;
            private boolean isSCAvg;
            private boolean isSumCount;

            public IntAggIterator() {
                super(groupMap, gbfieldtype);
                this.isAvg = what.equals(Op.AVG);
                this.isSCAvg = what.equals(Op.SC_AVG);
                this.isSumCount = what.equals(Op.SUM_COUNT);
                if (isSumCount) {
                    this.td = new TupleDesc(
                            new Type[] {this.itgbfieldtype, Type.INT_TYPE, Type.INT_TYPE},
                            new String[] {"groupVal", "sumVal", "countVal"});
                }
            }

            @Override
            public void open() throws DbException, TransactionAbortedException {
                super.open();
                if (this.isAvg || this.isSumCount) {
                    this.avgIt = avgMap.entrySet().iterator();
                } else {
                    this.avgIt = null;
                }
            }

            @Override
            public boolean hasNext() throws DbException, TransactionAbortedException {
                if (this.isAvg || this.isSumCount) {
                    return avgIt.hasNext();
                }
                return super.hasNext();
            }

            @Override
            public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
                Tuple rtn = new Tuple(td);
                if (this.isAvg || this.isSumCount) {
                    Map.Entry<Field, List<Integer>> entry = this.avgIt.next();
                    Field groupField = entry.getKey();
                    List<Integer> values = entry.getValue();
                    if (this.isAvg) {
                        this.setFields(rtn, this.sumList(values) / values.size(), groupField);
                    } else {
                        this.setFields(rtn, sumList(values), groupField);
                        // The count is stored right after the sum.
                        rtn.setField(groupField != null ? 2 : 1, new IntField(values.size()));
                    }
                    // Fixed: the SUM_COUNT branch used to fall through to super.next().
                    return rtn;
                } else if (this.isSCAvg) {
                    Map.Entry<Field, Integer> entry = this.it.next();
                    Field f = entry.getKey();
                    this.setFields(rtn, entry.getValue() / countMap.get(f), f);
                    return rtn;
                }
                return super.next();
            }

            @Override
            public void rewind() throws DbException, TransactionAbortedException {
                super.rewind();
                if (this.isAvg || this.isSumCount) {
                    this.avgIt = avgMap.entrySet().iterator();
                }
            }

            @Override
            public void close() {
                super.close();
                this.avgIt = null;
            }

            private int sumList(List<Integer> l) {
                int sum = 0;
                for (int i : l) {
                    sum += i;
                }
                return sum;
            }
        }
    }

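Fields:
gbfield is the index of the group-by field, gbfieldtype is its type, afield is the index of the aggregate field, and what is the aggregation operator: MIN, MAX, SUM, AVG, COUNT, plus SUM_COUNT and SC_AVG.
groupMap holds the running result for most operators, countMap holds the per-group count, and avgMap holds the raw values needed to compute an average.
Key methods:
mergeTupleIntoGroup: merges a tuple into its group; handles MIN, MAX, SUM, AVG, COUNT, and SC_AVG.
iterator: returns an IntAggIterator, an iterator over the aggregate results. It extends AggregateIterator, a helper class that is not listed in this section.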
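The grouping logic can be illustrated without any SimpleDB types. The sketch below is a hypothetical MiniAggregator that groups by an int key. Unlike the IntegerAggregator listing above, which keeps every value in a list for AVG, this sketch substitutes a running sum and count per group; that choice is mine, made to keep the example short, and is not how the code above works.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for IntegerAggregator: groups by an int key and keeps
// a running sum, count, min, and max per group.
class MiniAggregator {
    enum Op { MIN, MAX, SUM, COUNT, AVG }

    // Running state for one group: {sum, count, min, max}.
    private final Map<Integer, long[]> groups = new LinkedHashMap<>();

    // Folds one (groupKey, value) pair into the running state of its group.
    void merge(int groupKey, int value) {
        long[] s = groups.get(groupKey);
        if (s == null) {
            groups.put(groupKey, new long[]{value, 1, value, value});
            return;
        }
        s[0] += value;
        s[1] += 1;
        s[2] = Math.min(s[2], value);
        s[3] = Math.max(s[3], value);
    }

    // Returns the aggregate for one group; AVG uses integer division,
    // matching the integer result type of the listing above.
    long result(int groupKey, Op op) {
        long[] s = groups.get(groupKey);
        if (s == null) {
            throw new IllegalArgumentException("no such group: " + groupKey);
        }
        switch (op) {
            case MIN:   return s[2];
            case MAX:   return s[3];
            case SUM:   return s[0];
            case COUNT: return s[1];
            case AVG:   return s[0] / s[1];
            default: throw new IllegalArgumentException("unsupported op " + op);
        }
    }
}
```

Keeping only the sum and count uses constant memory per group, whereas storing the raw values grows with the size of each group.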
src/java/simpledb/execution/StringAggregator.java

    package simpledb.execution;

    import simpledb.common.Type;
    import simpledb.storage.Field;
    import simpledb.storage.StringField;
    import simpledb.storage.Tuple;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Knows how to compute some aggregate over a set of StringFields.
     */
    public class StringAggregator implements Aggregator {

        private static final long serialVersionUID = 1L;

        private int gbfield;
        private Type gbfieldtype;
        private int afield;
        private Op what;
        private Map<Field, Integer> groupMap;

        /**
         * Aggregate constructor
         *
         * @param gbfield     the 0-based index of the group-by field in the tuple, or
         *                    NO_GROUPING if there is no grouping
         * @param gbfieldtype the type of the group by field (e.g., Type.INT_TYPE), or
         *                    null if there is no grouping
         * @param afield      the 0-based index of the aggregate field in the tuple
         * @param what        aggregation operator to use -- only supports COUNT
         * @throws IllegalArgumentException if what != COUNT
         */
        public StringAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
            if (!what.equals(Op.COUNT)) {
                throw new IllegalArgumentException("Only COUNT is supported for String fields!");
            }
            this.gbfield = gbfield;
            this.gbfieldtype = gbfieldtype;
            this.afield = afield;
            this.what = what;
            this.groupMap = new HashMap<>();
        }

        /**
         * Merge a new tuple into the aggregate, grouping as indicated in the constructor
         *
         * @param tup the Tuple containing an aggregate field and a group-by field
         */
        public void mergeTupleIntoGroup(Tuple tup) {
            StringField afield = (StringField) tup.getField(this.afield); // aggregate field
            Field gbfield = this.gbfield == NO_GROUPING ? null : tup.getField(this.gbfield); // group-by field
            String newValue = afield.getValue(); // not needed for COUNT, read only as a type check
            if (gbfield != null && gbfield.getType() != this.gbfieldtype) {
                throw new IllegalArgumentException("Given tuple has wrong type");
            }
            if (!this.groupMap.containsKey(gbfield)) {
                this.groupMap.put(gbfield, 1);
            } else {
                this.groupMap.put(gbfield, this.groupMap.get(gbfield) + 1);
            }
        }

        /**
         * Create a OpIterator over group aggregate results.
         *
         * @return a OpIterator whose tuples are the pair (groupVal,
         *         aggregateVal) if using group, or a single (aggregateVal) if no
         *         grouping. The aggregateVal is determined by the type of
         *         aggregate specified in the constructor.
         */
        public OpIterator iterator() {
            return new AggregateIterator(this.groupMap, this.gbfieldtype);
        }
    }

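Fields:
gbfield is the index of the group-by field, gbfieldtype is its type, afield is the index of the aggregate field, and what is the aggregation operator, which can only be COUNT.
groupMap holds the COUNT result for each group.
Key methods:
mergeTupleIntoGroup: merges a tuple into its group by incrementing that group's count.
iterator: returns an AggregateIterator, an iterator over the aggregate results.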
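Since COUNT never looks at the aggregated string itself, the whole aggregator reduces to counting rows per group key. A minimal sketch of that idea follows; GroupCounter, countByGroup, and the String[][] row layout are hypothetical names for this example only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StringAggregator with COUNT: counts how many rows
// fall into each group key. The aggregated value is never inspected.
class GroupCounter {
    static Map<String, Integer> countByGroup(String[][] rows, int groupField) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] row : rows) {
            // Insert 1 for a new key, otherwise add 1 to the existing count.
            counts.merge(row[groupField], 1, Integer::sum);
        }
        return counts;
    }
}
```

Map.merge does the same containsKey / put bookkeeping as the listing above in a single call.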
src/java/simpledb/execution/Aggregate.java

    package simpledb.execution;

    import simpledb.common.DbException;
    import simpledb.common.Type;
    import simpledb.storage.Tuple;
    import simpledb.storage.TupleDesc;
    import simpledb.transaction.TransactionAbortedException;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.NoSuchElementException;

    /**
     * The Aggregation operator that computes an aggregate (e.g., sum, avg, max,
     * min). Note that we only support aggregates over a single column, grouped by a
     * single column.
     */
    public class Aggregate extends Operator {

        private static final long serialVersionUID = 1L;

        private OpIterator child;
        private int afield;
        private int gfield;
        private Aggregator.Op aop;
        private Aggregator aggregator;
        private OpIterator it;
        private TupleDesc td;

        /**
         * Constructor.
         * <p>
         * Implementation hint: depending on the type of afield, you will want to
         * construct an {@link IntegerAggregator} or {@link StringAggregator} to help
         * you with your implementation of readNext().
         *
         * @param child  The OpIterator that is feeding us tuples.
         * @param afield The column over which we are computing an aggregate.
         * @param gfield The column over which we are grouping the result, or -1 if
         *               there is no grouping
         * @param aop    The aggregation operator to use
         */
        public Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop) {
            this.child = child;
            this.afield = afield;
            this.gfield = gfield;
            this.aop = aop;
            Type gfieldtype = gfield == -1 ? null : this.child.getTupleDesc().getFieldType(this.gfield);
            if (this.child.getTupleDesc().getFieldType(this.afield) == Type.STRING_TYPE) {
                this.aggregator = new StringAggregator(this.gfield, gfieldtype, this.afield, this.aop);
            } else {
                this.aggregator = new IntegerAggregator(this.gfield, gfieldtype, this.afield, this.aop);
            }
            this.it = this.aggregator.iterator();
            this.td = buildTupleDesc();
        }

        /**
         * Builds the output TupleDesc: the optional group-by column, the aggregate
         * column, and an extra COUNT column for SUM_COUNT.
         */
        private TupleDesc buildTupleDesc() {
            TupleDesc childTd = this.child.getTupleDesc();
            List<Type> types = new ArrayList<>();
            List<String> names = new ArrayList<>();
            if (gfield != -1) {
                types.add(childTd.getFieldType(gfield));
                // Fixed: the constructor used the aggregate field's name here.
                names.add(childTd.getFieldName(gfield));
            }
            types.add(childTd.getFieldType(afield));
            names.add(childTd.getFieldName(afield));
            if (aop.equals(Aggregator.Op.SUM_COUNT)) {
                types.add(Type.INT_TYPE);
                names.add("COUNT");
            }
            return new TupleDesc(types.toArray(new Type[0]), names.toArray(new String[0]));
        }

        /**
         * @return If this aggregate is accompanied by a groupby, return the groupby
         *         field index in the <b>INPUT</b> tuples. If not, return
         *         {@link Aggregator#NO_GROUPING}
         */
        public int groupField() {
            return this.gfield;
        }

        /**
         * @return If this aggregate is accompanied by a group by, return the name
         *         of the groupby field in the <b>OUTPUT</b> tuples. If not, return
         *         null;
         */
        public String groupFieldName() {
            // Fixed: return null when there is no grouping, as documented above.
            return this.gfield == -1 ? null : this.td.getFieldName(0);
        }

        /**
         * @return the aggregate field
         */
        public int aggregateField() {
            return this.afield;
        }

        /**
         * @return return the name of the aggregate field in the <b>OUTPUT</b>
         *         tuples
         */
        public String aggregateFieldName() {
            return this.td.getFieldName(this.gfield == -1 ? 0 : 1);
        }

        /**
         * @return return the aggregate operator
         */
        public Aggregator.Op aggregateOp() {
            return this.aop;
        }

        public static String nameOfAggregatorOp(Aggregator.Op aop) {
            return aop.toString();
        }

        public void open() throws NoSuchElementException, DbException,
                TransactionAbortedException {
            this.child.open();
            // Consume the whole child first; aggregates need every input tuple.
            while (this.child.hasNext()) {
                this.aggregator.mergeTupleIntoGroup(this.child.next());
            }
            this.it.open();
            super.open();
        }

        /**
         * Returns the next tuple. If there is a group by field, then the first
         * field is the field by which we are grouping, and the second field is the
         * result of computing the aggregate. If there is no group by field, then
         * the result tuple should contain one field representing the result of the
         * aggregate. Should return null if there are no more tuples.
         */
        protected Tuple fetchNext() throws TransactionAbortedException, DbException {
            if (this.it.hasNext()) {
                return this.it.next();
            }
            return null;
        }

        public void rewind() throws DbException, TransactionAbortedException {
            this.child.rewind();
            this.it.rewind();
        }

        /**
         * Returns the TupleDesc of this Aggregate. If there is no group by field,
         * this will have one field - the aggregate column. If there is a group by
         * field, the first field will be the group by field, and the second will be
         * the aggregate value column.
         * <p>
         * The name of an aggregate column should be informative. For example:
         * "aggName(aop) (child_td.getFieldName(afield))" where aop and afield are
         * given in the constructor, and child_td is the TupleDesc of the child
         * iterator.
         */
        public TupleDesc getTupleDesc() {
            return this.td;
        }

        public void close() {
            super.close();
            this.child.close();
            this.it.close();
        }

        @Override
        public OpIterator[] getChildren() {
            return new OpIterator[] {this.child};
        }

        @Override
        public void setChildren(OpIterator[] children) {
            this.child = children[0];
            this.td = buildTupleDesc();
        }
    }

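Fields:
private OpIterator child; // iterator that supplies the input tuples
private int afield; // index of the field being aggregated
private int gfield; // index of the group-by field
private Aggregator.Op aop; // aggregation operator, e.g. COUNT
private Aggregator aggregator; // the aggregator that does the work
private OpIterator it; // iterator over the aggregator's results
private TupleDesc td; // tuple descriptor of the output (group-by and aggregate columns)
Key methods:
open: reads every tuple from child and merges it into the aggregator.
fetchNext: returns the aggregate results one by one.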
ant runtest -Dtest=IntegerAggregatorTest
ant runtest -Dtest=StringAggregatorTest
ant runtest -Dtest=FilterTest
ant runtest -Dtest=AggregateTest
All of these unit tests finish with BUILD SUCCESSFUL.
ant runsystest -Dtest=AggregateTest
This system test finishes with BUILD SUCCESSFUL.
2.3 HeapFile Mutability
src/java/simpledb/storage/HeapPage.java
package simpledb.storage;import simpledb.common.Catalog;
import simpledb.common.Database;
import simpledb.common.DbException;
import simpledb.transaction.TransactionId;import java.io.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;/*** Each instance of HeapPage stores data for one page of HeapFiles and* implements the Page interface that is used by BufferPool.** @see HeapFile* @see BufferPool**/
public class HeapPage implements Page {final HeapPageId pid;final TupleDesc td;final byte[] header;final Tuple[] tuples;final int numSlots;byte[] oldData;private final Byte oldDataLock= (byte) 0;/*** the transaction id which changed the page to dirty* */private TransactionId dirtyId;private boolean dirty;/*** Create a HeapPage from a set of bytes of data read from disk.* The format of a HeapPage is a set of header bytes indicating* the slots of the page that are in use, some number of tuple slots.* Specifically, the number of tuples is equal to: <p>* floor((BufferPool.getPageSize()*8) / (tuple size * 8 + 1))* <p> where tuple size is the size of tuples in this* database table, which can be determined via {@link Catalog#getTupleDesc}.* The number of 8-bit header words is equal to:* <p>* ceiling(no. tuple slots / 8)* <p>* @see Database#getCatalog* @see Catalog#getTupleDesc* @see BufferPool#getPageSize()*/public HeapPage(HeapPageId id, byte[] data) throws IOException {this.pid = id;this.td = Database.getCatalog().getTupleDesc(id.getTableId());this.numSlots = getNumTuples();DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));// allocate and read the header slots of this pageheader = new byte[getHeaderSize()];for (int i=0; i<header.length; i++)header[i] = dis.readByte();tuples = new Tuple[numSlots];try{// allocate and read the actual records of this pagefor (int i=0; i<tuples.length; i++)tuples[i] = readNextTuple(dis,i);}catch(NoSuchElementException e){e.printStackTrace();}dis.close();setBeforeImage();}/** Retrieve the number of tuples on this page.@return the number of tuples on this page*/private int getNumTuples() {// some code goes hereint num = (int) Math.floor((BufferPool.getPageSize()*8*1.0)/(td.getSize()*8 + 1));return num;}/*** Computes the number of bytes in the header of a page in a HeapFile with each tuple occupying tupleSize bytes* @return the number of bytes in the header of a page in a HeapFile with each tuple occupying tupleSize 
bytes*/private int getHeaderSize() {// some code goes herereturn (int) Math.ceil(getNumTuples()*1.0/8);}/** Return a view of this page before it was modified-- used by recovery */public HeapPage getBeforeImage(){try {byte[] oldDataRef = null;synchronized(oldDataLock){oldDataRef = oldData;}return new HeapPage(pid,oldDataRef);} catch (IOException e) {e.printStackTrace();//should never happen -- we parsed it OK before!System.exit(1);}return null;}public void setBeforeImage() {synchronized(oldDataLock){oldData = getPageData().clone();}}/*** @return the PageId associated with this page.*/public HeapPageId getId() {// some code goes herereturn pid;}/*** Suck up tuples from the source file.*/private Tuple readNextTuple(DataInputStream dis, int slotId) throws NoSuchElementException {// if associated bit is not set, read forward to the next tuple, and// return null.if (!isSlotUsed(slotId)) {for (int i=0; i<td.getSize(); i++) {try {dis.readByte();} catch (IOException e) {throw new NoSuchElementException("error reading empty tuple");}}return null;}// read fields in the tupleTuple t = new Tuple(td);RecordId rid = new RecordId(pid, slotId);t.setRecordId(rid);try {for (int j=0; j<td.numFields(); j++) {Field f = td.getFieldType(j).parse(dis);t.setField(j, f);}} catch (java.text.ParseException e) {e.printStackTrace();throw new NoSuchElementException("parsing error!");}return t;}/*** Generates a byte array representing the contents of this page.* Used to serialize this page to disk.* <p>* The invariant here is that it should be possible to pass the byte* array generated by getPageData to the HeapPage constructor and* have it produce an identical HeapPage object.** @see #HeapPage* @return A byte array correspond to the bytes of this page.*/public byte[] getPageData() {int len = BufferPool.getPageSize();ByteArrayOutputStream baos = new ByteArrayOutputStream(len);DataOutputStream dos = new DataOutputStream(baos);// create the header of the pagefor (byte b : header) {try 
{dos.writeByte(b);} catch (IOException e) {// this really shouldn't happene.printStackTrace();}}// create the tuplesfor (int i=0; i<tuples.length; i++) {// empty slotif (!isSlotUsed(i)) {for (int j=0; j<td.getSize(); j++) {try {dos.writeByte(0);} catch (IOException e) {e.printStackTrace();}}continue;}// non-empty slotfor (int j=0; j<td.numFields(); j++) {Field f = tuples[i].getField(j);try {f.serialize(dos);} catch (IOException e) {e.printStackTrace();}}}// paddingint zerolen = BufferPool.getPageSize() - (header.length + td.getSize() * tuples.length); //- numSlots * td.getSize();byte[] zeroes = new byte[zerolen];try {dos.write(zeroes, 0, zerolen);} catch (IOException e) {e.printStackTrace();}try {dos.flush();} catch (IOException e) {e.printStackTrace();}return baos.toByteArray();}/*** Static method to generate a byte array corresponding to an empty* HeapPage.* Used to add new, empty pages to the file. Passing the results of* this method to the HeapPage constructor will create a HeapPage with* no valid tuples in it.** @return The returned ByteArray.*/public static byte[] createEmptyPageData() {int len = BufferPool.getPageSize();return new byte[len]; //all 0}/*** Delete the specified tuple from the page; the corresponding header bit should be updated to reflect* that it is no longer stored on any page.* @throws DbException if this tuple is not on this page, or tuple slot is* already empty.* @param t The tuple to delete*/public void deleteTuple(Tuple t) throws DbException {// some code goes here// not necessary for lab1if (t.getRecordId() == null){throw new DbException("tuple does not exist");}int tid = t.getRecordId().getTupleNumber();if (tid >= numSlots || tuples[tid] == null){throw new DbException("tuple does not exist");}if (!isSlotUsed(tid)){throw new DbException("the slot is already empty");}
        // The slot is in use: delete only if it really holds this tuple.
        if (tuples[tid].equals(t)) {
            tuples[tid] = null;
            markSlotUsed(tid, false);
            return;
        }
        throw new DbException("tuple does not exist");
    }

    /**
     * Adds the specified tuple to the page; the tuple should be updated to reflect
     * that it is now stored on this page.
     * @throws DbException if the page is full (no empty slots) or tupledesc
     *         is mismatch.
     * @param t The tuple to add.
     */
    public void insertTuple(Tuple t) throws DbException {
        if (getNumEmptySlots() == 0 || !t.getTupleDesc().equals(td)) {
            throw new DbException("page is full or tuple descriptor does not match");
        }
        // Take the first free slot, set its header bit, and record the tuple's new location.
        for (int i = 0; i < numSlots; i++) {
            if (!isSlotUsed(i)) {
                markSlotUsed(i, true);
                t.setRecordId(new RecordId(pid, i));
                tuples[i] = t;
                break;
            }
        }
    }

    /**
     * Marks this page as dirty/not dirty and record that transaction
     * that did the dirtying
     */
    public void markDirty(boolean dirty, TransactionId tid) {
        this.dirty = dirty;
        this.dirtyId = tid;
    }

    /**
     * Returns the tid of the transaction that last dirtied this page, or null if the page is not dirty
     */
    public TransactionId isDirty() {
        return this.dirty ?
this.dirtyId : null;}/*** Returns the number of empty slots on this page.*/public int getNumEmptySlots() {// some code goes hereint cnt = 0;for (int i = 0; i < numSlots; i++) {if (!isSlotUsed(i)){++cnt;}}return cnt;}/*** Returns true if associated slot on this page is filled.*/public boolean isSlotUsed(int i) {// some code goes hereint quot = i / 8;int remainder = i % 8;int bitidx = header[quot];int bit = (bitidx >> remainder) & 1;return bit == 1;}/*** Abstraction to fill or clear a slot on this page.*/private void markSlotUsed(int i, boolean value) {// some code goes here// not necessary for lab1byte b = header[Math.floorDiv(i,8)];byte mask = (byte)(1<<(i%8));if (value){header[Math.floorDiv(i,8)] = (byte) (b | mask);} else {header[Math.floorDiv(i,8)] = (byte) (b & (~mask));}}/*** @return an iterator over all tuples on this page (calling remove on this iterator throws an UnsupportedOperationException)* (note that this iterator shouldn't return tuples in empty slots!)*/public Iterator<Tuple> iterator() {// some code goes hereArrayList<Tuple> filleTuples = new ArrayList<>();for (int i = 0; i < numSlots; i++) {if (isSlotUsed(i)){filleTuples.add(tuples[i]);}}return filleTuples.iterator();}}
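Variable definitions:
final HeapPageId pid;// ID of this page
final TupleDesc td;// schema (tuple descriptor) of the tuples on this page
final byte[] header;// slot bitmap: one bit per slot, set when the slot is in use
final Tuple[] tuples;// tuple stored in each slot (null for an empty slot)
final int numSlots;// number of tuple slots on this page
byte[] oldData;// page bytes before modification, used by getBeforeImage for recovery
private final Byte oldDataLock= (byte) 0;// lock object guarding oldData
private TransactionId dirtyId;// transaction that last dirtied this page
private boolean dirty;// whether the page is dirty
Key methods:
readNextTuple: reads the tuple for a given slot from the DataInputStream, or skips its bytes and returns null if the slot is empty.
getPageData: generates a byte array representing the contents of this page; used to serialize the page to disk.
deleteTuple: sets the corresponding entry of tuples to null and clears the slot's header bit.
insertTuple: stores t in the first free entry of tuples, sets that slot's header bit, and updates the tuple's RecordId.
markDirty: marks the page as dirty or clean and records the ID of the transaction that dirtied it.
iterator: iterates over all tuples stored in used slots of the page.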
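The header bitmap behind isSlotUsed and markSlotUsed can be tried in isolation. The following is a minimal sketch, assuming the same layout as the HeapPage code above (slot i lives in byte i / 8, at bit i % 8, least significant bit first). The class name SlotBitmap and its static helpers are hypothetical and not part of SimpleDB.

```java
// Hypothetical standalone helper; mirrors the bit layout used by HeapPage above.
final class SlotBitmap {
    private SlotBitmap() {}

    /** Number of header bytes needed for numSlots slots (one bit per slot, rounded up). */
    static int headerSize(int numSlots) {
        return (numSlots + 7) / 8;
    }

    /** Returns true if the bit for slot i is set. */
    static boolean isSlotUsed(byte[] header, int i) {
        return ((header[i / 8] >> (i % 8)) & 1) == 1;
    }

    /** Sets or clears the bit for slot i. */
    static void markSlotUsed(byte[] header, int i, boolean used) {
        byte mask = (byte) (1 << (i % 8));
        if (used) {
            header[i / 8] |= mask;
        } else {
            header[i / 8] &= (byte) ~mask;
        }
    }

    public static void main(String[] args) {
        byte[] header = new byte[headerSize(10)]; // 10 slots need 2 header bytes
        markSlotUsed(header, 9, true);
        System.out.println(isSlotUsed(header, 9));
        markSlotUsed(header, 9, false);
        System.out.println(isSlotUsed(header, 9));
    }
}
```

The integer expression (numSlots + 7) / 8 gives the same result as the Math.ceil form in getHeaderSize, without going through floating point.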
src/java/simpledb/storage/HeapFile.java
package simpledb.storage;import simpledb.common.Database;
import simpledb.common.DbException;
import simpledb.common.Permissions;
import simpledb.transaction.TransactionAbortedException;
import simpledb.transaction.TransactionId;import java.io.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;/*** HeapFile is an implementation of a DbFile that stores a collection of tuples* in no particular order. Tuples are stored on pages, each of which is a fixed* size, and the file is simply a collection of those pages. HeapFile works* closely with HeapPage. The format of HeapPages is described in the HeapPage* constructor.* * @see HeapPage#HeapPage* @author Sam Madden*/
public class HeapFile implements DbFile {private final File file;private final TupleDesc td;/*** Constructs a heap file backed by the specified file.* * @param f* the file that stores the on-disk backing store for this heap* file.*/public HeapFile(File f, TupleDesc td) {// some code goes herethis.file = f;this.td = td;}/*** Returns the File backing this HeapFile on disk.* * @return the File backing this HeapFile on disk.*/public File getFile() {// some code goes herereturn file;}/*** Returns an ID uniquely identifying this HeapFile. Implementation note:* you will need to generate this tableid somewhere to ensure that each* HeapFile has a "unique id," and that you always return the same value for* a particular HeapFile. We suggest hashing the absolute file name of the* file underlying the heapfile, i.e. f.getAbsoluteFile().hashCode().* * @return an ID uniquely identifying this HeapFile.*/public int getId() {// some code goes herereturn file.getAbsoluteFile().hashCode();}/*** Returns the TupleDesc of the table stored in this DbFile.* * @return TupleDesc of this DbFile.*/public TupleDesc getTupleDesc() {// some code goes herereturn td;}// see DbFile.java for javadocspublic Page readPage(PageId pid) {// some code goes hereint tableId = pid.getTableId();int pgNo = pid.getPageNumber();RandomAccessFile f = null;try {f = new RandomAccessFile(file,"r");if ((pgNo + 1) * BufferPool.getPageSize() > f.length()) {f.close();throw new IllegalArgumentException(String.format("table %d page %d is invalid", tableId, pgNo));}byte[] bytes = new byte[BufferPool.getPageSize()];f.seek(pgNo * BufferPool.getPageSize());int read = f.read(bytes,0,BufferPool.getPageSize());if (read != BufferPool.getPageSize()){throw new IllegalArgumentException(String.format("table %d page %d read %d bytes", tableId, pgNo, read));}HeapPageId id = new HeapPageId(pid.getTableId(), pid.getPageNumber());return new HeapPage(id,bytes);} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) 
{e.printStackTrace();} finally {try {f.close();} catch (Exception e){e.printStackTrace();}}throw new IllegalArgumentException(String.format("table %d page %d is invalid", tableId, pgNo));}// see DbFile.java for javadocspublic void writePage(Page page) throws IOException {// some code goes here// not necessary for lab1int pgNo = page.getId().getPageNumber();if (pgNo > numPages()){throw new IllegalArgumentException();}int pgSize = BufferPool.getPageSize();RandomAccessFile f = new RandomAccessFile(file,"rw");f.seek(pgNo*pgSize);byte[] data = page.getPageData();f.write(data);f.close();}/*** Returns the number of pages in this HeapFile.*/public int numPages() {// some code goes hereint num = (int) Math.floor(file.length()*1.0 / BufferPool.getPageSize());return num;}// see DbFile.java for javadocspublic List<Page> insertTuple(TransactionId tid, Tuple t)throws DbException, IOException, TransactionAbortedException {// some code goes here// not necessary for lab1ArrayList<Page> pageList = new ArrayList<>();for (int i = 0; i < numPages(); i++) {HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid,new HeapPageId(this.getId(),i),Permissions.READ_WRITE);if (p.getNumEmptySlots() == 0){continue;}p.insertTuple(t);pageList.add(p);return pageList;}BufferedOutputStream bw = new BufferedOutputStream(new FileOutputStream(file,true));byte[] emptyData = HeapPage.createEmptyPageData();bw.write(emptyData);bw.close();HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid,new HeapPageId(getId(),numPages() - 1),Permissions.READ_WRITE);p.insertTuple(t);pageList.add(p);return pageList;}// see DbFile.java for javadocspublic ArrayList<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,TransactionAbortedException {// some code goes here// not necessary for lab1ArrayList<Page> pageList = new ArrayList<>();HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid,t.getRecordId().getPageId(),Permissions.READ_WRITE);p.deleteTuple(t);pageList.add(p);return pageList;}// 
see DbFile.java for javadocspublic DbFileIterator iterator(TransactionId tid) {// some code goes herereturn new HeapFileIterator(this,tid);}private static final class HeapFileIterator implements DbFileIterator{private final HeapFile heapFile;private final TransactionId tid;private Iterator<Tuple> it;private int whichPage;public HeapFileIterator(HeapFile file,TransactionId tid){this.heapFile = file;this.tid = tid;}@Overridepublic void open() throws DbException, TransactionAbortedException {whichPage = 0;it = getPageTuples(whichPage);}private Iterator<Tuple> getPageTuples(int pageNumber) throws DbException, TransactionAbortedException {if (pageNumber >= 0 && pageNumber < heapFile.numPages()){HeapPageId pid = new HeapPageId(heapFile.getId(),pageNumber);HeapPage page = (HeapPage) Database.getBufferPool().getPage(tid,pid,Permissions.READ_ONLY);return page.iterator();} else {throw new DbException(String.format("heapfile %d does not contain page %d!", pageNumber,heapFile.getId()));}}@Overridepublic boolean hasNext() throws DbException, TransactionAbortedException {if (it == null) {return false;}if (!it.hasNext()){while (whichPage < (heapFile.numPages() - 1)){whichPage++;it = getPageTuples(whichPage);if (it.hasNext()) {return it.hasNext();}}if (whichPage >= (heapFile.numPages() - 1)) {return false;}} else {return true;}return false;}@Overridepublic Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {if (it == null || !it.hasNext()){throw new NoSuchElementException();}return it.next();}@Overridepublic void rewind() throws DbException, TransactionAbortedException {close();open();}@Overridepublic void close() {it = null;}}}
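Variable definitions:
private final File file;// the file on disk that stores the tuples
private final TupleDesc td;// schema of the table stored in this file
Key methods:
readPage, writePage: read a page from the file or write a page to it.
insertTuple, deleteTuple: insert a tuple into, or delete a tuple from, a page; the page is obtained from the BufferPool. If no existing page has a free slot, insertTuple first appends an empty page to the file.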
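readPage and writePage locate a page by multiplying its page number by the page size, and numPages divides the file length by the page size. The sketch below isolates that arithmetic, doing the multiplication in long so that large files do not overflow an int. PageMath and its method names are hypothetical helpers for illustration only; the page size of 4096 in the note that follows matches DEFAULT_PAGE_SIZE in BufferPool.

```java
// Hypothetical helper illustrating the file-offset arithmetic; not part of SimpleDB.
final class PageMath {
    private PageMath() {}

    /** Byte offset of page pgNo, computed in long to avoid int overflow on large files. */
    static long pageOffset(int pgNo, int pageSize) {
        return (long) pgNo * pageSize;
    }

    /** Number of complete pages in a file of the given length. */
    static int numPages(long fileLength, int pageSize) {
        return (int) (fileLength / pageSize);
    }

    /** True if page pgNo lies entirely within the file. */
    static boolean pageExists(int pgNo, long fileLength, int pageSize) {
        return pgNo >= 0 && pageOffset(pgNo, pageSize) + pageSize <= fileLength;
    }
}
```

With 4096-byte pages and a 12288-byte file, the file holds three pages and page 2 starts at byte 8192. Page 3 does not exist yet, which is the case where insertTuple has to append a new empty page.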
src/java/simpledb/storage/BufferPool.java
public void insertTuple(TransactionId tid, int tableId, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    DbFile f = Database.getCatalog().getDatabaseFile(tableId);
    updateBufferPool(f.insertTuple(tid, t), tid);
}

/**
 * Remove the specified tuple from the buffer pool.
 * Will acquire a write lock on the page the tuple is removed from and any
 * other pages that are updated. May block if the lock(s) cannot be acquired.
 *
 * Marks any pages that were dirtied by the operation as dirty by calling
 * their markDirty bit, and adds versions of any pages that have
 * been dirtied to the cache (replacing any existing versions of those pages) so
 * that future requests see up-to-date pages.
 *
 * @param tid the transaction deleting the tuple.
 * @param t the tuple to delete
 */
public void deleteTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    DbFile f = Database.getCatalog().getDatabaseFile(t.getRecordId().getPageId().getTableId());
    updateBufferPool(f.deleteTuple(tid, t), tid);
}

// DbFile.insertTuple and DbFile.deleteTuple return List<Page>, so the parameter is a List.
private void updateBufferPool(List<Page> pageList, TransactionId tid) throws DbException {
    for (Page p : pageList) {
        p.markDirty(true, tid);
        // pageStore is keyed by Integer; use pid.hashCode(), the same key getPage uses,
        // so the cached entry is replaced rather than duplicated.
        int key = p.getId().hashCode();
        if (!pageStore.containsKey(key) && pageStore.size() >= numPages) {
            evictPage();
        }
        pageStore.put(key, p);
    }
}
ant runtest -Dtest=HeapPageWriteTest
ant runtest -Dtest=HeapFileWriteTest
ant runtest -Dtest=BufferPoolWriteTest
All three report BUILD SUCCESSFUL.
2.4. Insertion and deletion
src/java/simpledb/execution/Insert.java
package simpledb.execution;import simpledb.common.Database;
import simpledb.common.DbException;
import simpledb.common.Type;
import simpledb.storage.BufferPool;
import simpledb.storage.IntField;
import simpledb.storage.Tuple;
import simpledb.storage.TupleDesc;
import simpledb.transaction.TransactionAbortedException;
import simpledb.transaction.TransactionId;import java.io.IOException;/*** Inserts tuples read from the child operator into the tableId specified in the* constructor*/
public class Insert extends Operator {private static final long serialVersionUID = 1L;private TransactionId tid;private OpIterator child;private int tableId;private final TupleDesc td;/*** helper for fetchNext* */private int counter;private boolean called;/*** Constructor.** @param t* The transaction running the insert.* @param child* The child operator from which to read tuples to be inserted.* @param tableId* The table in which to insert tuples.* @throws DbException* if TupleDesc of child differs from table into which we are to* insert.*/public Insert(TransactionId t, OpIterator child, int tableId)throws DbException {// some code goes hereif (!child.getTupleDesc().equals(Database.getCatalog().getTupleDesc(tableId))){throw new DbException("TupleDesc does not match!");}this.tid = t;this.child = child;this.tableId = tableId;this.td = new TupleDesc(new Type[]{Type.INT_TYPE},new String[]{"number of inserted tuples"});this.counter = -1;this.called = false;}public TupleDesc getTupleDesc() {// some code goes herereturn this.td;}public void open() throws DbException, TransactionAbortedException {// some code goes herethis.counter = 0;this.child.open();super.open();}public void close() {// some code goes heresuper.close();this.child.close();this.counter = -1;this.called = false;}public void rewind() throws DbException, TransactionAbortedException {// some code goes herethis.child.rewind();this.counter = 0;this.called = false;}/*** Inserts tuples read from child into the tableId specified by the* constructor. It returns a one field tuple containing the number of* inserted records. Inserts should be passed through BufferPool. An* instances of BufferPool is available via Database.getBufferPool(). 
Note* that insert DOES NOT need check to see if a particular tuple is a* duplicate before inserting it.** @return A 1-field tuple containing the number of inserted records, or* null if called more than once.* @see Database#getBufferPool* @see BufferPool#insertTuple*/protected Tuple fetchNext() throws TransactionAbortedException, DbException {// some code goes hereif (this.called){return null;}this.called = true;while (this.child.hasNext()){Tuple t = this.child.next();try {Database.getBufferPool().insertTuple(this.tid,this.tableId,t);this.counter++;} catch (IOException e){e.printStackTrace();break;}}Tuple tu = new Tuple(this.td);tu.setField(0,new IntField(this.counter));return tu;}@Overridepublic OpIterator[] getChildren() {// some code goes herereturn new OpIterator[]{this.child};}@Overridepublic void setChildren(OpIterator[] children) {// some code goes herethis.child = children[0];}
}
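Variable definitions:
private TransactionId tid;// transaction ID
private OpIterator child;// child iterator that supplies the tuples to insert
private int tableId;// ID of the table to insert into
private final TupleDesc td;// schema of the one-field result tuple
private int counter;// number of tuples inserted so far
private boolean called;// whether fetchNext has already produced its result
Key methods:
fetchNext: inserts every tuple from the child through the BufferPool and returns a one-field tuple holding the count; later calls return null.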
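Insert and Delete share the same control flow: the first fetchNext call drains the child and returns a count, and every later call returns null. The sketch below shows only that pattern, using plain Java iterators instead of SimpleDB's OpIterator and Tuple. CountOnce and its Consumer parameter are hypothetical stand-ins; in the real operators the action would be the call to BufferPool.insertTuple or BufferPool.deleteTuple.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical stand-in for Insert/Delete: applies an action to every child item once.
final class CountOnce<T> {
    private final Iterator<T> child;
    private final Consumer<T> action;
    private boolean called = false;

    CountOnce(Iterator<T> child, Consumer<T> action) {
        this.child = child;
        this.action = action;
    }

    /** First call: drain the child and return the count. Later calls: return null. */
    Integer fetchNext() {
        if (called) {
            return null;
        }
        called = true;
        int counter = 0;
        while (child.hasNext()) {
            action.accept(child.next());
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        CountOnce<String> op =
                new CountOnce<>(Arrays.asList("a", "b", "c").iterator(), s -> {});
        System.out.println(op.fetchNext());
        System.out.println(op.fetchNext());
    }
}
```

Note that an empty child still yields a result on the first call (a count of 0); only the second and later calls return null.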
src/java/simpledb/execution/Delete.java
package simpledb.execution;import simpledb.common.Database;
import simpledb.common.DbException;
import simpledb.common.Type;
import simpledb.storage.BufferPool;
import simpledb.storage.IntField;
import simpledb.storage.Tuple;
import simpledb.storage.TupleDesc;
import simpledb.transaction.TransactionAbortedException;
import simpledb.transaction.TransactionId;import java.io.IOException;/*** The delete operator. Delete reads tuples from its child operator and removes* them from the table they belong to.*/
public class Delete extends Operator {private static final long serialVersionUID = 1L;private TransactionId tid;private OpIterator child;private final TupleDesc td;private int counter;private boolean called;/*** Constructor specifying the transaction that this delete belongs to as* well as the child to read from.* * @param t* The transaction this delete runs in* @param child* The child operator from which to read tuples for deletion*/public Delete(TransactionId t, OpIterator child) {// some code goes herethis.tid = t;this.child = child;this.td = new TupleDesc(new Type[]{Type.INT_TYPE},new String[]{"number of deleted tuples"});this.counter = -1;this.called = false;}public TupleDesc getTupleDesc() {// some code goes herereturn this.td;}public void open() throws DbException, TransactionAbortedException {// some code goes herethis.child.open();super.open();this.counter = 0;}public void close() {// some code goes heresuper.close();this.child.close();this.counter = -1;}public void rewind() throws DbException, TransactionAbortedException {// some code goes herethis.child.rewind();this.counter = 0;}/*** Deletes tuples as they are read from the child operator. 
Deletes are* processed via the buffer pool (which can be accessed via the* Database.getBufferPool() method.* * @return A 1-field tuple containing the number of deleted records.* @see Database#getBufferPool* @see BufferPool#deleteTuple*/protected Tuple fetchNext() throws TransactionAbortedException, DbException {// some code goes hereif (this.called){return null;}this.called = true;while (this.child.hasNext()){Tuple t = this.child.next();try{Database.getBufferPool().deleteTuple(this.tid,t);this.counter++;} catch (IOException e){e.printStackTrace();break;}}Tuple tu = new Tuple(this.td);tu.setField(0,new IntField(this.counter));return tu;}@Overridepublic OpIterator[] getChildren() {// some code goes herereturn new OpIterator[]{this.child};}@Overridepublic void setChildren(OpIterator[] children) {// some code goes herethis.child = children[0];}}
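Variable definitions:
private TransactionId tid;// transaction ID
private OpIterator child;// child iterator that supplies the tuples to delete
private final TupleDesc td;// schema of the one-field result tuple
private int counter;// number of tuples deleted so far
private boolean called;// whether fetchNext has already produced its result
Key methods:
fetchNext: deletes every tuple from the child through the BufferPool and returns a one-field tuple holding the count; later calls return null.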
ant runtest -Dtest=InsertTest
The unit test reports BUILD SUCCESSFUL.
ant runsystest -Dtest=InsertTest
ant runsystest -Dtest=DeleteTest
Both system tests report BUILD SUCCESSFUL.
2.5. Page eviction
src/java/simpledb/storage/BufferPool.java
package simpledb.storage;import simpledb.common.Database;
import simpledb.common.DbException;
import simpledb.common.Permissions;
import simpledb.transaction.TransactionAbortedException;
import simpledb.transaction.TransactionId;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** BufferPool manages the reading and writing of pages into memory from* disk. Access methods call into it to retrieve pages, and it fetches* pages from the appropriate location.* <p>* The BufferPool is also responsible for locking; when a transaction fetches* a page, BufferPool checks that the transaction has the appropriate* locks to read/write the page.* * @Threadsafe, all fields are final*/
public class BufferPool {/** Bytes per page, including header. */private static final int DEFAULT_PAGE_SIZE = 4096;private static int pageSize = DEFAULT_PAGE_SIZE;/** Default number of pages passed to the constructor. This is used byother classes. BufferPool should use the numPages argument to theconstructor instead. */public static final int DEFAULT_PAGES = 50;private final int numPages;private final ConcurrentHashMap<Integer,Page> pageStore;/*** Creates a BufferPool that caches up to numPages pages.** @param numPages maximum number of pages in this buffer pool.*/public BufferPool(int numPages) {// some code goes herethis.numPages = numPages;pageStore = new ConcurrentHashMap<>();}public static int getPageSize() {return pageSize;}// THIS FUNCTION SHOULD ONLY BE USED FOR TESTING!!public static void setPageSize(int pageSize) {BufferPool.pageSize = pageSize;}// THIS FUNCTION SHOULD ONLY BE USED FOR TESTING!!public static void resetPageSize() {BufferPool.pageSize = DEFAULT_PAGE_SIZE;}/*** Retrieve the specified page with the associated permissions.* Will acquire a lock and may block if that lock is held by another* transaction.* <p>* The retrieved page should be looked up in the buffer pool. If it* is present, it should be returned. If it is not present, it should* be added to the buffer pool and returned. 
If there is insufficient* space in the buffer pool, a page should be evicted and the new page* should be added in its place.** @param tid the ID of the transaction requesting the page* @param pid the ID of the requested page* @param perm the requested permissions on the page*/public Page getPage(TransactionId tid, PageId pid, Permissions perm)throws TransactionAbortedException, DbException {// some code goes hereif (!pageStore.containsKey(pid.hashCode())){if (pageStore.size() > numPages){evictPage();}DbFile dbFile = Database.getCatalog().getDatabaseFile(pid.getTableId());Page page = dbFile.readPage(pid);pageStore.put(pid.hashCode(),page);}return pageStore.get(pid.hashCode());}/*** Releases the lock on a page.* Calling this is very risky, and may result in wrong behavior. Think hard* about who needs to call this and why, and why they can run the risk of* calling it.** @param tid the ID of the transaction requesting the unlock* @param pid the ID of the page to unlock*/public void unsafeReleasePage(TransactionId tid, PageId pid) {// some code goes here// not necessary for lab1|lab2}/*** Release all locks associated with a given transaction.** @param tid the ID of the transaction requesting the unlock*/public void transactionComplete(TransactionId tid) {// some code goes here// not necessary for lab1|lab2}/** Return true if the specified transaction has a lock on the specified page */public boolean holdsLock(TransactionId tid, PageId p) {// some code goes here// not necessary for lab1|lab2return false;}/*** Commit or abort a given transaction; release all locks associated to* the transaction.** @param tid the ID of the transaction requesting the unlock* @param commit a flag indicating whether we should commit or abort*/public void transactionComplete(TransactionId tid, boolean commit) {// some code goes here// not necessary for lab1|lab2}/*** Add a tuple to the specified table on behalf of transaction tid. 
Will* acquire a write lock on the page the tuple is added to and any other * pages that are updated (Lock acquisition is not needed for lab2). * May block if the lock(s) cannot be acquired.* * Marks any pages that were dirtied by the operation as dirty by calling* their markDirty bit, and adds versions of any pages that have * been dirtied to the cache (replacing any existing versions of those pages) so * that future requests see up-to-date pages. ** @param tid the transaction adding the tuple* @param tableId the table to add the tuple to* @param t the tuple to add*/public void insertTuple(TransactionId tid, int tableId, Tuple t)throws DbException, IOException, TransactionAbortedException {// some code goes here// not necessary for lab1DbFile f = Database.getCatalog().getDatabaseFile(tableId);updateBufferPool(f.insertTuple(tid,t),tid);}/*** Remove the specified tuple from the buffer pool.* Will acquire a write lock on the page the tuple is removed from and any* other pages that are updated. May block if the lock(s) cannot be acquired.** Marks any pages that were dirtied by the operation as dirty by calling* their markDirty bit, and adds versions of any pages that have * been dirtied to the cache (replacing any existing versions of those pages) so * that future requests see up-to-date pages. 
     *
     * @param tid the transaction deleting the tuple.
     * @param t the tuple to delete
     */
    public void deleteTuple(TransactionId tid, Tuple t)
            throws DbException, IOException, TransactionAbortedException {
        DbFile f = Database.getCatalog().getDatabaseFile(t.getRecordId().getPageId().getTableId());
        updateBufferPool(f.deleteTuple(tid, t), tid);
    }

    private void updateBufferPool(List<Page> pageList, TransactionId tid) throws DbException {
        for (Page p : pageList) {
            p.markDirty(true, tid);
            // Key by pid.hashCode(), the same key getPage, flushPage and discardPage use;
            // keying by page number would leave a second, stale entry in the map.
            int key = p.getId().hashCode();
            if (!pageStore.containsKey(key) && pageStore.size() >= numPages) {
                evictPage();
            }
            pageStore.put(key, p);
        }
    }

    /**
     * Flush all dirty pages to disk.
     * NB: Be careful using this routine -- it writes dirty data to disk so will
     * break simpledb if running in NO STEAL mode.
     */
    public synchronized void flushAllPages() throws IOException {
        for (Page p : pageStore.values()) {
            flushPage(p.getId());
        }
    }

    /** Remove the specific page id from the buffer pool.
        Needed by the recovery manager to ensure that the
        buffer pool doesn't keep a rolled back page in its
        cache.
        Also used by B+ tree files to ensure that deleted pages
        are removed from the cache so they can be reused safely
    */
    public synchronized void discardPage(PageId pid) {
        pageStore.remove(pid.hashCode());
    }

    /**
     * Flushes a certain page to disk
     * @param pid an ID indicating the page to flush
     */
    private synchronized void flushPage(PageId pid) throws IOException {
        Page p = pageStore.get(pid.hashCode());
        TransactionId tid = p.isDirty();
        if (tid != null) {
            // Log the change before writing the page, then mark the page clean.
            Database.getLogFile().logWrite(tid, p.getBeforeImage(), p);
            Database.getLogFile().force();
            Database.getCatalog().getDatabaseFile(pid.getTableId()).writePage(p);
            p.markDirty(false, null);
        }
    }

    /** Write all pages of the specified transaction to disk.
     */
    public synchronized void flushPages(TransactionId tid) throws IOException {
        // some code goes here
        // not necessary for lab1|lab2
    }

    /**
     *
Discards a page from the buffer pool.* Flushes the page to disk to ensure dirty pages are updated on disk.*/private synchronized void evictPage() throws DbException {// some code goes here// not necessary for lab1PageId pid = new ArrayList<>(pageStore.values()).get(0).getId();try {flushPage(pid);} catch (IOException e){e.printStackTrace();}discardPage(pid);}}
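Variable definitions:
private static final int DEFAULT_PAGE_SIZE = 4096;// default page size in bytes
private static int pageSize = DEFAULT_PAGE_SIZE;// current page size (tests may change it)
public static final int DEFAULT_PAGES = 50;// default number of pages
private final int numPages;// maximum number of pages this pool caches
private final ConcurrentHashMap<Integer,Page> pageStore;// map of cached pages
Key methods:
flushPage: if the page is dirty, writes a log record and the page to disk, then marks the page clean.
evictPage: picks a page from the buffer pool (here simply the first one the map returns), flushes it, and discards it.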
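The evictPage above removes whichever page the map happens to return first. The lab leaves the eviction policy open, so as one possible alternative, here is a minimal sketch of least-recently-used (LRU) eviction built on java.util.LinkedHashMap in access order. LruPageCache is a hypothetical class, not part of SimpleDB, and it only returns the evicted key; a real buffer pool would flush that page to disk before dropping it.

```java
import java.util.LinkedHashMap;

// Hypothetical LRU cache sketch; SimpleDB's BufferPool does not contain this class.
final class LruPageCache<K, V> {
    private final int capacity;
    // accessOrder = true: iteration order runs from least to most recently accessed.
    private final LinkedHashMap<K, V> pages = new LinkedHashMap<>(16, 0.75f, true);

    LruPageCache(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    /** Returns the cached value, or null; a hit makes the entry the most recently used. */
    synchronized V get(K key) {
        return pages.get(key);
    }

    /** Caches a value and returns the key evicted to make room, or null if none. */
    synchronized K put(K key, V value) {
        K evicted = null;
        if (!pages.containsKey(key) && pages.size() >= capacity) {
            evicted = pages.keySet().iterator().next(); // least recently used entry
            pages.remove(evicted);                       // a real pool would flush it first
        }
        pages.put(key, value);
        return evicted;
    }

    synchronized int size() {
        return pages.size();
    }
}
```

Checking containsKey before evicting means that re-inserting a page that is already cached never pushes another page out, and the cache never holds more than capacity entries.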
ant runsystest -Dtest=EvictionTest
The system test reports BUILD SUCCESSFUL.