spark例子整理

By | 2018年12月20日

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011278012/article/details/73379423

Spark Streaming是一个准实时流处理框架,处理响应时间一般以秒为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。所以在流框架选型方面要看具体业务场景。需要澄清的是,现在很多人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持不好等等,那是因为很多人不会驾驭Spark Streaming及Spark本身。在Spark Streaming流处理的延迟时间方面,Spark定制版本会将Spark Streaming的延迟从秒级别推进到100毫秒之内甚至更少。
SparkStreaming优点:
1、提供了丰富的API,企业中能快速实现各种复杂的业务逻辑。
2、流入Spark Streaming的数据流通过和机器学习算法结合,完成机器模拟和图计算。
3、Spark Streaming基于Spark优秀的血统。
SparkStreaming能不能像Storm一样,一条一条处理数据?
Storm处理数据的方式是以条为单位一条一条处理的,而Spark Streaming是基于单位时间处理数据的。Spark Streaming能不能像Storm一样呢?答案是:可以的。


下面是一个从Kafka读取数据,然后利用foreachRDD遍历RDD,再使用Spark SQL转换成表进行分析的demo:
package com.sprakStream.demo
import java.util.regex.Matcher
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import com.sprakStream.bean.IpMapper
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.util.Properties
import org.apache.spark.sql.SparkSession
import com.sprakStream.util.CommUtil
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager
import java.util.Arrays.ArrayList
import java.util.ArrayList
import java.util.Arrays.ArrayList
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import com.sprakStream.util.AppConstant
import org.apache.spark.rdd.RDD
import kafka.utils.Time
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ Time, Seconds, StreamingContext }
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.record.Record
import java.sql.Time

/**
 * Demo Spark Streaming job: reads string records from Kafka, keeps the lines
 * that match the Apache access-log pattern supplied by `Utilities`, and on
 * every batch turns them into a DataFrame of `IpMapper` rows that is queried
 * through Spark SQL and printed.
 *
 * Two output operations are registered:
 *  - one over a 30-second window sliding every 2 seconds, and
 *  - one over the plain (un-windowed) stream.
 */
object KafkaExcamle3 {

  /** Number of space-separated fields read from each log line to build an `IpMapper`. */
  private val RequiredFieldCount = 9

  /**
   * Entry point. Builds the streaming context, wires the Kafka source to the
   * two output operations, then blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    println("success to Init...")

    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The returned session is not referenced below; the call is kept so that a
    // SparkSession configured from `conf` is created up front, as before.
    SparkSession.builder().config(conf).getOrCreate()

    val util = Utilities
    util.setupLogging()
    // Regular expression used to recognise raw Apache log lines.
    val pattern = util.apacheLogPattern()

    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> AppConstant.KAFKA_HOST,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Topics to subscribe to.
    val topics = Set(AppConstant.KAFKA_TOPIC)
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)).map(_.value())

    // Keep only lines that fully match the log pattern. The previous
    // `if (matches) group(0)` without an `else` produced Unit for every
    // non-matching line, which later became the string "()" and crashed the
    // field extraction. For a full match, group(0) is the whole line, so a
    // plain filter yields the same records for matching input.
    val spiltWorks = lines.filter(line => line != null && pattern.matcher(line).matches())
    val spiltDesc = spiltWorks.window(Seconds(30), Seconds(2))

    // Output operation over the windowed stream.
    spiltDesc.foreachRDD((rdd: RDD[String]) =>
      showAsTable(rdd, "=================================================开始你的表演111111111================================================="))

    // Output operation over the un-windowed stream.
    spiltWorks.foreachRDD((rdd: RDD[String]) =>
      showAsTable(rdd, "=================================================开始你的表演22222222222================================================="))

    // Kick it off
    ssc.checkpoint("/user/root/spark/checkpoint")
    ssc.start()
    ssc.awaitTermination()
    println("KafkaExample-结束.................................")
  }

  /**
   * Prints a banner, converts the batch into a DataFrame of `IpMapper` rows,
   * registers it as the temporary view `wordsDataFrame`, and shows the result
   * of `select * from wordsDataFrame`.
   *
   * Lines with fewer than [[RequiredFieldCount]] space-separated fields are
   * skipped instead of failing the whole batch with an index-out-of-bounds error.
   *
   * @param rdd    log lines of the current batch
   * @param banner separator line printed before the table
   */
  private def showAsTable(rdd: RDD[String], banner: String): Unit = {
    println()
    println(banner)
    println()

    // Reuse a single SQLContext across batches.
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    val wordsDataFrame = rdd
      .map(_.split(" "))
      .filter(_.length >= RequiredFieldCount)
      .map(f =>
        IpMapper(CommUtil.uuid(), f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8)))
      .toDF()

    // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
    wordsDataFrame.createOrReplaceTempView("wordsDataFrame")
    val wordCountsDataFrame = sqlContext.sql("select * from wordsDataFrame")
    wordCountsDataFrame.show()
  }

}
/**
 * Holder for a single, lazily created [[SQLContext]] that is reused on every
 * call after the first one.
 */
object SQLContextSingleton {

  // Cached context; stays null until the first getInstance call.
  @transient private var cached: SQLContext = _

  /**
   * Returns the cached [[SQLContext]], creating it from `sparkContext` on the
   * first call. Later calls return the same instance and ignore the argument.
   *
   * @param sparkContext context used to build the SQLContext on first use
   * @return the shared SQLContext instance
   */
  def getInstance(sparkContext: SparkContext): SQLContext =
    Option(cached).getOrElse {
      val created = new SQLContext(sparkContext)
      cached = created
      created
    }
}

发表评论