16. Spark SQL (Part 2): Spark on Hive

February 13, 2019

I. Integrating Spark SQL with Hive

6. Reading data from Hive to create a DataFrame (Spark on Hive)
6.1. Start the Hive metastore service on the master node
  hive --service metastore
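  This runs the metastore in the foreground; to keep it alive after the shell closes, a common approach is to run it in the background:
  nohup hive --service metastore > metastore.log 2>&1 &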
6.2. On the client node of the Spark cluster, create a new hive-site.xml file under spark/conf

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://master:9083</value>
    </property>
</configuration>
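If Hive is already installed on this node, an equivalent shortcut is to copy the existing hive-site.xml from Hive's conf directory into spark/conf, as long as hive.metastore.uris points at the metastore host.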

6.3. Launch spark-shell and verify that it connects to the Hive metastore service
(1) Start spark-shell: ./spark-shell --master spark://master:7077
(2) import org.apache.spark.sql.hive.HiveContext
(3) sc.setLogLevel("INFO")
(4) Use HiveContext to work with Hive data; avoid using a plain SQLContext object: val hc = new HiveContext(sc)
The log should show: INFO metastore: Trying to connect to metastore with URI thrift://master:9083 followed by INFO metastore: Connected to metastore.
(5) hc.sql("show databases").show()
(6) hc.sql("use default").show()
(7) hc.sql("show tables").show()

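The same integration works from a standalone application. The Java example below (Spark 1.x API) uses HiveContext to recreate two Hive tables, load local data files into them, join them, and save the students scoring at least 80 back to Hive: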
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

public class HiveDataSource {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("HiveDataSource")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // HiveContext is a subclass of SQLContext.
        SQLContext hiveContext = new HiveContext(sc);
        // Drop the student_infos table in Hive if it exists.
        hiveContext.sql("DROP TABLE IF EXISTS student_infos");
        // Create the student_infos table in Hive.
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT) row format delimited fields terminated by '\t'");
        hiveContext.sql("LOAD DATA "
                + "LOCAL INPATH '/root/resource/student_infos' "
                + "INTO TABLE student_infos");
        hiveContext.sql("DROP TABLE IF EXISTS student_scores");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
        hiveContext.sql("LOAD DATA "
                + "LOCAL INPATH '/root/resource/student_scores' "
                + "INTO TABLE student_scores");
        // Join the two tables and keep students scoring at least 80.
        DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss ON si.name=ss.name "
                + "WHERE ss.score>=80");
        hiveContext.sql("USE result");
        hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
        /**
         * Save the result of goodStudentsDF to the Hive table good_student_infos.
         * The table may or may not already exist; if it does not, saveAsTable
         * creates it before writing the result.
         */
        goodStudentsDF.write().saveAsTable("good_student_infos");
        /**
         * Map the Hive table good_student_infos back to a DataFrame.
         */
        DataFrame df = hiveContext.table("good_student_infos");
        df.show();
        sc.close();
    }
}
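Note that when this application runs against the cluster, the hive-site.xml from step 6.2 must be on the classpath (e.g. in spark/conf); without it, HiveContext falls back to a local embedded Derby metastore instead of connecting to the Hive metastore service.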

II. Spark SQL Example: Computing Daily Sales Totals

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class DailySale {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DailySale").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Each record is date,amount,id; the third record is missing its id.
        List<String> logList = Arrays.asList(
                "2016-9-01,55,1122"
                , "2016-9-01,55,1133"
                , "2016-9-01,15,"
                , "2016-9-02,56,1144"
                , "2016-9-02,78,1155"
                , "2016-9-03,113,1123");
        JavaRDD<String> logRDD = jsc.parallelize(logList);
        // Filter out dirty records that do not have all three fields.
        JavaRDD<String> logRDD2 = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(String log) throws Exception {
                return log.split(",").length == 3;
            }
        });
        // Convert the JavaRDD<String> into a JavaRDD<Row>.
        JavaRDD<Row> logRowRDD = logRDD2.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Row call(String log) throws Exception {
                String[] split = log.split(",");
                return RowFactory.create(split[0], Integer.valueOf(split[1]), split[2]);
            }
        });
        // Build the schema dynamically and register a temporary table.
        List<StructField> structFieldList = new ArrayList<StructField>();
        structFieldList.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        structFieldList.add(DataTypes.createStructField("amount", DataTypes.IntegerType, true));
        structFieldList.add(DataTypes.createStructField("id", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(structFieldList);
        DataFrame logDF = sqlContext.createDataFrame(logRowRDD, schema);
        logDF.registerTempTable("sale");

        // Query with a SQL statement.
        sqlContext.sql("select date, sum(amount) from sale group by date").show();
        // The same query through the DataFrame API.
        logDF.groupBy(logDF.col("date")).sum("amount").show();
        jsc.stop();
    }
}
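With this sample data, the malformed third record ("2016-9-01,15,") is filtered out, so both queries should report daily totals of 110 for 2016-9-01, 134 for 2016-9-02, and 113 for 2016-9-03.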

III. Window Functions

1. Syntax: row_number() over(partition by … order by …)
2. The row_number() window function numbers the rows within each partition in order, which makes it easy to take the top N rows per group.
  For example, given the group id=2016 [111, 112, 113], applying row_number() yields:
  id=2016 [111 1, 112 2, 113 3]
3. Note: a SQL statement that uses a window function must be executed through HiveContext, and by default a HiveContext cannot be created in local mode.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

public class WindowFunction {
    public static void main(String[] args) {
        // No setMaster here: HiveContext is meant to run on the cluster.
        SparkConf conf = new SparkConf().setAppName("WindowFunction");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext hiveContext = new HiveContext(jsc);
        jsc.setLogLevel("INFO");

        hiveContext.sql("drop table if exists sales");
        hiveContext.sql("create table if not exists sales (" +
                "product string, " +
                "category string, " +
                "revenue bigint) row format delimited fields terminated by '\t'");
        hiveContext.sql("load data local inpath '/usr/local/profiles/sales.txt' " +
                "into table sales");
        // Rank products by revenue within each category and keep the top 3.
        DataFrame resultDF = hiveContext.sql("select * from (" +
                "select product, category, revenue, row_number() " +
                "over(partition by category order by revenue desc) rank from sales" +
                ") t where rank <= 3");
        // Save the top-3 rows of each category to a Hive table.
        hiveContext.sql("use default");
        hiveContext.sql("drop table if exists top3sales");
        resultDF.write().saveAsTable("top3sales");
        jsc.close();
    }
}
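For comparison, the same top-3-per-category query can also be written with the DataFrame window API instead of raw SQL. This is a minimal sketch assuming Spark 1.6+ (where functions.row_number() is available) and the sales table loaded above; the class name WindowFunctionDF is a placeholder:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.hive.HiveContext;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

public class WindowFunctionDF {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WindowFunctionDF");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext hiveContext = new HiveContext(jsc);

        // Rank rows by revenue (descending) within each category.
        WindowSpec byCategory = Window.partitionBy("category")
                .orderBy(col("revenue").desc());
        // Map the existing Hive table to a DataFrame and add a rank column.
        DataFrame ranked = hiveContext.table("sales")
                .withColumn("rank", row_number().over(byCategory));
        // Keep the top 3 rows per category, matching the SQL version above.
        ranked.filter(col("rank").leq(3)).show();
        jsc.close();
    }
}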

4. Note: steps for running Spark on Hive:
(1) Start HDFS, YARN, and the Hive metastore service, in that order.
(2) Submit the code with the spark-submit script.
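For example, assuming the application has been packaged into a jar (the jar path below is a placeholder):
  ./spark-submit --master spark://master:7077 --class WindowFunction /root/spark-demo.jar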
