Hbase的API使用(create,put,delete,get)

By | 2019年2月12日

构建三张表,命名空间(namespace):weibo

表1:内容表:weibo:content  列族:info(用户发布的信息),列:content K      值:content V  Version:时间戳

表2:用户关系表:weibo:relation  列族1:attends(关注的人),列:attend K  值:attend V  列族2:fans(粉丝),列:uid K  值:uid V

表3:收件箱表:weibo:index,列族:info,列:被关注用户的 uid,值:该用户所发微博在内容表中的 rowKey

内容表的行键是:uid+"_"+timestamp;用户关系表和收件箱表的行键是:uid

内容表的 rowKey 设计:用户ID_timestamp(关系表、收件箱表直接使用用户ID)

Maven用到的pom 文件


        <!-- HBase server artifact, same version as the client. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- HBase client API (Connection, Admin, Table, Put, Get, Scan, Delete). -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>

        <!-- junit was declared twice (4.10 and 4.12); only the newer declaration is kept. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

class1:定义命名空间和三张表的名称:

/**
 * Names of the HBase namespace and tables used by the Weibo demo.
 *
 * <p>This is a constants holder only; it is final and cannot be instantiated.
 */
public final class Content {
    /** Namespace that contains all three tables. */
    public static final String NAMESPACE="weibo";
    /** Relation table: who a user follows and who follows the user. */
    public static final String RELATION="weibo:relation";
    /** Content table: the posts themselves. */
    public static final String CONTENT="weibo:content";
    /** Inbox table: per-user references to posts of followed users. */
    public static final String INDEX="weibo:index";
    /** Alias of {@link #INDEX}; the inbox table is referred to by both names. */
    public static final String INBOX=INDEX;

    private Content() {
        // constants holder, not meant to be instantiated
    }
}

class2:构建Hbase的工具类

package Weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.mortbay.log.Log;

import java.io.IOException;
import java.util.ArrayList;

public class HbaseUtils {
    // Client-side HBase configuration shared by all helpers in this class when they open a connection.
    private static Configuration configuration;
    static {
        // Start from the default HBase configuration, then point the client at the cluster.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","192.168.128.111");
        conf.set("hbase.rootdir","hdfs://192.168.128.111:9000/hbase");
        configuration = conf;
    }

    /**
     * Creates an HBase namespace.
     *
     * <p>The connection and admin handle are closed even when the creation fails.
     *
     * @param name name of the namespace to create
     * @throws IOException if the connection cannot be opened or the namespace cannot be created
     */
    public static void createNS(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor ns = NamespaceDescriptor.create(name).build();
            admin.createNamespace(ns);
        }
    }
    /**
     * Deletes an HBase namespace.
     *
     * <p>The connection and admin handle are closed even when the deletion fails.
     *
     * @param name name of the namespace to delete
     * @throws IOException if the connection cannot be opened or the namespace cannot be deleted
     */
    public static void deleteNS(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            admin.deleteNamespace(name);
        }
    }
    /**
     * Creates a table with the given column families.
     *
     * <p>Nothing is created when the table's namespace does not exist or when the table
     * already exists. Every column family gets the same settings; see the inline comments.
     *
     * @param name    table name, optionally qualified as {@code namespace:table}
     * @param version number of cell versions to keep (used as both max and min versions)
     * @param strings names of the column families to create
     * @throws IOException if the cluster cannot be reached or the table cannot be created
     */
    public static void createTable(String name,int version,String...strings) throws IOException {
        TableName tableName = TableName.valueOf(name);
        // Check the namespace the table actually lives in rather than a fixed one.
        if (!HbaseUtils.nameSpaceExits(tableName.getNamespaceAsString())) {
            return;
        }
        // Only create the table when it is missing.
        if (HbaseUtils.isTableExits(name)) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            // Write-ahead-log durability is not configured here, so the HBase default applies.
            for (String s : strings) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(Bytes.toBytes(s));
                // No data block encoding.
                hColumnDescriptor.setDataBlockEncoding(DataBlockEncoding.NONE);
                // Row-level bloom filter.
                hColumnDescriptor.setBloomFilterType(BloomType.ROW);
                // Time-to-live. HBase interprets this value in seconds (18000 s = 5 h), not ms.
                // NOTE(review): confirm that a 5 hour TTL is intended for this data.
                hColumnDescriptor.setTimeToLive(18000);
                // Compression is left at the default (none).
//            hColumnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
                // Do not keep cells that have been deleted.
                hColumnDescriptor.setKeepDeletedCells(false);
                // Flag the family as in-memory to favour it in the block cache.
                hColumnDescriptor.setInMemory(true);
                // Enable the block cache for this family.
                hColumnDescriptor.setBlockCacheEnabled(true);
                // HFile block size in bytes: 64 * 1024 = 65536 bytes = 64 KB.
                hColumnDescriptor.setBlocksize(64 * 1024);
                hColumnDescriptor.setMaxVersions(version);
                hColumnDescriptor.setMinVersions(version);
                // Register the family once it is fully configured.
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
        }
    }
    /**
     * Drops a table: disables it if it is still enabled, then deletes it.
     *
     * @param name table name, optionally qualified as {@code namespace:table}
     * @throws IOException if the cluster cannot be reached or the table cannot be dropped
     */
    public static void dropTable(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(name);
            // A table has to be disabled before it can be deleted; skip if already disabled.
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
            }
            admin.deleteTable(tableName);
        }
    }
    /**
     * Publishes a post for a user and pushes a reference to it into each follower's inbox.
     *
     * <p>The post is stored in the content table under row key {@code uid_timestamp}.
     * The qualifiers of the user's {@code fans} family in the relation table are read as
     * follower ids; for each one a cell {@code info:<uid> = <post row key>} is written to
     * the inbox table. All tables and the connection are closed even on failure.
     *
     * @param uid     id of the publishing user
     * @param content text of the post
     * @throws IOException if any HBase operation fails
     */
    public static void publish(String uid,String content) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(Content.CONTENT));
             Table relTable = connection.getTable(TableName.valueOf(Content.RELATION))) {
            // Row key of the post: uid + "_" + current time in milliseconds.
            long time = System.currentTimeMillis();
            String rowKey = uid + "_" + time;
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
            table.put(put);

            // Look up the publisher's followers.
            Get get = new Get(Bytes.toBytes(uid));
            get.addFamily(Bytes.toBytes("fans"));
            Result result = relTable.get(get);
            if (result.isEmpty()) {
                // Nobody follows this user, so there is no inbox to update.
                return;
            }

            // One inbox put per follower: row = follower id, column = publisher id.
            ArrayList<Put> puts = new ArrayList<Put>();
            for (Cell cell : result.rawCells()) {
                byte[] fanId = CellUtil.cloneQualifier(cell);
                Put inboxPut = new Put(fanId);
                inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
                puts.add(inboxPut);
            }
            try (Table indexTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
                indexTable.put(puts);
            }
        }
    }

    /**
     * Makes a user follow one or more other users.
     *
     * <p>Three things are written:
     * <ul>
     *   <li>relation table, row {@code uid}: one {@code attends:<followed>} cell per followed user;</li>
     *   <li>relation table, row {@code <followed>}: a {@code fans:<uid>} cell, so that each
     *       follower has its own column;</li>
     *   <li>inbox table, row {@code uid}: the row keys of the followed users' existing posts,
     *       stored as versions of {@code info:<followed>} with increasing timestamps.</li>
     * </ul>
     * Does nothing when no user to follow is given. All resources are closed even on failure.
     *
     * @param uid     id of the following user
     * @param strings ids of the users to follow
     * @throws IOException if any HBase operation fails
     */
    public static void attend(String uid,String...strings) throws IOException {
        if (strings.length <= 0) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table relTable = connection.getTable(TableName.valueOf(Content.RELATION));
             Table index = connection.getTable(TableName.valueOf(Content.INDEX));
             Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
            // Relation table: record both directions of the relationship.
            Put put = new Put(Bytes.toBytes(uid));
            ArrayList<Put> puts = new ArrayList<Put>();
            for (String s : strings) {
                put.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s), Bytes.toBytes(s));
                // The follower id is the qualifier so that several followers do not share one column.
                Put fanPut = new Put(Bytes.toBytes(s));
                fanPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
                puts.add(fanPut);
            }
            puts.add(put);
            relTable.put(puts);

            // Inbox: collect the row keys of the posts already published by the followed users.
            Put inPut = new Put(Bytes.toBytes(uid));
            // Each row key gets its own increasing timestamp so it becomes a separate cell version.
            long time = System.currentTimeMillis();
            for (String s : strings) {
                // Range scan over the content rows whose key starts with "<followed>_".
                Scan scan = new Scan(Bytes.toBytes(s + "_"), Bytes.toBytes(s + "|"));
                try (ResultScanner results = conTable.getScanner(scan)) {
                    for (Result r : results) {
                        byte[] row = r.getRow();
                        inPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), time++, row);
                    }
                }
            }
            // A Put without any cell must not be sent.
            if (!inPut.isEmpty()) {
                index.put(inPut);
            }
        }
    }
    /**
     * Makes a user unfollow one or more other users.
     *
     * <p>Removes the {@code attends:<unfollowed>} cells from the user's relation row, the
     * {@code fans:<uid>} cell from each unfollowed user's relation row, and the
     * {@code info:<unfollowed>} cells from the user's inbox row. All versions of each column
     * are removed. Does nothing when no user to unfollow is given.
     *
     * @param uid  id of the user who unfollows
     * @param dels ids of the users to unfollow
     * @throws IOException if any HBase operation fails
     */
    public static void delUser(String uid,String...dels) throws IOException {
        if (dels.length <= 0) {
            return;
        }
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table relTable = connection.getTable(TableName.valueOf(Content.RELATION));
             Table inTable = connection.getTable(TableName.valueOf(Content.INDEX))) {
            Delete delete = new Delete(Bytes.toBytes(uid));
            ArrayList<Delete> deles = new ArrayList<Delete>();
            for (String d : dels) {
                // The family name must match the one the relation data is written to: "attends".
                delete.addColumns(Bytes.toBytes("attends"), Bytes.toBytes(d));
                // Remove this user from the unfollowed user's fan list.
                Delete ad = new Delete(Bytes.toBytes(d));
                ad.addColumns(Bytes.toBytes("fans"), Bytes.toBytes(uid));
                deles.add(ad);
            }
            deles.add(delete);
            relTable.delete(deles);

            // Inbox: addColumns drops every version, so older post references cannot resurface.
            Delete inD = new Delete(Bytes.toBytes(uid));
            for (String d : dels) {
                inD.addColumns(Bytes.toBytes("info"), Bytes.toBytes(d));
            }
            inTable.delete(inD);
        }
    }
    /**
     * Prints every post of a user to standard output.
     *
     * <p>Posts are located by a row-prefix scan on {@code uid_}. This matches only row keys
     * that start with the prefix, so a uid that is a suffix of another uid (for example
     * {@code "1"} and {@code "1001"}) does not pick up the other user's posts.
     *
     * @param uid id of the user whose posts are printed
     * @throws IOException if any HBase operation fails
     */
    public static void getData(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
            Scan scan = new Scan();
            // Restrict the scan to rows whose key starts with "<uid>_".
            scan.setRowPrefixFilter(Bytes.toBytes(uid + "_"));
            try (ResultScanner results = conTable.getScanner(scan)) {
                for (Result r : results) {
                    for (Cell cell : r.rawCells()) {
                        System.out.println("RowKey:"+Bytes.toString(CellUtil.cloneRow(cell))+",Content:"+Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                }
            }
        }
    }
    /**
     * Prints a user's home timeline to standard output.
     *
     * <p>Reads the user's inbox row, takes the newest version of each column as the row key
     * of a post, fetches those posts from the content table and prints them. Prints nothing
     * when the inbox is empty.
     *
     * @param uid id of the user whose timeline is printed
     * @throws IOException if any HBase operation fails
     */
    public static void getInit(String uid) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table inTable = connection.getTable(TableName.valueOf(Content.INDEX));
             Table conTable = connection.getTable(TableName.valueOf(Content.CONTENT))) {
            Get get = new Get(Bytes.toBytes(uid));
            // Limits how many post references are read per followed user.
            get.setMaxVersions(1);
            Result result = inTable.get(get);
            if (result.isEmpty()) {
                // Empty inbox: nothing to look up.
                return;
            }
            // Each inbox cell value is the row key of a post in the content table.
            ArrayList<Get> gets = new ArrayList<Get>();
            for (Cell cell : result.rawCells()) {
                gets.add(new Get(CellUtil.cloneValue(cell)));
            }
            Result[] results = conTable.get(gets);
            for (Result r : results) {
                // Skip references to posts that are no longer present.
                if (r == null || r.isEmpty()) {
                    continue;
                }
                for (Cell cell : r.rawCells()) {
                    System.out.println("RowKey:"+Bytes.toString(CellUtil.cloneRow(cell))+",Content:"+Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
    /**
     * Tells whether a table exists.
     *
     * <p>The connection and admin handle are closed even when the lookup fails.
     *
     * @param name table name, optionally qualified as {@code namespace:table}
     * @return {@code true} if the table exists, {@code false} otherwise
     * @throws IOException if the cluster cannot be reached
     */
    public static boolean isTableExits(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(name));
        }
    }

    /**
     * Tells whether a namespace exists.
     *
     * <p>The namespace descriptor is looked up and its configuration is logged. A missing
     * namespace is reported as {@code false} instead of an exception. The connection and
     * admin handle are always closed.
     *
     * @param name name of the namespace to look up
     * @return {@code true} if the namespace exists, {@code false} if it was not found
     * @throws IOException if the cluster cannot be reached or the lookup fails for another reason
     */
    public static boolean nameSpaceExits(String name) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor ns = admin.getNamespaceDescriptor(name);
            Log.info("获取表空间的信息"+ns.getConfiguration());
            return true;
        } catch (NamespaceNotFoundException e) {
            // Not found is the expected "does not exist" answer, not an error.
            return false;
        }
    }
    /**
     * Closes an admin handle and a connection; either argument may be {@code null}.
     *
     * <p>The connection is closed even when closing the admin handle throws.
     *
     * @param admin      admin handle to close, or {@code null}
     * @param connection connection to close, or {@code null}
     * @throws IOException if closing either resource fails
     */
    public static void close(Admin admin,Connection connection) throws IOException {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

}

class3:测试类

package wang;

import java.io.IOException;

/**
 * Command-line driver for the Weibo demo: sets up the schema and exercises the helper methods.
 *
 * <p>NOTE(review): in this listing the helper class is declared in package {@code Weibo}
 * while this class is in package {@code wang}; confirm both live in the same package or
 * add the matching import.
 */
public class Weibo {

    /**
     * Creates the namespace and the three tables (content, relation, inbox).
     *
     * @throws IOException if any HBase operation fails
     */
    private static void init() throws IOException {
        // Create the namespace.
        HbaseUtils.createNS(Content.NAMESPACE);

        // Create the three tables.
        HbaseUtils.createTable(Content.CONTENT, 1, "info");
        HbaseUtils.createTable(Content.RELATION, 1, "attends", "fans");
        HbaseUtils.createTable(Content.INDEX, 2, "info");
    }

    /**
     * Drops the three tables and the namespace so the demo can start from scratch.
     *
     * @throws Exception if any HBase operation fails
     */
    private static void dele() throws Exception {
        // Drop the tables.
        HbaseUtils.dropTable(Content.CONTENT);
        HbaseUtils.dropTable(Content.INDEX);
        HbaseUtils.dropTable(Content.RELATION);
        // Drop the namespace.
        HbaseUtils.deleteNS(Content.NAMESPACE);
    }


    public static void main(String[] args) throws Exception {
        // Reset and initialise the namespace and the three tables.
//        dele();
//        init();


        // Follow: the first argument is the follower, the remaining ones are the users to follow.
//        HbaseUtils.attend("1001", "1002", "1003");
//        HbaseUtils.attend("1002", "1001", "1003");

        // Unfollow.
        HbaseUtils.delUser("1001", "1002");

        // Publish posts.
//        HbaseUtils.publish("1001", "我是Andy一号!!");
//        HbaseUtils.publish("1001", "我是Andy二号!!");
//        HbaseUtils.publish("1001", "我是1001");
//        HbaseUtils.publish("1002", "我发了1条消息");
//        HbaseUtils.publish("1002", "我发了2条消息");
//        HbaseUtils.publish("1002", "我发了3条消息");
//        HbaseUtils.publish("1002", "我发了3条消息");
//        HbaseUtils.publish("1002", "我发了4条消息");
//        HbaseUtils.publish("1003", "我是1003一号");
//        HbaseUtils.publish("1003", "我是1003二号");
//        HbaseUtils.getData("1001");
        System.out.println("-------1001getInt-------");
        HbaseUtils.getInit("1001");
//        System.out.println("-------1002getInt-------");
//        HbaseUtils.getInit("1002");
//
//        System.out.println("-------1001getData-------");
//        HbaseUtils.getData("1001");
//
//        System.out.println("-------1002getData-------");
//        HbaseUtils.getData("1002");
//
//        System.out.println("-------1003getData-------");
//        HbaseUtils.getData("1003");
    }
}

 

请关注公众号获取更多资料

发表评论