Author: Zhou Zhihu (周志湖)
WeChat: zhouzhihubeyond

Main contents of this section

  1. Window Operation
  2. Getting-Started Examples

1. Window Operation

Spark Streaming provides window operations, as illustrated in the figure below:
[Figure: a window sliding over the original DStream]
In the figure, the solid red line marks the window's current position and the dashed line its previous position. Each time the window slides, the RDDs that fall inside it are processed together, producing a windowed DStream. A window operation takes two parameters:
(1) window length: the duration of the window, which is 3 in the figure above;
(2) sliding interval: the interval at which the window operation fires, which is 2 in the figure above.
Both parameters must be integer multiples of the original DStream's batch interval (which is 1 in the figure above).
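
To make these two parameters concrete, here is a minimal sketch (not part of the article's example code; the host, port, app name and durations are illustrative assumptions, chosen to scale the figure's 1 : 3 : 2 ratio to seconds):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical values: batch interval 10s, window length 30s, slide interval 20s,
    // so both window parameters are integer multiples of the batch interval, as required.
    val conf = new SparkConf().setAppName("WindowSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)
    // Every 20 seconds, the RDDs received during the last 30 seconds are processed
    // together as a single windowed DStream.
    val windowedLines = lines.window(Seconds(30), Seconds(20))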

2. Getting-Started Examples

  1. WindowWordCount: using the reduceByKeyAndWindow method
   
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    object WindowWordCount {
      def main(args: Array[String]) {
        // Example arguments: localhost 9999 30 10
        if (args.length != 4) {
          System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
          System.exit(1)
        }
        // StreamingExamples comes from the Spark examples source tree; it just sets sensible log levels
        StreamingExamples.setStreamingLogLevels()

        val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[4]")
        val sc = new SparkContext(conf)

        // Create a StreamingContext with a batch interval of 5 seconds
        val ssc = new StreamingContext(sc, Seconds(5))

        // Use a socket as the data source
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)

        val words = lines.flatMap(_.split(" "))

        // Window operation: count the words that fall into the current window
        val wordCounts = words.map(x => (x, 1))
          .reduceByKeyAndWindow((a: Int, b: Int) => a + b,
            Seconds(args(2).toInt), Seconds(args(3).toInt))

        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

Start a netcat server with the following command:

   
    root@sparkmaster:~# nc -lk 9999

Then run WindowWordCount and type the following line:

   
    root@sparkmaster:~# nc -lk 9999
    Spark is a fast and general cluster computing system for Big Data. It provides

Observe the output:

   
    
    -------------------------------------------
    Time: 1448778805000 ms (the first window slide, after 10 seconds)
    -------------------------------------------
    (provides,1)
    (is,1)
    (general,1)
    (Big,1)
    (fast,1)
    (cluster,1)
    (Data.,1)
    (computing,1)
    (Spark,1)
    (a,1)
    ...

    -------------------------------------------
    Time: 1448778815000 ms (10 seconds later, the second window slide)
    -------------------------------------------
    (provides,1)
    (is,1)
    (general,1)
    (Big,1)
    (fast,1)
    (cluster,1)
    (Data.,1)
    (computing,1)
    (Spark,1)
    (a,1)
    ...

    -------------------------------------------
    Time: 1448778825000 ms (10 seconds later, the third window slide)
    -------------------------------------------
    (provides,1)
    (is,1)
    (general,1)
    (Big,1)
    (fast,1)
    (cluster,1)
    (Data.,1)
    (computing,1)
    (Spark,1)
    (a,1)
    ...

    -------------------------------------------
    Time: 1448778835000 ms (another 10 seconds later; the input has slid beyond the window length and is no longer counted)
    -------------------------------------------

    -------------------------------------------
    Time: 1448778845000 ms
    -------------------------------------------

Enter the same sentence twice:

   
    root@sparkmaster:~# nc -lk 9999
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides

The output is:

   
    
    -------------------------------------------
    Time: 1448779205000 ms
    -------------------------------------------
    (provides,2)
    (is,2)
    (general,2)
    (Big,2)
    (fast,2)
    (cluster,2)
    (Data.,2)
    (computing,2)
    (Spark,2)
    (a,2)
    ...

Enter it once more:

   
    root@sparkmaster:~# nc -lk 9999
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides

The result is:

   
    
    -------------------------------------------
    Time: 1448779215000 ms
    -------------------------------------------
    (provides,3)
    (is,3)
    (general,3)
    (Big,3)
    (fast,3)
    (cluster,3)
    (Data.,3)
    (computing,3)
    (Spark,3)
    (a,3)
    ...

Enter it one more time:

   
    root@sparkmaster:~# nc -lk 9999
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides
    Spark is a fast and general cluster computing system for Big Data. It provides

The result is:

   
    
    -------------------------------------------
    Time: 1448779225000 ms
    -------------------------------------------
    (provides,4)
    (is,4)
    (general,4)
    (Big,4)
    (fast,4)
    (cluster,4)
    (Data.,4)
    (computing,4)
    (Spark,4)
    (a,4)
    ...

    -------------------------------------------
    Time: 1448779235000 ms
    -------------------------------------------
    (provides,2)
    (is,2)
    (general,2)
    (Big,2)
    (fast,2)
    (cluster,2)
    (Data.,2)
    (computing,2)
    (Spark,2)
    (a,2)
    ...

    -------------------------------------------
    Time: 1448779245000 ms
    -------------------------------------------
    (provides,1)
    (is,1)
    (general,1)
    (Big,1)
    (fast,1)
    (cluster,1)
    (Data.,1)
    (computing,1)
    (Spark,1)
    (a,1)
    ...

    -------------------------------------------
    Time: 1448779255000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1448779265000 ms
    -------------------------------------------
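
As an aside, the windowed count in this example could also be expressed with the generic window() transformation followed by an ordinary reduceByKey. The sketch below is not from the original article; it assumes the same words DStream and argument convention as the program above:

    // Equivalent formulation: build the windowed DStream explicitly, then reduce per key.
    // reduceByKeyAndWindow is the more idiomatic form and, given an inverse function,
    // it can update the window incrementally instead of recomputing it.
    val windowedCounts = words
      .map(x => (x, 1))
      .window(Seconds(args(2).toInt), Seconds(args(3).toInt)) // window length, slide interval
      .reduceByKey(_ + _)
    windowedCounts.print()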
    
   
  

2. WindowWordCount: using the countByWindow method

   
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    object WindowWordCount {
      def main(args: Array[String]) {
        if (args.length != 4) {
          System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
          System.exit(1)
        }
        StreamingExamples.setStreamingLogLevels()

        val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)

        // Create a StreamingContext with a batch interval of 5 seconds
        val ssc = new StreamingContext(sc, Seconds(5))
        // Use the current directory as the checkpoint directory
        ssc.checkpoint(".")

        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
        val words = lines.flatMap(_.split(" "))

        // countByWindow counts the number of elements in the DStream over a sliding window
        val countByWindow = words.countByWindow(Seconds(args(2).toInt), Seconds(args(3).toInt))

        countByWindow.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

Start the netcat server:

   
    root@sparkmaster:~# nc -lk 9999

Then run WindowWordCount and type:

   
    root@sparkmaster:~# nc -lk 9999
    Spark is a fast and general cluster computing system for Big Data

Observe the output:

   
    
    -------------------------------------------
    Time: 1448780625000 ms
    -------------------------------------------
    0

    -------------------------------------------
    Time: 1448780635000 ms
    -------------------------------------------
    12

    -------------------------------------------
    Time: 1448780645000 ms
    -------------------------------------------
    12

    -------------------------------------------
    Time: 1448780655000 ms
    -------------------------------------------
    12

    -------------------------------------------
    Time: 1448780665000 ms
    -------------------------------------------
    0

    -------------------------------------------
    Time: 1448780675000 ms
    -------------------------------------------
    0

3. WindowWordCount: using the reduceByWindow method

   
    // reduceByWindow aggregates the elements of the source DStream over a sliding window
    // and returns a new DStream containing a single element.
    val reduceByWindow = words.map(x => 1)
      .reduceByWindow(_ + _, _ - _, Seconds(args(2).toInt), Seconds(args(3).toInt))

The example above is essentially how countByWindow itself is implemented, which the countByWindow source code confirms:

   
    
     
    def countByWindow(
        windowDuration: Duration,
        slideDuration: Duration): DStream[Long] = ssc.withScope {
      this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
    }

reduceByWindow is in turn implemented on top of the reduceByKeyAndWindow method; the relevant code is:

   
    
     
    def reduceByWindow(
        reduceFunc: (T, T) => T,
        invReduceFunc: (T, T) => T,
        windowDuration: Duration,
        slideDuration: Duration
      ): DStream[T] = ssc.withScope {
      this.map(x => (1, x))
          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
          .map(_._2)
    }

Unlike the reduceByKeyAndWindow call in the first example, the reduceByKeyAndWindow method used here takes an additional invReduceFunc parameter. Its full source is:

   
    /**
     * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
     * The reduced value of over a new window is calculated using the old window's reduced value :
     *  1. reduce the new values that entered the window (e.g., adding new counts)
     *
     *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
     *
     * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
     * However, it is applicable to only "invertible reduce functions".
     * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
     * @param reduceFunc associative reduce function
     * @param invReduceFunc inverse reduce function
     * @param windowDuration width of the window; must be a multiple of this DStream's
     *                       batching interval
     * @param slideDuration  sliding interval of the window (i.e., the interval after which
     *                       the new DStream will generate RDDs); must be a multiple of this
     *                       DStream's batching interval
     * @param filterFunc     Optional function to filter expired key-value pairs;
     *                       only pairs that satisfy the function are retained
     */
    def reduceByKeyAndWindow(
        reduceFunc: (V, V) => V,
        invReduceFunc: (V, V) => V,
        windowDuration: Duration,
        slideDuration: Duration = self.slideDuration,
        numPartitions: Int = ssc.sc.defaultParallelism,
        filterFunc: ((K, V)) => Boolean = null
      ): DStream[(K, V)] = ssc.withScope {
      reduceByKeyAndWindow(
        reduceFunc, invReduceFunc, windowDuration,
        slideDuration, defaultPartitioner(numPartitions), filterFunc
      )
    }
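
Since the signature above also exposes filterFunc, one possible use, sketched here rather than taken from the article, is to prune keys whose windowed count has dropped to zero when the incremental form is used. The sketch reuses words and args from the first program; the numPartitions value and the threshold check are illustrative assumptions:

    // Hypothetical sketch: incremental windowed word count that drops keys whose count
    // has fallen to 0 after old values were "inverse reduced" out of the window.
    ssc.checkpoint(".")  // the inverse-reduce form keeps state across batches, so checkpointing is required
    val prunedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,              // reduce values entering the window
      (a: Int, b: Int) => a - b,              // "inverse reduce" values leaving the window
      Seconds(args(2).toInt),                 // window length
      Seconds(args(3).toInt),                 // slide interval
      4,                                      // numPartitions (illustrative)
      (kv: (String, Int)) => kv._2 > 0        // filterFunc: keep only keys still present in the window
    )
    prunedCounts.print()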
   
  

Concretely, the two calls below produce the same result; they differ only in efficiency, with the second approach being more efficient:

   
    
    // Use the past 5 seconds as the input window and compute the word counts every 1 second.
    // This recomputes the word counts of every batch in the past 5 seconds and then adds them
    // up to get the counts for the window. This is the "aggregation" approach, shown on the
    // left of the figure below.
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5), Seconds(1))

   
    
    // To compute the word counts over the past 5 seconds at time t+4, take the counts for the
    // window ending at t+3, add the counts from [t+3, t+4], and subtract the counts from [t-2, t-1].
    // This reuses the counts for the middle three seconds and is therefore more efficient.
    // This is the "incremental" approach, shown on the right of the figure below.
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(1))

[Figure: the aggregation approach (left) and the incremental approach (right)]

The full set of window operations supported by DStream is: window, countByWindow, reduceByWindow, reduceByKeyAndWindow, and countByValueAndWindow.
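
As a final illustration of one operation from that list, here is a minimal sketch (not from the original article; the 30-second window and 10-second slide are assumed) of countByValueAndWindow, which counts how many times each distinct value occurs within the window:

    // Hypothetical sketch: per-value counts over a sliding window, returned as DStream[(T, Long)].
    // Like countByWindow, this is built on an inverse reduce internally,
    // so a checkpoint directory must be set.
    ssc.checkpoint(".")
    val valueCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))
    valueCounts.print()   // prints (word, count) pairs for the current window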