注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

itoedr的it学苑

记录从IT文盲学到专家的历程

 
 
 

日志

 
 

Storm Spark Hadoop 三个流行并行计算框架理解  

2014-11-27 09:10:33|  分类: 开源云系统 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
理解spark

--spark的开发语言--

Scala是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。

  • Scala是面向对象的:Scala是一个纯面向对象语言,在某种意义上来讲所有数值都是对象。对象的类型和行为是由class和trait来描述的。Class的抽象可由子类化和一种灵活的基于mixin的组合机制(它可作为多重继承的简单替代方案)来扩展。
  • Scala是函数式的: Scala还是一个函数式语言,在某种意义上来讲所有函数都是数值。Scala为定义匿名函数提供了一种轻量级的语法,它支持高阶(higher- order)函数、允许函数嵌套、支持局部套用(currying)。Scala的case类及其内置支持的模式匹配模型代数类型在许多函数式编程语言中 都被使用。
  • Scala是静态类型的:Scala配备了一套富有表现力的类型系统,该抽象概念以一种安全的和一致的方式被使用。
  • Scala是可扩展的:Scala的设计承认了实践事实,领域特定应用开发通常需要领域特定语言扩展。Scala提供了一个独特的语言组合机制,这可以更加容易地以类库的形式增加新的语言结构:
    • 任何方式可以被用作中缀(infix)或后缀(postfix)操作符
    • 闭包按照所期望的类型(目标类型)自动地被构造
    两者结合使用可方便地定义新语句,无需扩展语法,也无需使用类似宏的元编程工具。
  • Scala可与Java和.NET进行互操作:Scala 设计时就考虑了与流行编程环境良好交互,如Java 2运行时环境(JRE)和 .NET框架(CLR)。特别是与主流面向对象语言,如Java和C#尽量无缝交互。Scala有像Java和C#一样的编译模型(独立编译,动态装载 类),允许访问成千上万的高质量类库。

          Apache Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。

           Apache Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。


spark是一种内存计算框架,计算基本是在内存中进行操作,所以效率比hadoop高很多,但是使用内存那就必然代价比较大。

*****************
--关于spark--
*****************

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更 好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Storm Spark Hadoop 三个流行并行计算框架理解 - itoedr - itoedr的it学苑
 

 

Spark与Hadoop的对比

Spark的中间数据放到内存中,对于迭代运算效率更高。

Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。

Spark比Hadoop更通用。

Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup,mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。

这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。

不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

容错性

在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。

可用性

Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。

Spark与Hadoop的结合

Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。

Spark的适用场景

Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

总的来说Spark的适用面比较广泛且比较通用。

运行模式

  • 本地模式
  • Standalone模式
  • Mesoes模式
  • yarn模式

Spark生态系统

Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark 通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。

Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部 分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它 处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需 要历史数据和实时数据联合分析的特定应用场合。

Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

在业界的使用

Spark项目在2009年启动,2010年开源, 现在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。

Spark核心概念

Resilient Distributed Dataset (RDD)弹性分布数据集

RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东 西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存 中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭 代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。

RDD的特点:

  1. 它是在集群节点上的不可变的、已分区的集合对象。

  2. 通过并行转换的方式来创建如(map, filter, join, etc)。

  3. 失败自动重建。

  4. 可以控制存储级别(内存、磁盘等)来进行重用。

  5. 必须是可序列化的。

  6. 是静态类型的。

RDD的好处

  1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。

  2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。

  3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。

  4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。

RDD的存储与分区

  1. 用户可以选择不同的存储级别存储RDD以便重用。

  2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。

  3. RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。

RDD的内部表示

在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

  1. 分区列表(数据块列表)

  2. 计算每个分片的函数(根据父RDD计算出此RDD)

  3. 对父RDD的依赖列表

  4. 对key-value RDD的Partitioner【可选】

  5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】

RDD的存储级别

  RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

 

val NONE = new StorageLevel(false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
    val MEMORY_ONLY = new StorageLevel(false, true, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
    val MEMORY_AND_DISK = new StorageLevel(true, true, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) 

RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

RDD的生成

RDD有两种创建方式:

  1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。

  2、从父RDD转换得到新RDD。

下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:

 

 // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像
    // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。
    def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
        classOf[Text], minSplits) .map(pair => pair._2.toString) }
 
    // 根据Hadoop配置,及InputFormat等创建HadoopRDD  
    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)  

对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:

 

   reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
    val key: K = reader.createKey()
    val value: V = reader.createValue()
 
    //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。
    override def getNext() = {
    try {
      finished = !reader.next(key, value)
    } catch {
      case eof: EOFException =>
        finished = true
    }
      (key, value)
    }

RDD的转换与操作

对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。

转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到 Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

下面使用一个例子来示例说明Transformations与Actions在Spark的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"),
        Seq(System.getenv("SPARK_TEST_JAR")))
 
    val rdd_A = sc.textFile(hdfs://.....)
    val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1))
 
    val rdd_C = sc.textFile(hdfs://.....)
    val rdd_D = rdd_C.map(line => (line.substring(10), 1))
    val rdd_E = rdd_D.reduceByKey((a, b) => a + b)
 
    val rdd_F = rdd_B.jion(rdd_E)
 
    rdd_F.saveAsSequenceFile(hdfs://....)

Storm Spark Hadoop 三个流行并行计算框架理解 - itoedr - itoedr的it学苑
 

 

Lineage(血统)

利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据 容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它 RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换 (Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限 制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分 区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这 种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

容错

在RDD计算,通过checkpint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成 丢失的分区数据。

资源管理与作业调度

Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容 易,Spark on Yarn的大致框架图。

Storm Spark Hadoop 三个流行并行计算框架理解 - itoedr - itoedr的it学苑
 

 

让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。

编程接口

Spark通过与编程语言集成的方式暴露RDD的操作,类似于DryadLINQ和FlumeJava,每个数据集都表示为RDD对象,对数据集的操作就 表示成对RDD对象的操作。Spark主要的编程语言是Scala,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能 (JVM上的静态强类型语言)。

Spark和Hadoop MapReduce类似,由Master(类似于MapReduce的Jobtracker)和Workers(Spark的Slave工作节点)组成。 用户编写的Spark程序被称为Driver程序,Dirver程序会连接master并定义了对各RDD的转换与操作,而对RDD的转换与操作通过 Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操作发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对RDD的操作时,根据数据分片信息进行本地化数据操作,生成新的 数据分片、返回结果或把RDD写入存储系统。

Storm Spark Hadoop 三个流行并行计算框架理解 - itoedr - itoedr的it学苑
 

 

Scala

Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创 建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:

 


C
1
2
3
4
    val sc = new SparkContext(master, appName, [sparkHome], [jars])
    val textFile = sc.textFile("hdfs://.....")
    textFile.map(....).filter(.....).....
 

Java

Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:

  

1
2
3
4
5
6
7
8
9
10
    JavaSparkContext sc = new JavaSparkContext(...);  
    JavaRDD lines = ctx.textFile("hdfs://...");
    JavaRDD words = lines.flatMap(
      new FlatMapFunction<String, String>() {
         public Iterable call(String s) {
            return Arrays.asList(s.split(" "));
         }
       }
    );
Python

现在Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操作,从而实现使用python编写Spark程 序。Spark也同样提供了pyspark,一个Spark的python shell,可以以交互式的方式使用Python编写Spark程序。 如:

  


C
1
2
3
4
5
    from pyspark import SparkContext
    sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
    words = sc.textFile("/usr/share/dict/words")
    words.filter(lambda w: w.startswith("spar")).take(5)

使用示例

Standalone模式

为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是非常好的设计,但是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark因此提供了 Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很相似,就连集群启动方式都几乎是一样。

以Standalone模式运行Spark集群

下载Scala2.9.3,并配置SCALA_HOME

下载Spark代码(可以使用源码编译也可以下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)

解压spark-0.7.3-prebuilt-cdh4.tgz安装包

修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。

  

1
2
3
4
5
6
7
8
9
10
SCALA_HOME=/home/spark/scala-2.9.3
JAVA_HOME=/home/spark/jdk1.6.0_45
SPARK_MASTER_IP=spark1            
SPARK_MASTER_PORT=30111
SPARK_MASTER_WEBUI_PORT=30118
SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g
SPARK_WORKER_PORT=30333
SPARK_WORKER_WEBUI_PORT=30119
SPARK_WORKER_INSTANCES=1
把Hadoop配置copy到conf目录下

在master主机上对其它机器做ssh无密码登录

把配置好的Spark程序使用scp copy到其它机器

在master启动集群

  

1
2
$SPARK_HOME/start-all.sh
 

Spark-shell现在还不支持Yarn模式,使用Yarn模式运行,需要把Spark程序全部打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn。

以Yarn模式运行Spark

下载Spark代码.

 

2
git clone git://github.com/mesos/spark

切换到branch-0.8

  

1
2
3
cd spark
git checkout -b yarn --track origin/yarn

使用sbt编译Spark并

  


C
1
2
3
4
$SPARK_HOME/sbt/sbt
> package
> assembly

把Hadoop yarn配置copy到conf目录下

 运行测试

  


C
1
2
3
4
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \
./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \
--class spark.examples.SparkPi --args yarn-standalone

使用Spark-shell

Spark-shell使用很简单,当Spark以Standalon模式运行后,使用$SPARK_HOME/spark-shell进入shell即 可,在Spark-shell中SparkContext已经创建好了,实例名为sc可以直接使用,还有一个需要注意的是,在Standalone模式 下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell作为一个Spark程序一直运行在Spark上,其它的 Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。

在Spark-shell上写程序非常简单,就像在Scala Shell上写程序一样。

  

1
2
3
4
5
6
7
8
9
    scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data")
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
 
    scala> textFile.count() // Number of items in this RDD
    res0: Long = 21374
 
    scala> textFile.first() // First item in this RDD
    res1: String = # Spark

编写Driver程序

在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。如WorkCount程序如下:

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import spark.SparkContext
import SparkContext._
 
object WordCount {
  def main(args: Array[String]) {
    if (args.length ==0 ){
      println("usage is org.test.WordCount <master>")
    }
    println("the args: ")
    args.foreach(println)
 
    val hdfsPath = "hdfs://hadoop1:8020"
 
    // create the SparkContext, args(0)由yarn传入appMaster地址
    val sc = new SparkContext(args(0), "WrodCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
 
    val textFile = sc.textFile(hdfsPath + args(1))
 
    val result = textFile.flatMap(line => line.split("\\s+"))
        .map(word => (word, 1)).reduceByKey(_ + _)
 
    result.saveAsTextFile(hdfsPath + args(2))
  }
}
********
关于strom
********
首先整体认识:Hadoop是磁盘级计算,进行计算时,数据在磁盘上,需要读写磁盘;Storm是内存级计算,数据直接通过网络导入内存。读写内存比读写磁盘速度快n个数量级。根据Harvard CS61课件,磁盘访问延迟约为内存访问延迟的75000倍。所以Storm更快。

注释:
1. 延时 , 指数据从产生到运算产生结果的时间,“快”应该主要指这个。
2. 吞吐, 指系统单位时间处理的数据量。

storm的网络直传、内存计算,其时延必然比hadoop的通过hdfs传输低得多;当计算模型比较适合流式时,storm的流式处理,省去了批处理的收集数据的时间;因为storm是服务型的作业,也省去了作业调度的时延。所以从时延上来看,storm要快于hadoop。

从原理角度来讲:
Hadoop M/R基于HDFS,需要切分输入数据、产生中间数据文件、排序、数据压缩、多份复制等,效率较低。
Storm 基于ZeroMQ这个高性能的消息通讯库,不持久化数据。


为什么storm比hadoop快,下面举一个应用场景
说一个典型的场景,几千个日志生产方产生日志文件,需要进行一些ETL操作存入一个数据库。

假设利用hadoop,则需要先存入hdfs,按每一分钟切一个文件的粒度来算(这个粒度已经极端的细了,再小的话hdfs上会一堆小文件),hadoop开始计算时,1分钟已经过去了,然后再开始调度任务又花了一分钟,然后作业运行起来,假设机器特别多,几钞钟就算完了,然后写数据库假设也花了很少的时间,这样,从数据产生到最后可以使用已经过去了至少两分多钟。
而流式计算则是数据产生时,则有一个程序去一直监控日志的产生,产生一行就通过一个传输系统发给流式计算系统,然后流式计算系统直接处理,处理完之后直接写入数据库,每条数据从产生到写入数据库,在资源充足时可以在毫秒级别完成。

同时说一下另外一个场景:
如果一个大文件的wordcount,把它放到storm上进行流式的处理,等所有已有数据处理完才让storm输出结果,这时候,你再把它和hadoop比较快慢,这时,其实比较的不是时延,而是比较的吞吐了。

--------------------------------------------------------------------------------------------------------------------------------------------------

最主要的方面:Hadoop使用磁盘作为中间交换的介质,而storm的数据是一直在内存中流转的。
两者面向的领域也不完全相同,一个是批量处理,基于任务调度的;另外一个是实时处理,基于流。
以水为例,Hadoop可以看作是纯净水,一桶桶地搬;而Storm是用水管,预先接好(Topology),然后打开水龙头,水就源源不断地流出来了。

---------------------------------------------------------------------------------------------------------------------------------------------------
Storm的主工程师Nathan Marz表示: Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。更棒的是你可以使用任意编程语言来做开发。
Storm的主要特点如下:
1.简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
2.可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
3.容错性。Storm会管理工作进程和节点的故障。
4.水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
5.可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
6.快速。系统的设计保证了消息能得到快速的处理,使用?MQ作为其底层消息队列。
7.本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
---------------------------------------------------------------------------------------------------------------------------------------------------------------

在消耗资源相同的情况下,一般来说storm的延时低于mapreduce。但是吞吐也低于mapreduce。storm是典型的流计算系统,mapreduce是典型的批处理系统。下面对流计算和批处理系统流程

这个个数据处理流程来说大致可以分三个阶段:
1. 数据采集与准备
2. 数据计算(涉及计算中的中间存储), 题主中的“那些方面决定”应该主要是指这个阶段处理方式。
3. 数据结果展现(反馈)

1)数据采集阶段,目前典型的处理处理策略:数据的产生系统一般出自页面打点和解析DB的log,流计算将数据采集中消息队列(比如kafaka,metaQ,timetunle)等。批处理系统一般将数据采集进分布式文件系统(比如HDFS),当然也有使用消息队列的。我们暂且把消息队列和文件系统称为预处理存储。二者在延时和吞吐上没太大区别,接下来从这个预处理存储进入到数据计算阶段有很大的区别,流计算一般在实时的读取消息队列进入流计算系统(storm)的数据进行运算,批处理一系统一般会攒一大批后批量导入到计算系统(hadoop),这里就有了延时的区别。
2)数据计算阶段,流计算系统(storm)的延时低主要有一下几个方面(针对题主的问题)
A: storm 进程是常驻的,有数据就可以进行实时的处理
mapreduce 数据攒一批后由作业管理系统启动任务,Jobtracker计算任务分配,tasktacker启动相关的运算进程
B: stom每个计算单元之间数据之间通过网络(zeromq)直接传输。
mapreduce map任务运算的结果要写入到HDFS,在于reduce任务通过网络拖过去运算。相对来说多了磁盘读写,比较慢
C: 对于复杂运算
storm的运算模型直接支持DAG(有向无环图)
mapreduce 需要肯多个MR过程组成,有些map操作没有意义的

3)数据结果展现
流计算一般运算结果直接反馈到最终结果集中(展示页面,数据库,搜索引擎的索引)。而mapreduce一般需要整个运算结束后将结果批量导入到结果集中。

实际流计算和批处理系统没有本质的区别,像storm的trident也有批概念,而mapreduce可以将每次运算的数据集缩小(比如几分钟启动一次),facebook的puma就是基于hadoop做的流计算系统。
  评论这张
 
阅读(637)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017