本文主要是介绍HBase Endpoint编程示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Endpoint类似于关系型数据库中的存储过程,只不过这个存储过程会运行在多个Region上。终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标Region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。
Endpoint的使用需要经过如下步骤(示例为计算某列的sum值):
1.定义一个新的protocol接口,并继承CoprocessorProtocol
public interface ColumnAggregationProtocol extends CoprocessorProtocol {public long sum(byte[] family, byte[] qualifier) throws IOException;
}
2.继承抽象类BaseEndpointCoprocessor,并实现已定义的接口
public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor implements ColumnAggregationProtocol {@Override
public long sum(byte[] family, byte[] qualifier) throws IOException {Scan scan = new Scan();scan.addColumn(family, qualifier);long sumResult = 0;InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);try {List<KeyValue> curVals = new ArrayList<KeyValue>();boolean hasMore = false;do {curVals.clear();hasMore = scanner.next(curVals);KeyValue kv = curVals.get(0);sumResult += Bytes.toLong(kv.getValue());} while (hasMore);} finally {scanner.close();}return sumResult;}
}
3.客户端调用定义好的方法
public class EndpointTest {private static final byte[] TABLE_NAME = Bytes.toBytes("testtable");private static final byte[] CF = Bytes.toBytes("cf");private static final byte[] QUALIFIER = Bytes.toBytes("counts");private static final byte[] STRAT_KEY = Bytes.toBytes("000");private static final byte[] END_KEY = Bytes.toBytes("999");public static void main(String[] args) throws Throwable {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "172.16.0.126");conf.set("hbase.zookeeper.property.clientPort", "2181");conf.set("mapred.task.timeout", "0");HTableTnterface table = new HTable(conf, TABLE_NAME);Map<byte[], Long> results;results = table.coprocessorExec(ColumnAggregationProtocol.class,START_KEY, END_KEY, new Batch.Call<ColumnAggregationProtocol, Long>() {@Override
public Long call(ColumnAggregationProtocol instance) throws IOException {return instance.sum(CF, QUALIFIER);}});long sumResult = 0;for (Map.Entry<byte[], Long> e : results.entrySet()) {sumResult += e.getValue();}System.out.println(sumResult);}
}
4.客户端调用Endpoint的方法有三种方式:
//适用于单个region
public <T extends CoprocessorProtocol> T coprocessorProtocol(Class<T> protocol, Row row);
//适用于多个region
public <T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol, byte[] startkey,
byte[] endkey, Batch.Call<T, R> call, Batch.Callback<R> callback);
//适用于多个region
public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
Class<T> protocol, byte[] startkey, byte[] endkey,
Batch.Call<T, R> call);
5.整体的Endpoint加载过程
原文:http://blog.selfup.cn/466.html
这篇关于HBase Endpoint编程示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!