使用C++(通过Thrift)访问/操作/读写Hbase

By | 2019年2月12日


无奈,网上关于C++访问Hbase的文章实在太少,所以只好自己折腾一下,然后写出来了。

要使用C++访问Hbase,可以走的途径少之又少,据说当前最好的方法就是通过Thrift来实现:http://thrift.apache.org/

所以本文分成几部分:(1)安装Thrift;(2)用Thrift 生成访问Hbase所需的C++文件;(3)在程序中通过Thrift来访问Hbase。

另外,本文只包含读写Hbase数据的例子,不包含配置Hbase的方法,如需这些内容,请自行搜索。

首先声明一下,本文基于以下环境:

操作系统:RHEL 5.3,64位

Thrift 版本:0.7.0

要访问的 Hbase 版本:0.20.6

我使用0.90.4的 Hbase 安装包来生成C++所需的Hbase.h等文件(用新版的应该能兼容旧版的)

文章来源:http://www.codelast.com/

下面开始,一步步来。

(1)安装Thrift

不是一件很轻松的事。如果你的系统比较干净,可能很顺利地就搞定了,如果有依赖库缺失问题或者库冲突问题,那么就只能根据具体情况,一个个问题去fix了,谁知道会有多少麻烦。

我运气比较好,在一个干净的系统上,很快就完成了。

Thrift 至少会依赖于两个系统中一般不会带的库:libevent,boost。

libevent 到这里下载:http://monkey.org/~provos/libevent/ 
我使用的版本是:2.0.12-stable

boost 到这里下载:http://www.boost.org/ 
我使用的版本是:1.47.0

文章来源:http://www.codelast.com/

安装libevent:

1
2
3
./configure --prefix=/usr/local/libevent
make
make install

安装boost(boost不像一般的Linux源码安装包一样,它的安装不是configure,make,make install,有点怪):

1
./bootstrap.sh --prefix=/usr/local/boost

不出错的话接着执行以下命令开始编译(也可以通过编辑project-config.jam文件调整编译参数):

1
2
./b2
./b2 install

安装Thrift:

1
2
3
4
chmod +x configure
./configure --with-boost=/usr/local --prefix=/usr/local/thrift
make
make install

至此,安装Thrift 的工作就完成了。

文章来源:http://www.codelast.com/

(2)用Thrift 生成访问Hbase所需的C++文件

访问Hbase需要在你的程序中使用若干.h,.cpp文件,这些文件是用 Thrift 生成的。

解压Hbase源码安装包:

1
2
tar zxf hbase-0.90.4.tar.gz
cd hbase-0.90.4

在解压出来的文件中, 你可以找到一个名为 Hbase.thrift 的文件,这个文件定义了如何通过 Thrift 接口来访问Hbase。用这个Thrift文件,可以生成访问Hbase所需的C++文件:

1
/usr/local/thrift/bin/thrift --gen cpp ./src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

会发现生成了gen-cpp目录:

1
ls gen-cpp/

输出:

Hbase_constants.cpp  Hbase_constants.h  Hbase.cpp  Hbase.h  Hbase_server.skeleton.cpp
 Hbase_types.cpp  Hbase_types.h

除了Hbase_server.skeleton.cpp之外,其余文件都是在我们的程序里要用到的,将它们拷贝到我们的工程目录下。

文章来源:http://www.codelast.com/

(3)在程序中使用Thrift来访问Hbase

要能通过 Thrift 访问Hbase,你必须首先要打开HBase的 Thrift 服务,请参考其他文档确保这一点是可用的。

下一步,我们在程序中如何读取Hbase的数据?

我们先看看hbase源码安装包中自带的例子:在解压出来的安装包中的 examples/thrift/ 目录下的 DemoClient.cpp 文件,有如下代码:

01
02
03
04
05
06
07
08
09
10
11
12
13
boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
HbaseClient client(protocol);
try {
  transport->open();
 
  // do something
 
  transport->close();
} catch (TException &tx) {
  printf("ERROR: %s\n", tx.what());
}

我们就仿照这个例子来做。从DemoClient.cpp可见,我们要先创建三个指针socket,transport和protocol,后两个分别依赖于前两个,最后再创建一个client对象,我们操作Hbase就是使用这个client对象。在操作Hbase前,需要先打开到Hbase
Thrift service的连接,即 transport->open(),在操作完 Hbase之后,需要关闭连接,即 transport->close(),这下就比较清楚了:我们可以写一个自己的类CHbaseOperate,它应该有一个connect函数和一个disconnect函数,分别用于打开、关闭连接,还应该有读写Hbase的基本功能。读写Hbase的方法,请参考Hbase.h中的函数,例子还是看DemoClient.cpp。

文章来源:http://www.codelast.com/

下面上代码:
HbaseOperate.h:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#ifndef __HBASE_OPERATE_H
#define __HBASE_OPERATE_H
  
#include <string>
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>
#include <transport/TTransportUtils.h>
#include "Hbase.h"
  
/**
 * Class to operate Hbase.
 *
 * @author Darran Zhang (codelast.com)
 * @version 11-08-24
 * @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.
 * You must not remove this declaration at any time.
 */
  
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift;
  
typedef struct hbaseRet {
  std::string rowValue;
  time_t ts;
  
  hbaseRet() {
    ts = 0;
  }
  
} hbaseRet;
  
class CHbaseOperate
{
public:
  CHbaseOperate();
  virtual ~CHbaseOperate();
  
private:
  boost::shared_ptr<TTransport> socket;
  boost::shared_ptr<TTransport> transport;
  boost::shared_ptr<TProtocol> protocol;
  
  HbaseClient *client;
  
  std::string  hbaseServiceHost;
  int     hbaseServicePort;
  bool    isConnected;
  
public:
  bool  connect();
  
  bool  connect(std::string host, int port);
  
  bool  disconnect();
  
  bool  putRow(const std::string &tableName,
              const std::string &rowKey,
              const std::string &column,
              const std::string &rowValue);
  
  bool  getRow(hbaseRet &result,
              const std::string &tableName,
              const std::string &rowKey,
              const std::string &columnName);
};
  
#endif

文章来源:http://www.codelast.com/

HbaseOperate.cpp:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#include "HbaseOperate.h"
#include "log4cxx/log4cxx.h"
#include "log4cxx/propertyconfigurator.h"
  
static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("HbaseOperate.cpp"));
  
/**
 * Class to operate Hbase.
 *
 * @author Darran Zhang (codelast.com)
 * @version 11-08-24
 * @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.
 * You must not remove this declaration at any time.
 */
  
using namespace std;
  
CHbaseOperate::CHbaseOperate() :
socket((TSocket*)NULL), transport((TBufferedTransport*)NULL), protocol((TBinaryProtocol*)NULL), client(NULL), hbaseServicePort(9090), isConnected(false)
{
}
  
CHbaseOperate::~CHbaseOperate()
{
  if (isConnected) {
    disconnect();
  }
  if (NULL != client) {
    delete client;
    client = NULL;
  }
}
  
/**
 * Connect Hbase.
 *
 */
bool CHbaseOperate::connect()
{
  if (isConnected) {
    LOG4CXX_INFO(logger, "Already connected, don't need to connect it again");
    return true;
  }
  
  try {
    socket.reset(new TSocket(hbaseServiceHost, hbaseServicePort));
    transport.reset(new TBufferedTransport(socket));
    protocol.reset(new TBinaryProtocol(transport));
  
    client = new HbaseClient(protocol);
  
    transport->open();
  } catch (const TException &tx) {
    LOG4CXX_ERROR(logger, "Connect Hbase error : " << tx.what());
    return false;
  }
  
  isConnected = true;
  return isConnected;
}
  
/**
 * Connect Hbase.
 *
 */
bool CHbaseOperate::connect(std::string host, int port)
{
  hbaseServiceHost = host;
  hbaseServicePort = port;
  
  return connect();
}
  
/**
 * Disconnect from Hbase.
 *
 */
bool CHbaseOperate::disconnect()
{
  if (!isConnected) {
    LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't disconnect from it");
    return false;
  }
  
  if (NULL != transport) {
    try {
      transport->close();
    } catch (const TException &tx) {
      LOG4CXX_ERROR(logger, "Disconnect Hbase error : " << tx.what());
      return false;
    }
  } else {
    return false;
  }
  
  isConnected = false;
  return true;
}
  
/**
 * Put a row to Hbase.
 *
 * @param tableName   [IN] The table name.
 * @param rowKey      [IN] The row key.
 * @param column      [IN] The "column family : qualifier".
 * @param rowValue    [IN] The row value.
 * @return True for successfully put the row, false otherwise.
 */
bool CHbaseOperate::putRow(const string &tableName, const string &rowKey, const string &column, const string &rowValue)
{
  if (!isConnected) {
    LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't put row");
    return false;
  }
  
  try {
    std::vector<Mutation> mutations;
    mutations.push_back(Mutation());
    mutations.back().column = column;
    mutations.back().value = rowValue;
    client->mutateRow(tableName, rowKey, mutations);
  
  } catch (const TException &tx) {
    LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what());
    return false;
  }
  
  return true;
}
  
/**
 * Get a Hbase row.
 *
 * @param result      [OUT] The object which contains the returned data.
 * @param tableName   [IN] The Hbase table name, e.g. "MyTable".
 * @param rowKey      [IN] The Hbase row key, e.g. "kdr23790".
 * @param columnName  [IN] The "column family : qualifier".
 * @return True for successfully get the row, false otherwise.
 */
bool CHbaseOperate::getRow(hbaseRet &result, const std::string &tableName, const std::string &rowKey, const std::string &columnName)
{
  if (!isConnected) {
    LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't read data from it");
    return false;
  }
  
  std::vector<std::string> columnNames;
  columnNames.push_back(columnName);
  
  std::vector<TRowResult> rowResult;
  try {
    client->getRowWithColumns(rowResult, tableName, rowKey, columnNames);
  } catch (const TException &tx) {
    LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what());
    return false;
  }
  
  if (0 == rowResult.size()) {
    LOG4CXX_WARN(logger, "Got no record with the key : [" << rowKey << "]");
    return false;
  }
  
  std::map<std::string, TCell>::const_iterator it = rowResult[rowResult.size() -1].columns.begin();
  result.rowValue = it->second.value;
  result.ts = it->second.timestamp;
  
  return true;
}

注意我在程序中使用了Apache
log4cxx
这个记录日志的库来打印/保存程序运行日志,使用方法可参考此链接。如果你不想用,可以自己改为std::cout。
代码有了,使用方法为:可以在你的程序中创建一个全局对象:

1
CHbaseOperate g_ho;

在需要操作Hbase之前:

1
g_ho.connect("192.168.55.66", 9090);

其中,“192.168.55.66”和9090分别是你的Hbase Thrift service的服务器地址和端口号,你需要正确地配置好,才能使用。本文开头已经说了,本文不讨论这方面的问题。
在操作完Hbase之后:

1
g_ho.disconnect();

文章来源:http://www.codelast.com/
现在再来说一下读写操作Hbase的两个函数:putRow()和getRow()。
putRow():

1
2
3
4
bool  putRow(const std::string &tableName,
              const std::string &rowKey,
              const std::string &column,
              const std::string &rowValue);

这是向Hbase写入一条记录的函数,参数tableName为Hbase表名,即你要将记录写到哪个Hbase表中;参数rowKey为待写入的记录的key;参数column为待写入的记录的“column family:qualifier”组合,参数rowValue为待写入的记录的value。

getRow():

1
2
3
4
bool  getRow(hbaseRet &result,
              const std::string &tableName,
              const std::string &rowKey,
              const std::string &columnName);

这是从Hbase中读取一条记录的函数,参数tableName为Hbase表名,即你要从哪个Hbase表中读取记录;参数rowKey为你要查询的记录的key;参数columnName为你要查询的记录的“column family:qualifier”组合;参数result为返回的Hbase的数据,它包含记录的value和记录的时间戳:

1
2
3
4
5
6
7
8
9
typedef struct hbaseRet {
  std::string rowValue;
  time_t ts; 
  
  hbaseRet() {
    ts = 0;
  }
  
} hbaseRet;

文章来源:http://www.codelast.com/
至于操作的结果对不对,可以在Hbase shell中用get, scan等命令来验证,具体方法请看Hbase shell的help。另外,最好再写一些unit test来测试。
如果你要为CHbaseOperate类添加功能,可以参考Hbase.h文件中的函数定义。如你所见,CHbaseOperate类主要也是调用了里面的函数,只不过这个类可以让一些不太熟悉Hbase概念的人可以更方便地操作Hbase罢了。

请关注公众号获取更多资料

发表评论