Hbase 读写操作的部分实践总结

2018-12-07 17:49 
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/weinierzui/article/details/79447632

Hbase 读写操作的部分实践总结

主要包含个人开发过程中遇到的操作hbase数据,读写操作的一些代码样例,已经全部测试通过的。

从hbase获取数据的功能代码,包含有filter的,根据rowkey的范围检索,以及全表扫描三部分的

structtype构造例子 
val strctTupe =  new StructType(Array(
      StructField("user_id",StringType, false),
      StructField("record_date", StringType, false),
      StructField("MULTIPLE_PROFESSION_SCORE", StringType, false)
    ))


import java.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Result, Scan}
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer

/**
  * Created by ** on 2017/10/31.
  * 根据需要,传入需要展示的字段信息,以及过滤条件,直接注册生成有结果集生成的临时表
  * 返回的表名是:tbl_nm_tmp
  */
object GetHbaseTable {

  /**
    * 根据rowkey进行数据筛选
    * 测试:使用scan的setStartRow设置rowkey的起始位置,
    *        可以指定rowkey的前缀进行匹配查询,
    *        不可以指定rowkey中间的一部分进行数据筛选
    * @param sparkSesson
    * @param tbl_nm     表名
    * @param show_info 展示的列名
    * @param tuple      rowkey对日期筛选的条件,startTime,endTime   区间是[startTime,endTime)
    *                   注册成临时表:tabl_nm_tmp
    */
  def FilterRowkeyGetHtable(sparkSesson:SparkSession,tbl_nm:String,show_info:Array[(String,String)],tuple:Tuple2[String,String]): Unit ={
        val sparkContext = sparkSesson.sparkContext
    val sqlContext = sparkSesson.sqlContext

    val hbaseconf = HBaseConfiguration.create()
    hbaseconf.set(TableInputFormat.INPUT_TABLE,tbl_nm)
    val table = new HTable(hbaseconf,tbl_nm)
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(tuple._1))
    scan.setStopRow(Bytes.toBytes(tuple._2))

    for(i <- show_info){
      scan.addColumn(Bytes.toBytes(i._1),Bytes.toBytes(i._2.toLowerCase))
    }
    val ColumnValueScanner = table.getScanner(scan)
    /**
      * -------------------------重点-----------------------------
      * 将扫描得到的结果值转换成可以匹配注册表用的RDD[ROW]
      * 使用
      *
      * 此处对ColumnValueScanner的处理不完善,需要后续修改,查找有效的方法,
      * 可以将structtype中的类型细化为具体类型,并在此处使用模式匹配处理类型,但是需要保证对空指针,异常数据等的预处理【这是个坑】。
 */
    val listRow = new util.ArrayList[Row]()
    var flag = true
    while(flag){
      try{
        //每个result都是一行数据
        val r:Result = ColumnValueScanner.next()
        val arr = ArrayBuffer[String]()
        //正常此处需要根据structType中元素类型,以及表中元素的类型指定类型,但是此处由于部分未知原因,转换为其他类型会报错,故统一转换成String
       // 上面这种情况,一部分是因为空指针的问题,以及脏数据的原因导致的。
 //获取rowkey
        arr+=Bytes.toString(r.getRow)
        for(col<-show_info){
          arr+=Bytes.toString(r.getValue(col._1.getBytes(),col._2.getBytes()))
        }
        val row = Row.fromSeq(arr.toSeq)
        listRow.add(row)
      }catch{
        case e:Exception=> flag = false
      }
    }

    //构建RDD
    //val seqRDD = sc.makeRDD(arr)
    val schema = StructType({
      val list = new util.ArrayList[StructField]()
      list.add(StructField("rowkey",StringType,true))
      for(col<-show_info){
        list.add( StructField(col._2,StringType,true))
      }
      list
    }
    )
    //创建dataframe
    val df = sqlContext.createDataFrame(listRow,schema)
    val tbl = tbl_nm.split(":")(1)
    df.createTempView(tbl+"_tmp")

   // println("----------------------------------"+sqlContext.sql(s"select * from $tbl"+"_tmp").count())
    ColumnValueScanner.close()

  }
  /**
    * 有filter参与的局部数据获取
    * @param sparkSesson
    * @param tbl_nm      表名
    * @param show_info  需要展示的列(列族,列名)
    * @param filter_info   过滤条件数组  (列族,列名,过滤值,过滤条件)
    *                      注册成临时表:tabl_nm_tmp
    */
  def FilterGetRegHtable(sparkSesson:SparkSession,tbl_nm:String,show_info:Array[(String,String)],filter_info:Array[(String,String,String,String)])={

    //构建sparkcontext和sqlContext
    val sc = sparkSesson.sparkContext
    val sqlContext = sparkSesson.sqlContext

    //构建hbaseConf
    val hbaseConf = HBaseConfiguration.create()
    //定义表Hbase表的名字
    val tableName = tbl_nm
    //设置需要在hbase中查询的表名
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    //构建表
    val table = new HTable(hbaseConf,tableName)
    val scan = new Scan()
    //指定列族和需要显示的列名
    //添加多个需要用到的列
    val length = show_info.length
    for(i <- show_info){
      scan.addColumn(Bytes.toBytes(i._1),Bytes.toBytes(i._2))
    }
    /* scan.addColumn(Bytes.toBytes("basicinfo"),Bytes.toBytes("WAYBILL_NO"))
     scan.addColumn(Bytes.toBytes("basicinfo"),Bytes.toBytes("PENDING_TYPE"))*/
    //设置rowkey的范围,启示和结束
    //scan.setStartRow(Bytes.toBytes(""))
    //scan.setStopRow(Bytes.toBytes(""))
    val fil_len = filter_info.length
    val filter_arr = new util.ArrayList[Filter](fil_len)

    if(fil_len>0){
      for(i <- filter_info){
        i._4 match {
          case "=" =>{
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes( i._1),
              Bytes.toBytes(i._2),CompareFilter.CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case "<" =>{
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes( i._1),
              Bytes.toBytes(i._2),CompareFilter.CompareOp.LESS,new BinaryComparator(Bytes. toBytes( i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case "<=" =>{
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes( i._1),
              Bytes.toBytes(i._2),CompareFilter.CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes. toBytes( i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case ">" =>{
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes( i._1),
              Bytes.toBytes(i._2),CompareFilter.CompareOp.GREATER,new BinaryComparator(Bytes. toBytes( i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case ">=" =>{
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes( i._1),
              Bytes.toBytes(i._2),CompareFilter.CompareOp.GREATER_OR_EQUAL,new BinaryComparator(Bytes. toBytes( i._3)))
            //filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case "!=" =>{
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes( i._1),
              Bytes.toBytes(i._2),CompareFilter.CompareOp.NOT_EQUAL,new BinaryComparator(Bytes. toBytes( i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case _=>{}
        }
      }
    }
    /**
      * 通过使用filterlist可以加载多个过滤器
      * 设置多个过滤器
      */
    val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,filter_arr)

    scan.setFilter(filterList)
    //获取表的扫描
    val ColumnValueScanner = table.getScanner(scan)

    /**
      * -------------------------重点-----------------------------
      * 将扫描得到的结果值转换成可以匹配注册表用的RDD[ROW]
      * 使用
      *
      * 此处对ColumnValueScanner的处理不完善,需要后续修改,查找有效的方法。【同上】
      */
    val listRow = new util.ArrayList[Row]()
    var flag = true
    while(flag){
      try{
        //每个result都是一行数据
        val r:Result = ColumnValueScanner.next()
        val arr = ArrayBuffer[String]()
        //正常此处需要根据structType中元素类型,以及表中元素的类型指定类型,但是此处由于部分未知原因,转换为其他类型会报错,故统一转换成String
        arr+=Bytes.toString(r.getRow)
        for(col<-show_info){
          arr+=Bytes.toString(r.getValue(col._1.getBytes(),col._2.getBytes()))
        }
        val row = Row.fromSeq(arr.toSeq)
        listRow.add(row)
      }catch{
        case e:Exception=> flag = false
      }
    }

    //构建RDD
    //val seqRDD = sc.makeRDD(arr)
    val schema = StructType({
      val list = new util.ArrayList[StructField]()
      list.add(StructField("rowkey",StringType,true))
      for(col<-show_info){
        list.add( StructField(col._2,StringType,true))
      }
      list
    }
    )
    //创建dataframe
    val df = sqlContext.createDataFrame(listRow,schema)
    df.createTempView(tbl_nm+"_tmp")
    ColumnValueScanner.close()
  }

  /**
    * 全量扫描的表数据获取
    * 注意:此处所有的列全部处理成string类型,如果后续需要改为具体的类型,可以传入一个构造好的structType,替换
    * @param sparkSession
    * @param tabMN   表名
    * @param cf      列族
    * @param cols   列名
    *               注册成临时表:tabl_nm_tmp
    */
  def NoFilterGetRegHtable(sparkSession:SparkSession,tabMN:String,cf:String,cols:Array[String]): Unit ={

    val sc = sparkSession.sparkContext
    val sqlContext = sparkSession.sqlContext
    //hbase配置
    val hbaseconf = HBaseConfiguration.create()
    //设置表名
    //hbaseconf.set(TableInputFormat.INPUT_TABLE,"db_oss_ywgl:ywhbase_sys_user_info")
    hbaseconf.set(TableInputFormat.INPUT_TABLE,tabMN)
    //获取返回数据集
    val userRDD = sc.newAPIHadoopRDD(hbaseconf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    userRDD.cache()

    val schema = StructType({
      val list = new util.ArrayList[StructField]()
      list.add(StructField("rowkey",StringType,true))
      for(col<-cols){
        list.add( StructField(col.toLowerCase,StringType,true))
      }
      list
    }
    )

    val rowRDD = userRDD.map(r=>{
      val arr =ArrayBuffer[String]()
      arr+=Bytes.toString(r._2.getRow)
      for(col<-cols){
        arr+=Bytes.toString(r._2.getValue("cf".getBytes(),col.getBytes()))
      }
      Row.fromSeq(arr.toSeq)
    }
    )

    //构建表名
    val tmp_tblNM = tabMN.split(":")(1)
    //println(tmp_tblNM+"-----------------")
    val df = sqlContext.createDataFrame(rowRDD,schema)
    df.createTempView(tmp_tblNM+"_tmp")
    //sqlContext.sql("select * from ywhbase_sys_user_info_tmp").show(10)
  }

}

object test{
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("RowkeyScanTest").getOrCreate()

    val show_cols = Array(
      ("cf","MAJOR_NM"),
      ("cf","difficulty_level"),
      ("cf","create_time"),
      ("cf","deal_result")
    )
    //GetHbaseTable.FilterRowkeyGetHtable(sparkSession,"db_oss_ywgl:ywhbase_adjust_res_bill",show_cols)

  }
}

写入hbase的两种方式   

saveAsNewAPIHadoopDataset和控制每次写入的量

package com.gh.HbaseWideTable.Tool
import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{HTable, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
/**
  * Created by admin2 on 2017/10/31.
  * 根据传入的数据集,将数据写入到hbase中        saveAsNewAPIHadoopDataset
 */
object WriteHbaseTable {
  /**
    * 将操作完成的结果写入hbase表中
    * @param df     存放结果的dataframe
    * @param sc     sparkcontext
    * @param tbl_NM  表名
    * @param cf       列族
    * @param cols    列名组成的数组
    */
  def runWriteHbaseTable(df:DataFrame,sc:SparkContext,tbl_NM:String,cf:String,cols:Array[String]): Unit ={
  //配置输出的表名
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tbl_NM)
    //配置job
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    var i=1
    val rdd = df.rdd.map(res=>{
      val put = new Put(if(res.isNullAt(0)) null else Bytes.toBytes(res.getString(0)))   //rowkey
       i=1
      val size = res.size
      while(i<size){
        put.add(Bytes.toBytes(cf),Bytes.toBytes(cols(i - 1).toLowerCase),if(res.isNullAt(i)) null else Bytes.toBytes(res.getString(i)))
        i+=1
      }
      (new ImmutableBytesWritable, put)
    })

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

/**
  * Created by Administrator on 2017/11/24.
  * 控制每次写入hbase 的数据量  
  */
object WriteHbase {
  def writeToHbase(data:RDD[Put],name: String) = {
    val tableName = name

    val myConf = HBaseConfiguration.create()
    // myConf.set("hbase.zookeeper.quorum", "ddp-nnlte-001,ddp-nnlte-002")
    myConf.set("hbase.zookeeper.property.clientPort", "2181")
    myConf.set("hbase.defaults.for.version.skip", "true")
    val myTable = new HTable(myConf, TableName.valueOf(tableName))
    myTable.setAutoFlush(false, false) //关键点1   关闭自动提交
    myTable.setWriteBufferSize(6 * 1024 * 1024) //关键点2   设置写缓存区的大小
    data.foreachPartition {
      p => {
        var i:Long = 0
        val plist = new util.ArrayList[Put]()
        p.foreach { put => {
          put.setWriteToWAL(false)
          i = i+1
          plist.add(put)

          if(i%1000==0){
            myTable.put(plist)
            plist.clear()
          }
          if(i%100000==0)
            myTable.flushCommits()
          if(i%100000==0)
            println(s"\n\n\n改分区写入$i\n\n\n")
        }
        }
        myTable.put(plist)
        myTable.flushCommits() //关键点3

        println(name+"------->>>> total_num   "+i)
      }
    }
  }
}

 



发表评论

您必须 登录 才能发表留言!