Hbase进行RowCount统计

2018-11-30 18:18 
版权声明:本文为博主原创文章,请尊重劳动成果,觉得不错就在文章下方顶一下呗,转载请标明原地址。 https://blog.csdn.net/m0_37739193/article/details/75286496

对于Table内RowKey个数的统计,一直是HBase系统面临的一项重要工作,目前有三种执行该操作的方式。

测试环境:

Apache版的 hadoop-2.6.0 (cdh版的hadoop-2.6.0-cdh5.5.2也可以)
Apache版的 hbase-1.0.0 (一开始我用的是cdh版的hbase-1.0.0-cdh5.5.2,结果各种bug,无奈只能用Apache版的了)
jdk1.7.0_25

在hbase中创建测试所用的表:

create 'scores','grade','course'
put 'scores','zhangsan01','course:math','99'
put 'scores','zhangsan01','course:art','90'
put 'scores','zhangsan01','grade:','101'
put 'scores','zhangsan02','course:math','66'
put 'scores','zhangsan02','course:art','60'
put 'scores','lisi01','course:math','89'

一、使用MapReduce进行。可以借助HTableInputFormat实现对于Rowkey的划分,但是需要占用资源,另外由于使用的Hadoop集群提交作业,经常会遇到不能申请到资源的情况,延迟较大,不适合应用的频繁访问。(我没有使用HTableInputFormat方法,不知道如何使用这个方法进行rowcount,有时间再研究研究吧)

Java代码:(MapReduce读取HBase表的行数并将结果写入另一张HBase表中)

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * MapReduce job that counts the rows of the HBase table {@code scores} and
 * stores the total in the HBase table {@code mytb2}.
 *
 * <p>Every scanned row is mapped to the pair ("count", 1); the reducer sums
 * the ones and writes the total, as a string, into column
 * {@code mycolumnfamily:sum} of the row whose key is the reduce key.
 */
public class ReadWriteHBase1 {
	/**
	 * Recreates the result table, configures the job and runs it.
	 *
	 * <p>The process exits with status 0 when the job succeeds and 1 when it fails.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		String hbaseTableName1 = "scores";
		String hbaseTableName2 = "mytb2";
		
		prepareTB2(hbaseTableName2);
		
		// Same configuration source as prepareTB2, so both steps talk to the same cluster.
		Configuration conf = HBaseConfiguration.create();
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(ReadWriteHBase1.class);
		job.setJobName("mrreadwritehbase");
		
		Scan scan = new Scan();
		scan.setCaching(500);
		scan.setCacheBlocks(false);
		
		TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
		TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
		// By convention a zero exit status means success.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
	/** Emits ("count", 1) once per scanned row; the row content itself is ignored. */
	public static class doMapper extends TableMapper<Text, IntWritable>{
		private final static IntWritable one = new IntWritable(1);
		private final static Text hui = new Text("count");
		@Override
		protected void map(ImmutableBytesWritable key, Result value, Context context)
				throws IOException, InterruptedException {
			context.write(hui, one);
		}
	}
	
	/** Sums the ones emitted by {@link doMapper} and writes the total as a Put. */
	public static class doReducer extends TableReducer<Text, IntWritable, NullWritable>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			System.out.println(key.toString());
			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			// Row key is the reduce key; the total is stored as its decimal string form.
			Put put = new Put(Bytes.toBytes(key.toString()));
			put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("sum"), Bytes.toBytes(String.valueOf(sum)));
			context.write(NullWritable.get(), put);
		}
	}
	
	/**
	 * Creates the result table with the single column family {@code mycolumnfamily},
	 * dropping any existing table of the same name first.
	 *
	 * @param hbaseTableName name of the table to (re)create
	 * @throws IOException if an admin operation fails
	 */
	public static void prepareTB2(String hbaseTableName) throws IOException{
		HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
		HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
		tableDesc.addFamily(columnDesc);
		Configuration  cfg = HBaseConfiguration.create();
		HBaseAdmin admin = new HBaseAdmin(cfg);
		try {
			if (admin.tableExists(hbaseTableName)) {
				System.out.println("Table exists,trying drop and create!");
				// A table must be disabled before it can be deleted.
				admin.disableTable(hbaseTableName);
				admin.deleteTable(hbaseTableName);
			} else {
				System.out.println("create table: "+ hbaseTableName);
			}
			admin.createTable(tableDesc);
		} finally {
			// Release the admin's connection even when an operation above fails.
			admin.close();
		}
	}
}

运行:

[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/javac ReadWriteHBase1.java
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar ReadWriteHBase1*class
[hadoop@h40 q1]$ hadoop jar xx.jar ReadWriteHBase1 

hbase(main):021:0> scan 'scores'
ROW                                                          COLUMN+CELL                                                                                                                                                                     
 lisi01                                                      column=course:math, timestamp=1492420326312, value=89                                                                                                                           
 zhangsan01                                                  column=course:art, timestamp=1492420325361, value=90                                                                                                                            
 zhangsan01                                                  column=course:math, timestamp=1492420325331, value=99                                                                                                                           
 zhangsan01                                                  column=grade:, timestamp=1492420325397, value=101                                                                                                                               
 zhangsan02                                                  column=course:art, timestamp=1492420325500, value=60                                                                                                                            
 zhangsan02                                                  column=course:math, timestamp=1492420325441, value=66                                                                                                                           
3 row(s) in 0.0120 seconds

scores表有三行数据,且表mytb2不存在,运行代码后查看mytb2表

hbase(main):003:0> scan 'mytb2'
ROW                                                          COLUMN+CELL                                                                                                                                                                     
 count                                                       column=mycolumnfamily:sum, timestamp=1489762697539, value=3
1 row(s) in 0.0280 seconds

二、使用Scan+KeyOnlyFilter的方式进行。可以借助Filter的功能,尽可能实现数据在RegionServer端进行统计,减轻Client端的压力。但是,在大多数情况下,从每一个Region上进行Scan,当Table较大时,会造成非常长的延迟,用户体验下降。

Java代码:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

/**
 * Counts the rows of an HBase table with a client-side scan that uses a
 * {@link FirstKeyOnlyFilter}.
 */
public class rowCount {
	public static Admin admin = null;
	public static Connection conn = null;

	/**
	 * Builds the client configuration with the HBase root directory and the
	 * ZooKeeper quorum used by this example.
	 *
	 * @return a new configuration
	 */
	public static Configuration getConfiguration() {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir", "hdfs://192.168.8.40:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
		return conf;
	}

	/**
	 * Counts the rows of the table named by {@code args[0]}, or of
	 * {@code mytb1} when no argument is given.
	 *
	 * @param args optional table name in {@code args[0]}
	 */
	public static void main(String[] args) throws Exception {
		String tableName = args.length > 0 ? args[0] : "mytb1";
		rowCount(tableName);
	}

	/**
	 * Scans the whole table and counts the returned rows, then prints the count.
	 *
	 * <p>If the scan fails the error is reported on stderr and the number of
	 * rows counted so far (0 when the table could not be opened) is returned.
	 *
	 * @param tableName name of the table to count
	 * @return number of rows seen by the scan
	 */
	public static long rowCount(String tableName) {
		long rowCount = 0;
		// try-with-resources closes the scanner and the table on every path.
		try (HTable table = new HTable(getConfiguration(), tableName)) {
			Scan scan = new Scan();
			scan.setFilter(new FirstKeyOnlyFilter());
			try (ResultScanner resultScanner = table.getScanner(scan)) {
				for (Result result : resultScanner) {
					// One Result is one row, so count results rather than cells.
					rowCount++;
				}
			}
			System.out.println("rowCount-->"+rowCount);
		} catch (IOException e) {
			// Previously swallowed silently; report it so a failure is not mistaken for an empty table.
			System.err.println("Failed to count rows of table " + tableName + ": " + e);
			e.printStackTrace();
		}
		return rowCount;
	}
}

三、协处理器:

1.hbase自带的org.apache.hadoop.hbase.coprocessor.AggregateImplementation,使用该类可以count一张表的总记录数

方法一:

启用表aggregation,只对特定的表生效。通过hbase Shell 来实现。
(1)disable指定表
hbase(main):036:0> disable 'scores'
(2)添加aggregation
alter 'scores', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
注意:这里吐槽一下CSDN,它会将一些关键词自动变红并且首字母大写,这里你就要注意了,org.apache.hadoop.hbase.coprocessor.AggregateImplementation中的hadoop中的h是小写而不是大写啊
(3)重启指定表 
hbase(main):038:0> enable 'scores'
(4)查看是否加载成功
hbase(main):039:0> describe 'scores'
Table scores is ENABLED                                                                                                                                                                                                                      
scores, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}                                                                                                                             
COLUMN FAMILIES DESCRIPTION                                                                                                                                                                                                                  
{NAME => 'course', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMO
RY => 'false', BLOCKCACHE => 'true'}                                                                                                                                                                                                         
{NAME => 'grade', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMOR
Y => 'false', BLOCKCACHE => 'true'}                                                                                                                                                                                                          
2 row(s) in 0.0350 seconds
(5)如果你想删除的话
hbase(main):040:0> alter 'scores' ,METHOD=>'table_att_unset',NAME=>'coprocessor$1'

方法二:启动全局aggregation,能够操作所有表上的数据。通过修改hbase集群所有节点的hbase-site.xml文件来实现,只需要添加如下代码:

<property>  
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>  
</property>

注意:方法一不需要重启hbase集群,而方法二需要重启hbase集群

java代码:

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.TableName;  
import org.apache.hadoop.hbase.client.Scan;  
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;  
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;  
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;  
import org.apache.hadoop.hbase.util.Bytes;  
  
/**
 * Counts the rows of the {@code scores} table through the built-in
 * AggregateImplementation coprocessor, using {@link AggregationClient}.
 */
public class MyAggregationClient {

    /**
     * Configures the client, asks the coprocessor for the row count and prints it.
     *
     * @param args unused
     * @throws Throwable whatever {@code AggregationClient.rowCount} throws
     */
    public static void main(String[] args) throws Throwable {
        Configuration customConf = new Configuration();
        // One entry per ZooKeeper node; the original listed h40 three times.
        customConf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
        // Raise the RPC timeout.
        customConf.setLong("hbase.rpc.timeout", 600000);
        // Set the scanner caching.
        customConf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(customConf);
        AggregationClient aggregationClient = new AggregationClient(configuration);
        Scan scan = new Scan();
        // Uncomment one of the next two lines to restrict the count to a column family or a column.
//    scan.addFamily(Bytes.toBytes("grade"));
//    scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art"));
        /*
         * Examples found online pass the table name as a byte[]:
         *
         *   private static final byte[] TABLE_NAME = Bytes.toBytes("scores");
         *   long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);
         *
         * With this HBase version that does not compile: the first argument
         * must be a TableName, obtained with TableName.valueOf(...), as below.
         */
        long rowCount = aggregationClient.rowCount(TableName.valueOf("scores"), new LongColumnInterpreter(), scan);
        System.out.println("row count is " + rowCount);
    }
}

我是直接在myeclipse中运行的这个代码,你也可以在Linux上运行。

根据表名统计行数运行结果为:row count is 3
根据列族统计行数运行结果为:row count is 1 (把scan.addFamily(Bytes.toBytes("grade"));的注释去掉即可)
根据列名(把scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art"));的注释去掉)
理想结果为:row count is 2 实际结果为:row count is 3
(这我就不明白了啊,hbase给了这个方法addColumn(byte[] family, byte[] qualifier),但为什么运行结果却不是想要的呢,有知道的朋友告我一声,谢谢)

2.自定义EndPoint:

(1)首先需要下载protobuf的解析器protobuf-2.5.0.tar.gz,下载地址:http://download.csdn.net/detail/m0_37739193/9901063

然后,按照如下的方式安装:
[root@h40 usr]# tar -zxvf protobuf-2.5.0.tar.gz
[root@h40 usr]# cd protobuf-2.5.0
编译之前需要把编译环境安装好,我这里的Linux是红帽版本:
[root@h40 ~]# yum -y install gcc*
[root@h40 protobuf-2.5.0]# ./configure --prefix=/usr/protobuf-2.5.0
(如果不加该参数的话,protobuf默认安装在/usr/local/bin目录下)
[root@h40 protobuf-2.5.0]# make && make install

(2)编写.proto文件
[root@h40 protobuf-2.5.0]# vi hehe.proto

// Protocol definition for the row-count coprocessor endpoint.
// Generated Java code goes into package com.cxk.coprocessor.test.generated,
// wrapped in the outer class CXKTestProtos.
option java_package = "com.cxk.coprocessor.test.generated";  
option java_outer_classname = "CXKTestProtos";  
// Generate the abstract service class that the endpoint extends.
option java_generic_services = true;  
option java_generate_equals_and_hash = true;  
option optimize_for = SPEED;  
// Request carries no parameters.
message CountRequest {  
}  
// Response carrying the count computed by the endpoint.
message CountResponse {  
  required int64 count = 1 [default = 0];  
}  
// RPC service exposed by the endpoint.
service RowCountService {  
  rpc getRowCount(CountRequest)  
    returns (CountResponse);  
}

写完这个消息后,使用我们刚才安装的protoc工具将其编译生成我们需要的Java代码,使用的命令如下:
[root@h40 protobuf-2.5.0]# bin/protoc --java_out=/usr hehe.proto

在com.cxk.coprocessor.test.generated下会生成CXKTestProtos.java
[root@h40 generated]# ls
CXKTestProtos.java
[root@h40 generated]# pwd
/usr/com/cxk/coprocessor/test/generated

(3)定义自己的Endpoint类(实现一下自己的方法):

package com.cxk.coprocessor.test.generated;

import java.io.IOException;  
import java.util.ArrayList;  
import java.util.List;  
import org.apache.hadoop.hbase.Cell;  
import org.apache.hadoop.hbase.CellUtil;  
import org.apache.hadoop.hbase.Coprocessor;  
import org.apache.hadoop.hbase.CoprocessorEnvironment;  
import org.apache.hadoop.hbase.client.Scan;  
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;  
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;  
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;  
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;  
import org.apache.hadoop.hbase.protobuf.ResponseConverter;  
import org.apache.hadoop.hbase.regionserver.InternalScanner;  
import org.apache.hadoop.hbase.util.Bytes;  
import com.google.protobuf.RpcCallback;  
import com.google.protobuf.RpcController;  
import com.google.protobuf.Service;  
  
/**
 * Coprocessor endpoint that counts the rows of the region it is loaded on.
 *
 * <p>Implements the generated {@code RowCountService}; the client calls
 * {@link #getRowCount} once per region.
 */
public class RowCountEndpoint extends CXKTestProtos.RowCountService
    implements Coprocessor, CoprocessorService {
  private RegionCoprocessorEnvironment env;

  public RowCountEndpoint() {
  }

  /** Keeps the region environment; refuses to load anywhere but on a table region. */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (!(env instanceof RegionCoprocessorEnvironment)) {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
    this.env = (RegionCoprocessorEnvironment) env;
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to release
  }

  /** This object is itself the protobuf service. */
  @Override
  public Service getService() {
    return this;
  }

  /**
   * Counts the rows of this region and hands the result to {@code done}.
   *
   * <p>On an I/O failure the exception is recorded on {@code controller} and
   * {@code done} is run with a {@code null} response.
   */
  @Override
  public void getRowCount(RpcController controller, CXKTestProtos.CountRequest request,
                          RpcCallback<CXKTestProtos.CountResponse> done) {
    Scan firstKeyScan = new Scan();
    firstKeyScan.setFilter(new FirstKeyOnlyFilter());

    CXKTestProtos.CountResponse reply = null;
    InternalScanner regionScanner = null;
    try {
      regionScanner = env.getRegion().getScanner(firstKeyScan);
      List<Cell> batch = new ArrayList<Cell>();
      byte[] previousRow = null;
      long rows = 0;
      boolean more = true;
      while (more) {
        more = regionScanner.next(batch);
        for (Cell cell : batch) {
          byte[] row = CellUtil.cloneRow(cell);
          // Skip cells that belong to the row counted last.
          if (previousRow != null && Bytes.equals(previousRow, row)) {
            continue;
          }
          previousRow = row;
          rows++;
        }
        batch.clear();
      }
      reply = CXKTestProtos.CountResponse.newBuilder().setCount(rows).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      closeQuietly(regionScanner);
    }
    done.run(reply);
  }

  /** Closes the scanner if one was opened, ignoring any failure to close. */
  private static void closeQuietly(InternalScanner scanner) {
    if (scanner == null) {
      return;
    }
    try {
      scanner.close();
    } catch (IOException ignored) {
      // best effort: the count has already been computed or has already failed
    }
  }
}

(4)实现自己的客户端方法:
TestEndPoint.java 代码如下:

package com.cxk.coprocessor.test.generated;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.cxk.coprocessor.test.generated.CXKTestProtos.RowCountService;
import com.google.protobuf.ServiceException;
  
public class TestEndPoint {  
/**  
 *   
 * @param args[0] ip ,args[1] zk_ip,args[2] table_name  
 * @throws ServiceException  
 * @throws Throwable  
 */  
    public static void main(String[] args) throws ServiceException, Throwable {  
        // TODO Auto-generated method stub  
         System.out.println("begin.....");
         long begin_time=System.currentTimeMillis();
        Configuration config=HBaseConfiguration.create();
        String master_ip=args[0];
//        String master_ip = "h40";
        String zk_ip=args[1];
//        String zk_ip = "h40";
        String table_name=args[2];
//        String table_name = "scores";
        config.set("hbase.zookeeper.property.clientPort", "2181");   
        config.set("hbase.zookeeper.quorum", zk_ip);   
        config.set("hbase.master", master_ip+":600000");  
        final CXKTestProtos.CountRequest request = CXKTestProtos.CountRequest.getDefaultInstance();  
        HTable table=new HTable(config,table_name);  
          
        Map<byte[],Long> results = table.coprocessorService(RowCountService.class,  
                null, null,  
                new Batch.Call<CXKTestProtos.RowCountService,Long>() {  
                  public Long call(CXKTestProtos.RowCountService counter) throws IOException {  
                    ServerRpcController controller = new ServerRpcController();  
                    BlockingRpcCallback<CXKTestProtos.CountResponse> rpcCallback =  
                        new BlockingRpcCallback<CXKTestProtos.CountResponse>();  
                    counter.getRowCount(controller, request, rpcCallback);  
                    CXKTestProtos.CountResponse response = rpcCallback.get();  
                    if (controller.failedOnException()) {  
                      throw controller.getFailedOn();  
                    }  
                    return (response != null && response.hasCount()) ? response.getCount() : 0;  
                  }  
                });  
        table.close();  
  
         if(results.size()>0){
        	 String hui = results.values().toString();
        	 String qiang = hui.substring(1, hui.length()-1);
        	 //results.values()输出为[i]类型,为了方便我这里转化成了i的形式
         System.out.println(qiang);  
         }else{  
             System.out.println("没有任何返回结果");  
         }  
         long end_time=System.currentTimeMillis();  
         System.out.println("end:"+(end_time-begin_time));  
    }  
}  

(5)将RowCountEndpoint.java、TestEndPoint.java和前面生成的CXKTestProtos.java放在myeclipse中的同一个工程下

将该工程打包成rowcount.jar
注意:myeclipse中的打包方式要用第二种,否则在后面无法运行java -jar命令,我这里是为了后面运行该命令才用这种打包方式(因为把所需要的jar包环境也一并打包了,所以打包速度慢),如果你不需要运行该命令,则完全可以用第一种打包方式并且打包速度更快


(6)部署endpoint方式:

1.1对指定表生效:
1.1.1
将rowcount.jar上传到3台机器的hbase的lib目录下,并且一定要重启hbase集群
再执行
alter 'scores', METHOD => 'table_att','coprocessor'=>'|com.cxk.coprocessor.test.generated.RowCountEndpoint||'
1.1.2
将rowcount.jar上传到hdfs上
[hadoop@h40 ~]$ hadoop fs -mkdir /in
[hadoop@h40 ~]$ hadoop fs -put rowcount.jar /in
再执行
alter 'scores' , METHOD =>'table_att','coprocessor'=>

'hdfs://h40:9000/in/rowcount.jar|com.cxk.coprocessor.test.generated.RowCountEndpoint|1001|arg1=1,arg2=2'
(这种方法我没有成功,并且还报错,无法挽回,所以慎用)

1.2对所有表生效:
修改hbase-site.xml文件,实现对所有表加载这个endpoint
主节点:
[hadoop@h40 ~]$ vi hbase-1.0.0/conf/hbase-site.xml 
添加如下内容:

<property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>com.cxk.coprocessor.test.generated.RowCountEndpoint</value>
</property>

其他两个节点一样(我的hbase集群用了3台机器搭建,分别为h40、h41、h42)
[hadoop@h40 ~]$ scp hbase-1.0.0/conf/hbase-site.xml h41:/home/hadoop/hbase-1.0.0/conf/
[hadoop@h40 ~]$ scp hbase-1.0.0/conf/hbase-site.xml h42:/home/hadoop/hbase-1.0.0/conf/

将rowcount.jar上传到3台机器的hbase的lib目录下,再重启hbase集群

(7)运行方式:
1.1
直接在myeclipse中运行TestEndPoint.java,该方法需要你在C:\Windows\System32\drivers\etc\hosts中添加如下内容映射你的虚拟机ip
192.168.8.40 h40
192.168.8.41 h41
192.168.8.42 h42
还有就是TestEndPoint.java中的代码没必要给参数了,直接这样给就行,把给参数的代码注释掉
String master_ip = "h40";
String zk_ip = "h40";
String table_name = "scores";
运行结果:

1.2
在Linux中运行TestEndPoint.java代码
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/javac TestEndPoint.java 
(你可能这步无法编译,这就需要你在.bash_profile中将/home/hadoop/hbase-1.0.0/lib/*添加到CLASSPATH中)
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/java TestEndPoint h40 h40 scores

1.3
在Linux中运行java -jar命令
[hadoop@h40 lib]$ /usr/jdk1.7.0_25/bin/java -jar rowcount.jar h40 h40 scores

上面的是根据表名进行行数统计,下面的这个是根据列族和列名进行行数统计,操作方法和上面的一样就不赘述了,把主要的代码给大家。
(按理说应该可以将这些都合成在一个里面,等有时间再仔细研究吧,先把这几个案例给大家)

// Protocol definition for a row-count endpoint restricted to one column.
// Generated Java code goes into package com.tencent.yun.endpoint.proto,
// wrapped in the outer class RowCountService.
option java_package = "com.tencent.yun.endpoint.proto";
option java_outer_classname = "RowCountService";
// Generate the abstract service class that the endpoint extends.
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

// Request naming the column family and column qualifier to count.
message RowCountRequest{
    required string family = 1;
    required string column = 2;
}

// Response carrying the count computed by the endpoint.
message RowCountResponse {
    required int64 rowCount = 1 [default = 0];
}

// RPC service exposed by the endpoint.
service RowCount {
    rpc getRowCount(RowCountRequest)
    returns (RowCountResponse);
}

(如果你只想通过列族名而不过滤列名的话把required string column = 2;去掉)

package com.tencent.yun.endpoint.proto;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.tencent.yun.endpoint.proto.RowCountService;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountRequest;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountResponse;

/**
 * Coprocessor endpoint that counts, within the region it is loaded on, the
 * rows that have a value in the requested column.
 */
public class RowCountEndPoint extends RowCountService.RowCount implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    /** This object is itself the protobuf service. */
    @Override
    public Service getService() {
        return this;
    }

    /** Keeps the region environment; refuses to load anywhere but on a table region. */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment arg0) throws IOException {
        // do nothing
    }

    /**
     * Counts the rows of this region that match the request and hands the
     * result to {@code done}.
     *
     * <p>The count is per row, not per cell, so a row contributes one even
     * when the scan returns several of its cells.
     *
     * <p>On an I/O failure the exception is recorded on {@code controller}
     * and {@code done} is run with a {@code null} response.
     *
     * @param controller receives the exception if the scan fails
     * @param request    column family and column qualifier to count
     * @param done       callback that receives the response
     */
    @Override
    public void getRowCount(RpcController controller, RowCountRequest request, RpcCallback<RowCountResponse> done) {
        Scan scan = new Scan();
        // addColumn alone selects the column; a preceding addFamily on the same family
        // would be overridden by it. For a whole-family count call addFamily instead.
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // scan.setMaxVersions(1);
        InternalScanner scanner = null;
        RowCountResponse response = null;

        long count = 0L;
        try {
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            scanner = env.getRegion().getScanner(scan);

            do {
                // Each next() call fills the list with the cells of the next row.
                hasMore = scanner.next(results);
                if (!results.isEmpty()) {
                    count++;
                }
                results.clear();
            } while (hasMore);

            response = RowCountResponse.newBuilder().setRowCount(count).build();

        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                    // best effort: the count has already been computed or has already failed
                }
            }
        }
        done.run(response);
    }

}
package com.tencent.yun.endpoint.proto;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.google.protobuf.ServiceException;
import com.tencent.yun.endpoint.proto.RowCountService.RowCount;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountRequest;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountResponse;
import com.tencent.yun.endpoint.proto.RegionObserver;

public class RowCountClient {

    public static void testRowCountEndpoint(String tableName, String family, String col) throws IOException {
        System.out.println("begin test.....");
        long t1 = System.currentTimeMillis();
        Configuration config = HBaseConfiguration.create();
        //填写hbase zk地址
        config.set("hbase.zookeeper.quorum", "192.168.8.40:2181,192.168.8.41:2181,192.168.8.42:2181");

        // 根据列名统计
        final RowCountRequest req = RowCountRequest.newBuilder().setFamily(family).setColumn(col).build();
        // 根据列族名统计
//        final RowCountRequest req = RowCountRequest.newBuilder().setFamily(family).build();
        RowCountResponse resp = null;
        Connection con = null;
        Table table = null;
        try {
            con = ConnectionFactory.createConnection(config);
            table = con.getTable(TableName.valueOf(tableName));
            Map<byte[], Long> results = table.coprocessorService(RowCount.class, null, null,
                    new Batch.Call<RowCount, Long>() {

                        public Long call(RowCount instance) throws IOException {
                            ServerRpcController controller = new ServerRpcController();
                            BlockingRpcCallback<RowCountResponse> rpccall = new BlockingRpcCallback<RowCountResponse>();
                            instance.getRowCount(controller, req, rpccall);
                            RowCountResponse resp = rpccall.get();
                            return resp.hasRowCount() ? resp.getRowCount() : 0L;
                        }

                    });
            long count = 0L;
            for (Long sum : results.values()) {
//                System.out.println("region row Sum = " + sum);
                count += sum;
            }
            System.out.println("total count = " + count);
            long t2 = System.currentTimeMillis();
            System.out.println("use time = " + (t2-t1));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ServiceException e) {
            e.printStackTrace();
        } catch (Throwable e) {
            e.printStackTrace();
        } finally{
            table.close();
            con.close();
        }
    }

    public static void main(String[] args) throws IOException {
        String tableName = "scores";
        String family = "course";
        String col = "math";
    	testRowCountEndpoint(tableName, family, col);
    }
}

根据列名统计:代码为String tableName = "scores";String family = "course";String col = "art";运行结果为:total count = 2
根据列族名统计:代码为String tableName = "scores";String family = "course";String col = null;理想结果:total count = 3实际结果:total count = 5(不知道哪里出问题了。。。)

为了统计使用Scan增加KeyOnlyFilter和Coprocessor之间的区别,记录了500次操作的时间,性能对比图如下:


从上图中,可以看出,大部分通过Coprocessor获取RowCount个数的延迟,小于1s,而使用Scan的方式,获得RowKeyCount的个数大概在4~5s。(备注,检查的table的Rowkey的个数在3w左右)。
那么究竟是什么原因让Coprocessor在统计Rowkey的个数上,拥有如此明显的优势呢?
这是因为在Table注册了Coprocessor之后,在执行AggregationClient的时候,会将RowCount分散到Table的每一个Region上,Region内RowCount的计算,是通过RPC执行调用接口,由Region对应的RegionServer执行InternalScanner进行的。
因此,性能的提升有两点原因:
1) 分布式统计。将原来客户端按照Rowkey的范围单点进行扫描,然后统计的方式,换成了由所有Region所在RegionServer同时计算的过程。
2)在RegionServer内部执行时使用了InternalScanner。这是距离实际存储最近的Scanner接口,存取更加快捷。

参考:
http://www.binospace.com/index.php/make-your-hbase-better-2/
http://blog.csdn.net/jameshadoop/article/details/42645551
http://blog.csdn.net/yangzongzhao/article/details/24306775
https://www.qcloud.com/document/product/407/4785

发表评论

您必须 登录 才能发表留言!