Spring Boot: Integrating HBase with Spring Data Hadoop

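By | February 2, 2019

Copyright notice: this is an original article by the blogger. Reposts and citations must credit the author and cite the source. https://blog.csdn.net/cwenao/article/details/57980188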

Prerequisites

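  • Hadoop installed and configured: hadoop-2.7.3
  • HBase installed and configured: hbase-1.2.4
  • ZooKeeper installed and configured: zookeeper-3.4.9
  • Awareness of the guava version conflict between hbase-client and Spring Data Hadoop (2.4.0.RELEASE)
  • Familiarity with the other Spring Boot chapters in this series
  • Hostname binding
  • Hadoop home: set HADOOP_HOME
  • For testing, set hadoop.home.dir in code: System.setProperty("hadoop.home.dir", "D:\\dev_evn\\hadoop-2.7.3");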
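
The last two items can be combined so that the hard-coded path is only a fallback. The following is a minimal sketch, not code from the original project: the class name HadoopHome and its methods are hypothetical, and the lookup order (system property first, then environment variable) is an assumption chosen to match how Hadoop's client code resolves its home directory.

```java
/** Hypothetical helper: prefer an existing setting over a hard-coded path. */
final class HadoopHome {

    private HadoopHome() {
    }

    /**
     * Picks the Hadoop home directory: an explicit hadoop.home.dir value wins,
     * then the HADOOP_HOME value, then the supplied fallback.
     */
    static String resolve(String systemProperty, String envVariable, String fallback) {
        if (systemProperty != null && !systemProperty.trim().isEmpty()) {
            return systemProperty;
        }
        if (envVariable != null && !envVariable.trim().isEmpty()) {
            return envVariable;
        }
        return fallback;
    }

    /** Resolves the directory from the real environment and publishes it as hadoop.home.dir. */
    static String apply(String fallback) {
        String home = resolve(System.getProperty("hadoop.home.dir"),
                System.getenv("HADOOP_HOME"), fallback);
        System.setProperty("hadoop.home.dir", home);
        return home;
    }

    public static void main(String[] args) {
        // The fallback is the test path used in this article.
        System.out.println("hadoop.home.dir = " + apply("D:\\dev_evn\\hadoop-2.7.3"));
    }
}
```

Calling HadoopHome.apply(...) before SpringApplication.run(...) would keep the development path out of environments where HADOOP_HOME is already configured.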

Hadoop base dependency module

  • guava (12.0.1) in hbase-client conflicts with guava (18.0) in Spring Data Hadoop,
    so I built a separate base dependency module, which works so far; the idea comes from the official Elasticsearch solution
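
When a conflict like this is suspected, it helps to see which jar a class is actually loaded from. The sketch below is an illustration only: ClassLocator is a hypothetical helper, and com.google.common.base.Stopwatch is used simply as a representative guava class.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper: reports where a class was loaded from. */
final class ClassLocator {

    private ClassLocator() {
    }

    /** Returns the jar or directory URL for the class, or a short explanation if there is none. */
    static String locate(String className) {
        try {
            // initialize=false: we only need the class metadata, not its static initializers
            Class<?> type = Class.forName(className, false, ClassLocator.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "no code source (JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // The jar file name in the output normally contains the guava version.
        System.out.println(locate("com.google.common.base.Stopwatch"));
    }
}
```

The same call works for other suspects, for example a servlet-api class.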

Create the hadoop-base-bootcwenao module

Add the required dependencies

build.gradle

apply plugin: 'org.springframework.boot'

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:" + springCloudVersion
        mavenBom "org.springframework.boot:spring-boot-starter:"+ springBootVersion
    }
}
repositories {
    mavenCentral()
}

dependencies {
    compile ('org.springframework.data:spring-data-hadoop:'+ springDataHadoopVersion)
    compile ('org.apache.hadoop:hadoop-common:'+hadoopVersion)
}
configurations {
    all*.exclude module: 'spring-boot-starter-logging'
    all*.exclude module: 'logback-classic'
    all*.exclude module: 'log4j-over-slf4j'
    all*.exclude module: 'slf4j-log4j12'
    all*.exclude module: 'snappy-java'
}
jar {
    baseName = 'hadoop-base-bootcwenao'
}

Create the main entry point (important)

HadoopBaseApplication.java

/**
 * @author cwenao
 * @version $Id HadoopBaseApplication.java, v 0.1 2017-02-23 13:51 cwenao Exp $$
 */
public class HadoopBaseApplication {
    public static void main(String[] args) {

    }
}

Create the BigData module

Add hbase-client, exclude servlet-api and guava:18.0, and depend on the Hadoop base module hadoop-base-bootcwenao

dependencies {

    compile project(':hadoop-base-bootcwenao')

    compile ('org.springframework.data:spring-data-redis')

    compile ('org.springframework.boot:spring-boot-starter-data-mongodb:'+springBootVersion)
    compile ('org.apache.hbase:hbase-client:'+hbaseClientVersion)
    compile ('org.springframework.boot:spring-boot-starter-web:'+springBootVersion)
    compile('org.springframework.cloud:spring-cloud-starter-eureka')
    compile ('mysql:mysql-connector-java:'+mysqlVersion)
    compile ('com.alibaba:druid:'+druidVersion)
    compile ('org.mybatis:mybatis-spring:'+mybatisSpringBootVersion)
    compile ('org.mybatis:mybatis:'+mybatisVersion)
    compile('org.springframework.boot:spring-boot-starter-log4j2')
    compile ('org.springframework.boot:spring-boot-starter-thymeleaf')
    compile ('net.sourceforge.nekohtml:nekohtml:'+nekoHtmlVersion)
    compile('org.apache.logging.log4j:log4j-1.2-api:'+ log4jAPIVersion)
    /*compile('org.springframework.boot:spring-boot-starter-jdbc')*/
    compile('org.springframework.boot:spring-boot-starter-aop')
    compile ('com.alibaba:fastjson:'+fastjsonVersion)
    compile ('redis.clients:jedis')
    compile ('com.google.guava:guava:12.0.1')

    testCompile ('org.springframework.boot:spring-boot-starter-test')
    testCompile group: 'junit', name: 'junit', version: '4.11'
}


configurations {
    all*.exclude module: 'spring-boot-starter-logging'
    all*.exclude module: 'servlet-api'
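    // Gradle exclude rules match group and module name only, never a version,
    // so a rule with module 'guava:18.0' matches nothing. Pin the guava version instead.
    all { resolutionStrategy.force 'com.google.guava:guava:12.0.1' }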
    all*.exclude module: 'logback-classic'
    all*.exclude module: 'log4j-over-slf4j'
    all*.exclude module: 'slf4j-log4j12'
    all*.exclude module: 'snappy-java'
}

Create the HBase properties file hbase.properties

hbase.zk.host=127.0.0.1
hbase.zk.port=2181
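
The two key names above are specific to this project; the XML in the next step feeds them into the zk-quorum and zk-port attributes. For readers who configure a client by hand, the standard HBase client keys are hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort. A minimal sketch of that mapping follows; the class HbaseZkSettings and the default values "localhost" and "2181" are assumptions for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: maps the project's property names to the standard HBase client keys. */
final class HbaseZkSettings {

    private HbaseZkSettings() {
    }

    /** Parses text in .properties format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Translates hbase.zk.host / hbase.zk.port into the keys an HBase client reads. */
    static Map<String, String> toClientSettings(Properties props) {
        Map<String, String> settings = new LinkedHashMap<>();
        // Defaults are assumptions for this sketch.
        settings.put("hbase.zookeeper.quorum", props.getProperty("hbase.zk.host", "localhost"));
        settings.put("hbase.zookeeper.property.clientPort", props.getProperty("hbase.zk.port", "2181"));
        return settings;
    }

    public static void main(String[] args) {
        Properties props = parse("hbase.zk.host=127.0.0.1\nhbase.zk.port=2181\n");
        System.out.println(toClientSettings(props));
    }
}
```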

Create hbase-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <context:property-placeholder location="classpath:/config/hbase.properties"/>

    <hdp:configuration id="hadoopConfiguration">
        fs.defaultFS=hdfs://127.0.0.112:9000
    </hdp:configuration>

    <hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}" delete-connection="true"/>

    <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
        <property name="configuration" ref="hbaseConfiguration"/>
    </bean>

</beans>

Import the XML with @ImportResource

/**
 * @author cwenao
 * @version $Id BigDataApplication.java, v 0.1 2017-02-21 22:38 cwenao Exp $$
 */
@SpringBootApplication
@EnableDiscoveryClient
@ImportResource(locations = {"classpath:/config/hbase-spring.xml"})
public class BigDataApplication {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "D:\\dev_evn\\hadoop-2.7.3");
        SpringApplication.run(BigDataApplication.class, args);
    }
}

Configure MongoDB, Thymeleaf, Redis, Eureka, and so on

application.yml

server:
  port: 8686
eureka:
  instance:
    hostname: bigdataserver
    prefer-ip-address: true
  client:
    registerWithEureka: true
    fetchRegistry: true
    service-url:
      defaultZone: http://aa:abcd@localhost:8761/eureka/
spring:
  thymeleaf:
    cache: false
    mode: LEGACYHTML5
    prefix: classpath:/web/
    suffix: .html
    content-type: text/html
  redis:
    host: 127.0.0.1
    port: 6379
    password: password
    timeout: 5000
    pool:
      max-idle: 8
      min-idle: 0
      max-active: 8
      max-wait: -1
  data:
    mongodb:
      uri: mongodb://username:password@127.0.0.1:27017/kakme

bootstrap.yml

spring:
  application:
    name: bigdataserver
  aop:
    auto: true
  cloud:
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          zk-nodes: 127.0.0.1:2181
logging:
  config: classpath:log4j2-spring.xml

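Configure the hostname (important)

  • HBase relies on ZooKeeper, and what ZooKeeper returns for the HBase servers is a hostname
  • So the server's hostname must also be mapped on the local machine
  • On Windows, add an entry to the hosts file: xxx.xx.xx.xxx master, where xxx.xx.xx.xxx is the server's address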
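
Before starting the application, it can be worth confirming that the mapping took effect. A minimal sketch using only the JDK; HostnameCheck is a hypothetical helper, and "master" is the hostname from the hosts entry above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper: checks that a server hostname resolves on this machine. */
final class HostnameCheck {

    private HostnameCheck() {
    }

    /** Returns the resolved IP address, or null when the name cannot be resolved. */
    static String resolve(String hostname) {
        try {
            return InetAddress.getByName(hostname).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "master";
        String address = resolve(host);
        System.out.println(address == null
                ? host + " does not resolve; check the hosts file"
                : host + " -> " + address);
    }
}
```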

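Create the query methods

  • For a quick test, or if you do not mind the tedium, you can write each lookup by hand with hbaseTemplate
  • I am lazy, so I wrote a small extension; the idea comes from experts online

Create HbaseFindBuilder.java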

HbaseFindBuilder.java

/**
 * Company
 * Copyright (C) 2014-2017 All Rights Reserved.
 */
package com.bootcwenao.bigdataserver.hbase.handler;

import com.bootcwenao.bigdataserver.utils.HumpNameOrMethodUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.beans.PropertyDescriptor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Builds a bean from a Result, one qualifier at a time
 * @author cwenao
 * @version $Id HbaseFindBuilder.java, v 0.1 2017-02-20 16:05 cwenao Exp $$
 */
public class HbaseFindBuilder<T> {

    private String family;

    private Result result;

    private String qualifier;

    private Map<String, PropertyDescriptor> fieldsMap;

    private Set<String> propertiesSet;

    private Set<String> qualifierSet;

    private BeanWrapper beanWrapper;

    private T tBean;

    /**
     * Query by column family
     * @param family
     * @param result
     * @param tclazz
     */
    public HbaseFindBuilder(String family, Result result, Class<T> tclazz) {

        this.family = family;
        this.result = result;
        fieldsMap = new HashMap<>();
        propertiesSet = new HashSet<>();

        reflectBean(tclazz);

    }

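    /**
     * Return the result by qualifier
     * @param qualifier a single bean property name
     * @return this builder
     */
    public HbaseFindBuilder<T> build(String qualifier) {

        // The extra empty string routes the call to the varargs overload, which skips empty names
        return this.build(qualifier, "");
    }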

    /**
     * by multiple qualifier
     * @param qualifiers bean property names; each is converted to its HBase column name
     * @return this builder
     */
    public HbaseFindBuilder<T> build(String... qualifiers) {

        if (qualifiers == null || qualifiers.length == 0) {
            return this;
        }
        PropertyDescriptor p = null;
        byte[] qualifierByte = null;

        for (String qualifier : qualifiers) {
            if (StringUtils.isEmpty(qualifier)) {
                continue;
            }
            p = fieldsMap.get(qualifier.trim());
            if (p == null) {
                // Not a writable property of the bean: skip it rather than fail with a NullPointerException
                continue;
            }
            qualifierByte = result.getValue(family.getBytes(), HumpNameOrMethodUtils.humpEntityForVar(qualifier).getBytes());
            if (qualifierByte != null && qualifierByte.length > 0) {
                beanWrapper.setPropertyValue(p.getName(), Bytes.toString(qualifierByte));
                propertiesSet.add(p.getName());
            }
        }

        return this;
    }

    /**
     * by map
     * @param map the map values are bean property names
     * @return this builder
     */
    public HbaseFindBuilder<T> build(Map<String,String> map) {

        if (map == null || map.size() <= 0) {
            return this;
        }

        PropertyDescriptor p = null;
        byte[] qualifierByte = null;

        for (String value : map.values()) {
            if (StringUtils.isEmpty(value)) {
                continue;
            }

            p = fieldsMap.get(value.trim());
            if (p == null) {
                // Not a writable property of the bean: skip it rather than fail with a NullPointerException
                continue;
            }
            qualifierByte = result.getValue(family.getBytes(), HumpNameOrMethodUtils.humpEntityForVar(value).getBytes());

            if (qualifierByte != null && qualifierByte.length > 0) {
                beanWrapper.setPropertyValue(p.getName(), Bytes.toString(qualifierByte));
                propertiesSet.add(p.getName());
            }
        }

        return this;
    }

    private void reflectBean(Class<T> tclazz) {

        tBean = BeanUtils.instantiate(tclazz);

        PropertyDescriptor[] propertyDescriptors = BeanUtils.getPropertyDescriptors(tclazz);

        for (PropertyDescriptor p : propertyDescriptors) {
            if (p.getWriteMethod() != null) {
                this.fieldsMap.put(p.getName(), p);
            }
        }

        beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(tBean);
    }

    public T fetch() {
        if (!CollectionUtils.isEmpty(propertiesSet)) {
            return this.tBean;
        }
        return null;
    }
}
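
The core idea of the builder, copying column values into bean properties through reflection, can be tried without HBase or Spring on the classpath. The sketch below is an illustration under stated assumptions, not the builder itself: RowToBeanSketch and its nested User bean are hypothetical, the map stands in for an HBase Result, the keys are property names rather than converted column names, and only String and Integer properties are handled.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, JDK-only illustration of filling a bean from raw column bytes. */
final class RowToBeanSketch {

    /** Example bean with one String and one Integer property. */
    public static class User {
        private String userName;
        private Integer age;

        public String getUserName() { return userName; }
        public void setUserName(String userName) { this.userName = userName; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
    }

    private RowToBeanSketch() {
    }

    /** Creates a bean and sets every writable property that has a non-empty value in the map. */
    static <T> T populate(Class<T> type, Map<String, byte[]> cells) {
        try {
            T bean = type.getDeclaredConstructor().newInstance();
            for (PropertyDescriptor p : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                byte[] raw = cells.get(p.getName());
                if (p.getWriteMethod() == null || raw == null || raw.length == 0) {
                    continue; // read-only property or no value stored for it
                }
                String text = new String(raw, StandardCharsets.UTF_8);
                // Only Integer needs conversion in this sketch; everything else is treated as String
                Object value = p.getPropertyType() == Integer.class ? Integer.valueOf(text) : text;
                p.getWriteMethod().invoke(bean, value);
            }
            return bean;
        } catch (ReflectiveOperationException | IntrospectionException e) {
            throw new IllegalStateException("Cannot populate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> cells = new HashMap<>();
        cells.put("userName", "cwenao".getBytes(StandardCharsets.UTF_8));
        cells.put("age", "30".getBytes(StandardCharsets.UTF_8));
        User user = populate(User.class, cells);
        System.out.println(user.getUserName() + ", " + user.getAge());
    }
}
```

In the real builder, Spring's BeanWrapper takes care of the type conversion that this sketch does by hand.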

Create the bean that maps to the column family

public class UserInfo {

    private String id;

    private String userName;

    private Integer age;
    //setter getter ......
}

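Create the converter between camelCase bean property names and qualifiers; column names in the HBase table must strictly follow this convention (for example, the property userName maps to the column user_name)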

/**
 * Company
 * Copyright (C) 2014-2017 All Rights Reserved.
 */
package com.bootcwenao.bigdataserver.utils;

import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/**
 * Transform the entity attribute hbase column attribute
 * @author cwenao
 * @version $Id HumpNameOrMethodUtils.java, v 0.1 2017-02-20 16:27 cwenao Exp $$
 */
public class HumpNameOrMethodUtils {

    private final static String SEPARATOR_UNDER_SCORE = "_";

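    /**
     * Converts an underscore-separated column name to a camelCase entity property name
     * @param var column name, e.g. user_name
     * @return property name, e.g. userName
     */
    public static String humpVarForEntity(String var) {

        if (StringUtils.isEmpty(var)) {
            return "";
        }

        // Lower-case the first character without treating it as a regular expression
        var = var.substring(0, 1).toLowerCase(Locale.US) + var.substring(1);

        if (var.indexOf(SEPARATOR_UNDER_SCORE) < 0) {
            // No separator: the name is already a single word, so return it instead of an empty string
            return var;
        }

        StringBuilder varBf = new StringBuilder();

        String[] underStr = var.split(SEPARATOR_UNDER_SCORE);

        for (int i = 0; i < underStr.length; i++) {

            if (i == 0) {
                varBf.append(underStr[i]);
            } else {
                varBf.append(str2LowerCase(underStr[i]));
            }
        }

        return varBf.toString();
    }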

    /**
     * Converts a camelCase entity property name to an underscore-separated column name
     * @param var
     * @return
     */
    public static String humpEntityForVar(String var) {

        if (StringUtils.isEmpty(var)) {
            return "";
        }

        StringBuffer varBf = new StringBuffer();

        char[] varChar = var.toCharArray();

        int i = 0;
        for(char c : varChar) {

            if (i==0) {
                varBf.append(String.valueOf(c));
            } else {
                if (compareToLowerCase(String.valueOf(c))) {
                    varBf.append("_" + String.valueOf(c).toLowerCase());
                } else {
                    varBf.append(String.valueOf(c));
                }
            }
            i++;
        }

        return varBf.toString();
    }


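    /**
     * Converts the first character to upper case
     * (despite its name, this method does not lower-case anything)
     * @param str
     * @return
     */
    private static String str2LowerCase(String str) {
        if (StringUtils.isEmpty(str)) {
            return "";
        }
        // Plain concatenation avoids interpreting the first character as a regular expression
        return str.substring(0, 1).toUpperCase(Locale.US) + str.substring(1);
    }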

    /**
     * Whether the character is an upper-case letter
     * @param source
     * @return
     */
    private static Boolean compareToLowerCase(String source) {

        if (StringUtils.isEmpty(source)) {
            return false;
        }

        if (!source.equals(source.toLowerCase(Locale.US))) {
            return true;
        }
        return false;
    }
}
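
To make the naming convention concrete, here is a compact, JDK-only sketch of the two conversions. QualifierNames is a hypothetical class written for this illustration; it is a simplified variant, not a drop-in replacement for HumpNameOrMethodUtils (for instance, it does not lower-case the first character).

```java
/** Hypothetical, simplified variant of the property name / column name conversion. */
final class QualifierNames {

    private QualifierNames() {
    }

    /** userName -> user_name: every upper-case letter after the first character starts a new word. */
    static String toQualifier(String property) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < property.length(); i++) {
            char c = property.charAt(i);
            if (i > 0 && Character.isUpperCase(c)) {
                out.append('_').append(Character.toLowerCase(c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    /** user_name -> userName: the letter after each underscore is upper-cased. */
    static String toProperty(String qualifier) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : qualifier.toCharArray()) {
            if (c == '_') {
                // A leading underscore does not capitalise the first letter
                upperNext = out.length() > 0;
                continue;
            }
            out.append(upperNext ? Character.toUpperCase(c) : c);
            upperNext = false;
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toQualifier("userName")); // prints user_name
        System.out.println(toProperty("user_name")); // prints userName
    }
}
```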

Call HbaseFindBuilder

/**
 * @author cwenao
 * @version $Id HbaseAccountInfoMapperImpl.java, v 0.1 2017-02-21 21:14 cwenao Exp $$
 */
@Repository("hbaseAccountInfoMapperImpl")
public class HbaseAccountInfoMapperImpl implements HbaseAccountInfoMapper {

    @Autowired
    private HbaseTemplate hbaseTemplate;

    public UserInfo findUserInfoByEntity(String table, String family, String rowKey, UserInfo userInfo) {

        return (UserInfo) hbaseTemplate.get(table, rowKey, family,
                (result, rowNum) -> new HbaseFindBuilder<>(family, result, userInfo.getClass()).build("userName","age","id").fetch());
    }
}

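Insert data on the server

  • Run hbase shell to start the shell
  • Use put to insert data

  • Create the table ('user') and the family ('info'): create 'user','info'
  • Insert a column value: put 'user','1','info:user_name','cwenao'
  • '1' is the rowkey; 'info:user_name' is the column user_name in the family info; 'cwenao' is the value of user_name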
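
The family:qualifier notation used by put splits at the first colon. A minimal sketch of that split; ColumnRef is a hypothetical class, and rejecting an empty family or qualifier is a simplification made for this example.

```java
/** Hypothetical value class for a column written as family:qualifier. */
final class ColumnRef {

    final String family;
    final String qualifier;

    private ColumnRef(String family, String qualifier) {
        this.family = family;
        this.qualifier = qualifier;
    }

    /** Splits at the first colon, e.g. info:user_name -> family "info", qualifier "user_name". */
    static ColumnRef parse(String column) {
        int colon = column.indexOf(':');
        if (colon <= 0 || colon == column.length() - 1) {
            // Simplification for this sketch: both parts must be present
            throw new IllegalArgumentException("Expected family:qualifier but got " + column);
        }
        return new ColumnRef(column.substring(0, colon), column.substring(colon + 1));
    }

    public static void main(String[] args) {
        ColumnRef ref = parse("info:user_name");
        System.out.println(ref.family + " / " + ref.qualifier);
    }
}
```

The qualifier part (user_name) is what the name converter produces from the bean property userName.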


Create the controller

HbaseAccountController.java

/**
 * @author cwenao
 * @version $Id HbaseAccountController.java, v 0.1 2017-02-21 22:20 cwenao Exp $$
 */
@Controller
public class HbaseAccountController {

    private final static String TABLE_NAME = "user";

    private final static String FAMILY_INFO = "info";

    @Autowired
    private HbaseAccountInfoService hbaseAccountInfoServiceImpl;
    @RequestMapping(value = "/bigdata/find")
    public String findUserInfoByName(String name, ModelMap modelMap) {
        UserInfo userInfo = hbaseAccountInfoServiceImpl.findUserInfoByEntity(TABLE_NAME, FAMILY_INFO,
                "1", new UserInfo());

        modelMap.addAttribute("userInfo", userInfo);

        return "hbase/hbasetest";
    }
}

Create hbasetest.html in the hbase folder
hbasetest.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Hbase Test</title>
</head>
<body>
<table>
    <tr><td th:text="UserInfo"></td></tr>
    <tr >
        <td th:text="${userInfo.id}">aabbcc</td>
    </tr>
    <tr>
       <td  th:text="${userInfo.userName}">123dds</td>
    </tr>
    <tr>
        <td th:text="${userInfo.age}">123dds</td>
    </tr>
</table>
</body>
</html>

Configure the API gateway

bigdataserver:
      path: /bigdataserver/**
      serviceId: BIGDATASERVER

Test

Visit http://localhost:10002/bigdataserver/bigdata/find?name=%22aaa%22
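
The %22 in the URL is the percent-encoded double quote. If the request is built in code rather than typed into a browser, the parameter can be encoded with the JDK. FindUrl is a hypothetical helper for this illustration; the base URL is the one above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper: builds the test URL with a safely encoded name parameter. */
final class FindUrl {

    private FindUrl() {
    }

    /** Appends ?name=... to the base URL, percent-encoding the value as UTF-8. */
    static String build(String baseUrl, String name) {
        try {
            return baseUrl + "?name=" + URLEncoder.encode(name, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(build("http://localhost:10002/bigdataserver/bigdata/find", "\"aaa\""));
    }
}
```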


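Errors and likely causes

  • servlet-api must be excluded, otherwise the application will not even start
  • guava version conflict: it arises mainly because the 12.x and 18.0 APIs are incompatible
  • ZooKeeper hostname binding: needed mainly because a hostname is what gets handed to the client
  • Most odd problems trace back to these few causes
  • If all else fails, turn off some logging and then look for the problem
  • For winutils.exe problems, download hadoop-xxx.tar.gz, unpack it, and set HADOOP_HOME and PATH

For example, silence the loggers that you are sure are not causing the error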
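
For the winutils.exe item, Hadoop on Windows looks for the executable in the bin directory under the Hadoop home. A quick JDK-only check is sketched below; WinutilsCheck is a hypothetical helper, and the path in main is the test path used earlier in this article.

```java
import java.io.File;

/** Hypothetical helper: checks whether winutils.exe is where Hadoop expects it on Windows. */
final class WinutilsCheck {

    private WinutilsCheck() {
    }

    /** The location Hadoop checks: <hadoop home>/bin/winutils.exe. */
    static File expectedWinutils(String hadoopHome) {
        return new File(new File(hadoopHome, "bin"), "winutils.exe");
    }

    /** True only when a Hadoop home is given and the executable exists as a regular file. */
    static boolean isPresent(String hadoopHome) {
        return hadoopHome != null && expectedWinutils(hadoopHome).isFile();
    }

    public static void main(String[] args) {
        String home = "D:\\dev_evn\\hadoop-2.7.3";
        System.out.println(expectedWinutils(home) + (isPresent(home) ? " found" : " missing"));
    }
}
```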

    <loggers>
        <root level="DEBUG">
            <appenderref ref="CONSOLE" />
        </root>
        <logger name="com.netflix.discovery" level="ERROR"/>
        <logger name="org.apache.http" level="ERROR"/>
        <logger name="org.mongodb.driver.cluster" level="ERROR"/>
    </loggers>

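Code

For the code, see the GitHub reference repository

If you have questions, follow the WeChat official account (K171); if you found this helpful, please star the project on GitHub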
