SPARK STREAMING之1:编程指南(翻译v1.4.1)

By | 2018年11月27日
版权声明:本文为博主原创文章,转载请注明来自http://blog.csdn.net/jediael_lu/ https://blog.csdn.net/jediael_lu/article/details/77043575

SPARK STREAMING之1:编程指南(翻译v1.4.1)

@(SPARK)[spark, 大数据]

概述

Spark Streaming是Spark核心API的一个扩展,它使得spark可扩展、高吞吐、可容错的对实时数据流进行处理。可以通过集成外部系统获取数据来源,如 Kafka, Flume, Twitter, ZeroMQ, Kinesis, 或者 TCP sockets,还可以使用高度抽象的复杂算法,如map、reduce、join、window等。最后,处理完的数据可以直接发送到文件系统、数据库或者一些仪表盘。事实上,你还可以spark的机器学习和图处理算法直接运用于数据流中。

Alt text

它的内部实现原理如下图: Spark Streaming收到实时输入数据,然后将它切分为多个批次,这些批次被spark引擎处理后同样以批次的形式输出最终的数据流。(注:也就是说Spark Streaming不是真正的实时系统,而是一个准实时系统,它会等待收集到一定量的数据后汇总成一个批次,然后再统一处理)

Alt text

Spark Streaming有一个高度抽象概念叫做discretized stream or DStream(分离式流),它表示一个持续的数据流。DStream可以通过集成外部系统来获取(如kafka,flume等),或者通过对其它DStream操作后生成新的Dstream。在spark内部,DStream以一系列的RDD表示。

本文介绍了如何编写Spark Streaming的程序。你可以使用scala、java、python(1.2版本后)编写程序,这些内容都将在本文中介绍。在本文中,你可以通过点击标签选择查看不同语言所实现的代码。

注意:Spark Streaming的python API在1.2版本后引入的,它包括了scala/java所有的DStream transformation操作,以及大部分的输出操作。但是,只它支持基本的数据源(如文本文件,socket的文本数据等),集成kafka,flume的API将在以后的版本中实现。

快速入门例子

在详细介绍如何编写Spark Streaming程序前,我们先看一个简单的例子,在这个例子中,我们对从TCP scoket中收到的文本数据进行单词统计。步骤如下:

首先,我们导入Spark Streaming的相关类,以及StreamingContext(1.3版本后不再需要)。StreamingContext是所有streaming功能的进入点。在这个例子中,我们创建了一个local StreamingContext,它有2个执行线程,每1秒形成一个批次。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用这个context,我们创建一个从TCP socket获取数据的Dstream,它指定了主机名(localhost)以及端口号(9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

变量lines表示从数据服务器收到的数据流,DStream中的每一个数据记录表示一行。然后,我们将这些行以空格切分成单词。(注:事实上,lines代表的是每秒钟收到的数据,而在map操作中以行作为单位进行输入)

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是一个一对多的DStream操作,它从源DStream中读取一个记录,然后生成多个记录并发送到新的DStream中。在这个例子中,每一行内容都将切分为多个单词,这个单词流以words这个DStream表示。接下来,我们统计这些单词。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words这个DStream经过map操作后转换为一个(word,1)对的新DStream,然后统计每个单词的出现频率。最后,wordCouns.print()会每秒钟输出一些统计结果。

记住到这里为止,Spark Streaming只是定义了它会进行什么处理,但这些处理还没启动。你可以通过以下代码启动真正的处理进程:

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整代码请见 Spark Streaming example NetworkWordCount.

如果你已经下载了spark,你可以按照下面的步骤运行。首先你需要运行netcat(一个在大多数类unix系统中都能找到的小工具)作为一个数据服务器:

$ nc -lk 9999

然后,在另一个终端中,你可以这样启动上面的例子:

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

此时,你在nc终端中输入的每一行文本都将被统计并输出结果。如下:

# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world


# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

基本概念

下面,我们开始详细描述Spark Streaming的基本概念。

Linking

与spark 类似,spark streaming可以通过maven的中央仓库获取。开发Spark Streaming程序前,你需要将以下依赖关系放到你的maven或者sbt项目中。

 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.4.1</version>
 </dependency>    

如果需要集成kafka, flume,kinesis等未在Spark Streaming 核心API中包含的数据源,你需要在依赖中添加类似 spark-streaming-xyz_2.10的依赖。一引起常用的如下:

Source  Artifact
Kafka   spark-streaming-kafka_2.10
Flume   spark-streaming-flume_2.10
Kinesis
spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ  spark-streaming-zeromq_2.10
MQTT    spark-streaming-mqtt_2.10

请参考maven的中央仓库以获得最新最全的依赖关系。

发表评论