博客地址: http://blog.csdn.net/yueqian_zhu/


    先贴一下上一节的例子

      object NetworkWordCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
    
        // Create a socket stream on target ip:port and count the
        // words in input stream of 
     delimited text (eg. generated by 'nc')
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
     
    这一节学习一下Dstream上的operation部分

    1、调用socketTextStream方法,返回一个ReceiverInputDStream类型。 它继承与 InputDStream, InputDStream又继承于 DStream

    (1) 设置本身的 InputDStream到DStreamGraph中

    (2)获取streamId

    2、调用flatMap方法,返回一个flatMappedDStream。

    看一下FlatMappedDStream的成员

      private[streaming]
    class FlatMappedDStream[T: ClassTag, U: ClassTag](
        parent: DStream[T],
        flatMapFunc: T => Traversable[U]
      ) extends DStream[U](parent.ssc) {
    
      override def dependencies: List[DStream[_]] = List(parent)
    
      override def slideDuration: Duration = parent.slideDuration
    
      override def compute(validTime: Time): Option[RDD[U]] = {
        parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
      }
    }
     
    RDD的操作非常类似

    dependencies:即调用flatmap操作的DStream,这里指ReceiverInputDStream

    slideDuration:Dstream产生RDD的时间间隔,即批处理间隔

    compute:根据参数得到一个RDD,继而在这个RDD上调用flatmap操作。flatmap操作的方法参数实际上附加到了RDD的身上。

     
     

    2、调用m ap方法,其实是将map方法附加给了RDD。之后的reduceByKey同理。

    3、调用print方法,它是一个输出操作。默认输出RDD的前10个元素。调用print方法得到一个ForEachDStream,并将这个ForEachDStream注册到DStreamGraph中。


    至此,operation部分就结束了。此时,还没有真正执行起来,这需要调用StreamingContext的start方法才行。