hadoop18–JavaAPI, 读写流程, 物理模型

By | 2018年11月27日
版权声明:本文为博主原创文章,转载请标明出处。https://blog.csdn.net/kXYOnA63Ag9zqtXx0/article/details/82954503 https://blog.csdn.net/forever428/article/details/84111881

hbase的物理模型

hbase的物理模型, 就是说在hbase中数据是如何存储的, 以及存储的位置和原理

在hbase中最小的存储单元是cell(单元格): rowkey + 列簇 + 时间戳 + value, 可以唯一确定一个单元格的值

在hbase物理模型中, rowkey所在行的方向上可以划分多个region, 并且是按照字典排序的

region可以理解为分区的概念, 一个region 里边会存在一个列簇+value

hbase中存在系统管理表

  1. hbase: namespace: 记录了当前系统中所有的namespace的信息

    查看系统中所有的namespace

hbase(main):017:0> scan 'hbase:namespace'
ROW                                        COLUMN+CELL
 default                                   column=info:d, timestamp=1542080423695, value=\x0A\x07default
 hbase                                     column=info:d, timestamp=1542080424908, value=\x0A\x05hbase
2 row(s) in 0.0150 seconds
  1. hbase:meta: 记录了所有表的region信息
hbase(main):020:0> scan 'hbase:meta'
ROW                                        COLUMN+CELL
 hadoop,,1542146532516.edec440c50dc8b6a571 column=info:regioninfo, timestamp=1542146534724, value={ENCODED => edec440c50dc8b6a571ddbe678ce0c9c, NAME => 'hadoop,,1542
 ddbe678ce0c9c.                            146532516.edec440c50dc8b6a571ddbe678ce0c9c.', STARTKEY => '', ENDKEY => ''}
 hadoop,,1542146532516.edec440c50dc8b6a571 column=info:seqnumDuringOpen, timestamp=1542146534724, value=\x00\x00\x00\x00\x00\x00\x00\x02
 ddbe678ce0c9c.
 hadoop,,1542146532516.edec440c50dc8b6a571 column=info:server, timestamp=1542146534724, value=hadoop02:16020
 ddbe678ce0c9c.
 hadoop,,1542146532516.edec440c50dc8b6a571 column=info:serverstartcode, timestamp=1542146534724, value=1542142878250
 ddbe678ce0c9c.
 hbase:namespace,,1542080421925.833f0607b5 column=info:regioninfo, timestamp=1542143164503, value={ENCODED => 833f0607b5229ac2884356c723234928, NAME => 'hbase:namesp
 229ac2884356c723234928.                   ace,,1542080421925.833f0607b5229ac2884356c723234928.', STARTKEY => '', ENDKEY => ''}
 hbase:namespace,,1542080421925.833f0607b5 column=info:seqnumDuringOpen, timestamp=1542143164503, value=\x00\x00\x00\x00\x00\x00\x00
 229ac2884356c723234928.
 hbase:namespace,,1542080421925.833f0607b5 column=info:server, timestamp=1542143164503, value=hadoop03:16020
 229ac2884356c723234928.
 hbase:namespace,,1542080421925.833f0607b5 column=info:serverstartcode, timestamp=1542143164503, value=1542142884167
 229ac2884356c723234928.
2 row(s) in 0.0440 seconds

hbase的读取流程

  1. 客户端根据表名和rowkey 找到对应的region
  2. 在zookeeper中记录了meta表中的region信息, 客户端就可以从zookeeper中得到region信息
  3. 在客户端拿到region信息之后, 就可以得到regionserver信息, 就可以访问regionserver
  4. 客户端去找对应的region

hbase的写入流程

  1. 客户端根据表名和rowkey找到对就的region

  2. 在zookeeper中存储了region的信息表, 客户端从meta表中得到对应的region信息

  3. 预写日志 : WAL (write ahead log)找到对应的regionserver ,不是直接写在region上, 写到region中的store中的memstore中. 所有的客户端的命令都会记录在这个日志里, 他的作用是为了防止在进行数据写入的时候, 服务器突然宕机 , 造成数据的丢失, 这样可以从hlog中进行恢复

  4. 每次用户写入的时候, 同时写入数据到hlog和memstore, 并且在regionserver上只有一个hlog

  5. 当memstore达到一个阈值的时候, 会进行flush操作, 就会把memstore中的数据写到storeFile文件中, 该文件是保存到HDFS上的.

  6. 当storeFile文件数量增长到一定数量时, 会进行合并操作, 会形成更大的storeFile, 在合并文件的过程中存在三种合并机制minor, major, campaction

  7. 当region上的文件过大, 达到一定大小的时候会进行等分, 会进行split操作, 分裂成两个region, 其中一个region会由master分配到其他的 regionserver 进行负载均衡

  • 在hbase上对于文件的删除

    先给要删除的文件数据打上删除标记, 在合并的过程中, 进行删除, 例如版本号较老的数据, 在hbase上会存在保存时间, 当达到一定时间之后, 版本老的数据将会在合并的过程中删除

hbase的Java API

对于hbase的API操作, 其实就是调用hbase中的接口, 通过java进行数据库的操作, 类似于jdbc的操作

  1. 创建namespace
@Test
public void createNamespace() throws IOException {
    // 获得配置信息
    Configuration conf = HBaseConfiguration.create();
    // 获得连接
    Connection connection = ConnectionFactory.createConnection(conf);
    // 获得管理
    Admin admin = connection.getAdmin();
    // namespace描述
    NamespaceDescriptor nsDesc = NamespaceDescriptor.create("hadoop").build();
    // 创建namespace
    admin.createNamespace(nsDesc);
}
  1. 创建表, 以及列簇
@Test
public void creaetTable() throws IOException {
    // 获得配置信息
    Configuration conf = HBaseConfiguration.create();
    // 获得连接
    Connection connection = ConnectionFactory.createConnection(conf);
    // 获得管理
    Admin admin = connection.getAdmin();
    // 添加表名
    TableName tableName = TableName.valueOf("hadoop:student");
    // 添加表描述
    HTableDescriptor hTableDesc = new HTableDescriptor(tableName);
    // 添加列簇描述
    HColumnDescriptor hcol = new HColumnDescriptor("f1");
    // 将列簇添加到表中
    hTableDesc.addFamily(hcol);
    // 创建表
    admin.createTable(hTableDesc);
}
  1. 删除表
@Test
public void delTable() throws IOException {
    // 获得配置信息
    Configuration conf = HBaseConfiguration.create();
    // 获得连接
    Connection connection = ConnectionFactory.createConnection(conf);
    // 获得管理
    Admin admin = connection.getAdmin();
    // 添加表名
    TableName tableName = TableName.valueOf("hadoop:student");
    // 禁用表
    admin.disableTable(tableName);
    // 删除表
    admin.deleteTable(tableName);
}
  1. 添加数据
@Test
public void putData() throws IOException {
    // 获得配置信息
    Configuration conf = HBaseConfiguration.create();
    // 获得连接
    Connection connection = ConnectionFactory.createConnection(conf);
    // 添加表名
    TableName tableName = TableName.valueOf("hadoop:student");
    // 获得表
    Table table = connection.getTable(tableName);
    // 添加rowkey
    Put put = new Put(Bytes.toBytes("1001"));
    // 要添加的数据
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
    // 执行
    table.put(put);
}
  1. 删除数据
@Test
public void delData() throws IOException {
    // 获得配置信息
    Configuration conf = HBaseConfiguration.create();
    // 获得连接
    Connection connection = ConnectionFactory.createConnection(conf);
    // 添加表名
    TableName tableName = TableName.valueOf("hadoop:student");
    // 获得表
    Table table = connection.getTable(tableName);
    // 添加rowkey	
    Delete del = new Delete(Bytes.toBytes("1001"));
    // 执行删除
    table.delete(del);
}
  1. 查询数据
@Test
public void getData() throws IOException {
    // 获得配置信息
    Configuration conf = HBaseConfiguration.create();
    // 获得连接
    Connection connection = ConnectionFactory.createConnection(conf);
    // 添加表名
    TableName tableName = TableName.valueOf("hadoop:student");
    // 获得表
    Table table = connection.getTable(tableName);
    // 添加rowkey
    Get get = new Get(Bytes.toBytes("1001"));
    // 拿到值
    Result result = table.get(get);
    // 放到单元格里
    Cell[] cells = result.rawCells();
    // 遍历
    for (Cell cell : cells) {
    // 列簇
    System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)));
    // 列名
    System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)));
    // rowkey
    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)));
    // 值
    System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
    // 时间戳
    System.out.println(cell.getTimestamp());
    }
}
  1. 通过scan进行查询
@Test
public void scanDate() throws IOException {

    // 读取配置文件; 先读取默认的配置文件, 在读取自定义的配置文件
    Configuration conf = HBaseConfiguration.create();

    // 建立hbase的连接
    Connection connection = ConnectionFactory.createConnection(conf);

    TableName tableName = TableName.valueOf("hadoop25:stuinfo");

    Table table = connection.getTable(tableName);

    Scan scan = new Scan();

    // 当表中存在多个列簇的时候,可以指定列簇进行查询
    scan.addFamily(Bytes.toBytes("info"));

    //			scan.addColumn(family, qualifier)
    // family: 列簇
    // qualifier: 列
    ResultScanner scanner = table.getScanner(scan);

    for (Result result : scanner) {

    Cell[] cells = result.rawCells();

    for (Cell cell : cells) {

    // rowkey
    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)));

    // 列簇
    System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)));

    // 列
    System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)));

    // 数据内容
    System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));

    // 时间戳
    System.out.println(cell.getTimestamp());
    }
    }

}

发表评论