Apache Hadoop准实时数据处理的架构模式

来自:董老师在硅谷(微信号:donglaoshi-123)

原文:Architectural Patterns for Near Real-Time Data Processing with Apache Hadoop 

译:Robin robinlee@cmu.edu

评估好哪一种流架构模式最适合你的案例,是成功生产开发的先决条件。

Apache Hadoop 生态系统已成为企业实时地处理和挖掘大数据的首选。 Apache的Kafka, Flume, Spark, Storm, Samza等技术在不断地推进新的可能。人们很容易泛化大规模实时数据案例,但其实他们可以细分为几种架构模式,Apache系统里的不同组件适合于不同的案例。

这篇文章探讨四种主要的设计模式,案例来自于我们企业客户的数据中心的实例,并解释如何在Hadoop上实现这些架构模式。

流处理模式

四种流模式(经常串联使用)为:

流采集:低延迟将数据输入到HDFS,Apache HBase和Apache Solr。

基于外部环境的准实时事件处理: 对事件采取警报,标示,转化,过滤等动作。这些动作的触发可能取决于复杂的标准,例如异常监测模型。通常的使用案例,例如准实时的欺诈监测和推荐系统,需要达到100毫秒以内的延迟。

准实时事件分割处理:类似于准实时事件处理,但通过将数据分割获得一些好处—例如将更多相关外部信息存入内存。这个模式也要求延迟在100毫秒以内。

为整合或机器学习使用的复杂拓扑结构:流处理的精髓:实时地通过复杂而灵活的操作从数据中获取答案。这里,因为结果通常依赖于一段窗口内的计算,需要更多的活跃的数据, 于是重点从获得超低延迟转移到了功能性和准确性。

接下来,我们将介绍如何用可检测的,可被证明的和可维护的方式来实现这些设计模式。

流采集

传统上,Flume是最为推荐的流采集系统。它的大的源和池囊括了所有关于消费什么和写到哪里的基础(关于如何配置和管理flume,参考Using Flume,由O’Reilly 出版的Cloudera工程师/Flume 项目管理委员会成员Hari Shreedharan编写)。

Figure 1译者附:Flume架构

在过去的一年中,Kafka也变得非常受欢迎,因为playback和replication等特性。由于Flume和Kafka有重叠的目标,他们的关系常常令人困惑。他们如何配合?答案是简单的:Kafka的管道和Flume的通道类似,虽然是一个更好的管道,原因就是刚才所述的特性。一个通行的方法就是用Flume作为源(source)和池(sink),而Kafka是他们中间的管道。

下图阐明kafka如何作为Flume的上游数据和下游目的,或Flume管道。

下图的设计是具有大规模拓展性,经过实战检验的架构设计,由Cloudera 管理者监控,容错,并且支持回放。

值得注意的一件事就是,这个设计多么优雅地处理故障。Flume 池从Kafka消费者群里取回。通过Apache zookeeper的帮助,Kafka 消费者群取回topic的位移。如果一个Flume池发生故障,Kafka消费者将把负载重新分发到其他的池中。当那个池恢复了,消费者池将重新分发。

基于外部环境的准实时事件处理

重申一下,这个模式的适用案例通常是观察事件流入然后采取立即动作,可以是转化数据或一些外部操作。决策的逻辑依赖于外部的档案或元数据。一个简单并且可拓展的实现方法是,在你的Kafka/Flume架构中添加源(Source)或池(Sink)Flume拦截器。只需简单配置,不难达到低毫秒级延迟。

Flume拦截器允许用户的代码对事件或批量事件进行修改或采取动作。用户代码可以与本地内存或外部Hbase交互,以获取决策需要的档案。Hbase通常可以4-25毫秒内给予我们信息,根据网络状况,模式概要,设计和配置而有所不同。你也可以将Hbase配置为永远不停止服务或被中断,即便在故障情况下。

这一设计的实现除了拦截器中应用的具体逻辑几乎不需要编程。Cloudera管理器提供直观的用户界面,可以部署这个逻辑,包括连结,配置,监测这一服务。

准实时基于外部环境的分割化的事件处理

下图的架构(未分割方案),你将需要频繁查询Hbase,因为针对某一事件的外部上下文环境在flume拦截器的本地内存中装不下。

但是,如果你定义一个键值来分割数据,你将可以把数据流匹配到相关上下文的一个子集。如果你将数据分割成十部分,那么你只需要将十分之一的档案放入内存里。 Hbase是快,但本地内存更快。Kafka允许你自定义分割器来分割数据。

注意,在这里,Flume并不是必须的;根本的方案只是一个Kafka消费者。所以,你可以只用一个YARN消费者或只有Mapper的MapReduce。

针对集成或机器学习的复杂拓扑

到此为止,我们探索了事件层面的操作。然而,有时你需要更复杂的操作,例如计数,求平均,会话流程,或基于流数据的机器学习建模。在这种情况,Spark流处理是最理想的的工具,因为:

和其他工具相比,Spark易于开发

Spark丰富简明的API让建设复杂拓扑变得容易

实时流处理和批处理的代码相似

只需很少的修改,实时小量流处理的代码就可以用于大规模离线的批处理。不仅减少了代码量,这个方法减少测试和整合需要的时间。

只需了解一个技术引擎

训练员工了解一个分布式处理引擎的机制和构件是有成本的。使用spark并标准化将会合并流处理和批处理的成本。

微批处理帮你更可靠地进行拓展规模

在批处理层面的应答将允许更多的吞吐量,允许无需顾忌双发的解决方案。微批处理也帮助在大规模下高效发送修改到HDFS或Hbase。

与Hadoop生态系统的集成

Spark与HDFS,Hbase和Kafka有很深的集成。

无丢失数据的风险

由于有了WAL和Kafka,Spark流处理避免了故障时丢失数据的风险

易于排错和运行

在本地的IDE中你就可以对你的spark流处理代码进行排错和逐步检查。而且,代码和普通函数式程序代码类似,对Java或Scala程序员来说,无需花很多时间就能熟悉。(Python也支持)

流处理是天然状态化的

Spark流处理中,状态是“第一公民”,意味着很容易写基于状态的流处理应用,对节点的故障可恢复。

作为实际的标准,Spark现在正在得到整个生态系统的长期投入

在写此文时,spark在30天内已有700次左右的提交—和其它框架相比,例如Storm,只有15次的提交。

你可以使用机器学习的库

Spark的MLlib库越来越受欢迎,它的功能只会越来越强大。

如果需要,你可使用SQL结构化查询语言

通过Spark SQL,你可以为你的流处理应用添加SQL逻辑,从而简化代码

结论

流处理和几种可能的模式有很强大的功能,但正如你在这篇文章所了解,你可以通过了解哪一种设计模式适合你的案例,从而最少量的代码做非常好的事情。

Ted Malaska是Cloudera解决方案架构师,Spark,Flume和Hbase的贡献者,O’Reilly书籍 《Hadoop 应用架构》的合作作者。

你不需要Hadoop做数据分析的10个理由 —— 使用之前必须测试其他替代品

来自:开源中国社区

链接:http://www.oschina.net/translate/hadoop-when-to-use

原文:http://www.fromdev.com/2013/06/hadoop-when-to-use.html

为你的业务使用大数据技术是一个非常有吸引力的事情,现在Apache Hadoop使得它更加吸引人了。

Hadoop是一个大规模可伸缩的数据存储平台,被用作许多大数据项目的基础。

Hadoop很强大,但是它有一个很陡峭的学习曲线,需要公司在时间和其他资源上作大量的投资。

如果正确地应用它,对你的公司来说,Hadoop可以成为一个真正的游戏规则改变者,但它存在很多被错误使用的可能。

另一方面,许多企业(不像是谷歌、Facebook或Twitter)都没有真正的“大数据”来需要用一个巨大的hadoop集群分析事物,然而 hadoop 这个流行语却吸引着他们。

如大卫惠勒所说的:“所有计算机科学的问题都可以用另一个间接的中间层来解决”。 Hadoop提供了这样一种间接层;作为一个软件架构师,当你的最高管理层对一些流行语有很不专业的偏颇认识时,也许真的很难采取正确的决定。

在本文中,我想要建议“应在投资到Hadoop之前尝试一些替代品”。

了解你的数据

总体数据的大小

Hadoop被设计用来在大型数据集上能进行有效的工作。简单给点提示:

  • Hadoop有一个专为大尺寸文件(如几G)设计的文件系统(HDFS)。因此,如果你的数据文件尺寸只是几M的话,建议你合并(通过zip或tar)多个文件到一个文件中,使其尺寸在几百M到几G范围内。
  • HDFS把大文件们拆分存储到以64MB或128MB或更大的块单元中。

如果你的数据集相对较小,那它就不会是hadoop的巨型生态系统的最佳使用之地。这需要你去对你的数据比以往理解更多一些,分析需要什么类型的查询,看看你的数据是否真得“大”。

另一方面,只是通过数据库的大小来测量数据可能是骗人的,因为你的计算量可能会更大。 有时你可能会做更多的数学计算或分析小数据集的排列,这些可以远远大于实际的数据。所以关键是要“了解你的数据,并且很清楚它”。

数据增长数度(增长速率)

你的数据仓库或是其它数据源中可能拥有数个TB的数据。然而,在建立 Hadoop 集群前,你必须考虑到数据的增长。

向数据分析师问几个简单的问题:

  • 数据增长的有多快?这个数据增长的步伐很快么?
  • 数月或数年之后,这个数据将会达到什么样的尺寸?

许多公司的数据增长是以数年而非数月或数日计算的。如果你的数据增长数度非常快,我见建议你考虑一下归档及清理技术(将在本文后面的内容中详述),而非立即上马 Hadoop 集群。

如何减少你的数据量

如果你觉得你的数据实在是太大了,你可以考虑使用下面的方法将数据减少到相对可控的规模上。下面的几个选项都已经被业内成功使用多年。

归档

数据归档是将陈旧数据移动到一个独立数据储存器以长期保留(如果需要)的过程。

这需要对数据、对应用使用情况的充分了解。处理大数据的电子商务公司在现场数据库中保存近期3个月的订单细节数据,而早期订单则保存在一个独立的数据存储器中。

这个方法也可以使用到你的数据仓库中。你可以保存近期的数据以便更快的查询和报告,而将访问频率较低的数据保存在一个其它不同的存储设备中。

考虑清除数据

我们忙于收集数据时经常并不真正确定我们应该保留多少。如果你存储大量可能不是很有用的数据,它就会拖慢你近期数据的处理。弄清你的业务需求,看看是否可以删除旧的数据,把从那些数据分析的趋势存储起来以供后用。这不仅会节省你的空间,而且还可以在分析近期数据时帮助你加快速度。

对这种情况的一个常见的最佳实践是在您的数据仓库中有一些标准列,像创建日期,创建者,更新日期,更新者。现在根据这些列创建一个每日/每月的cron作业,用它清除你不想在你的数据仓库中看到的时段的数据。清除数据的逻辑基于你的领域可能不同,因此在实施它之前应作一些考虑。

如果您正在使用一个归档工具,它也可能是通过很轻松地配置就能清除无用的存档数据。

所有的数据都不重要

你可能受不了为你的业务保留所有数据的诱惑。你的数据有各种各样的来源,比如日志文件、现场交易、供应商整合、ETL工作、营销活动数据等等。但你应该知道,不是所有的数据都是关键业务,把它们都保存在一个数据仓库中可能不是很有帮助反而有害。在它们被存储到你的数据仓库之前,应从源头上过滤不需要的数据。如果你真需要在你的数据库的表里每一列存储和分析那些数据,就准备好发疯吧。

想好你想收集哪些作为数据

假设你进入一个在线视频编辑的业务。你想保存你的用户在每个视频上做的全部更改吗?这会产生巨大的体积。当你感觉到你的数据仓库可能无法处理它的情况下,你可能需要考虑只存储元数据。视频编辑是一个很可能的例子,不过它可能适用于许多其他与你存储数据相关的信息。

一般来说,如果你有一些有关系的数据,你就有机会从多个来源得到它们,而且不是所有的都需要存储在你的数据仓库中。

更智能的分析

聘请理解业务的分析师

现在,你可能已经明白“了解数据”对于有效地管理它们来说是非常重要的。相信我,当你觉得我已经试了所有这些东西时,这一步会帮到你。是时候让我们进入一个如Hadoop这样的大数据解决方案中了。

如果你的数据分析师不懂应从中提取什么出来,Hadoop就将几乎无用。应寄希望于那些理解业务的人。鼓励他们做实验和学习新的方法来看待相同的数据。找出哪些可以与现有基础设施取得唾手可得的收益。

为制订决策使用统计抽样

统计抽样是研究人员和数学家为了对大型数据推断合理结论而使用的一种非常古老的技术。

通过执行一个统计的样本,我们的体积可以极大地减少。不用跟踪数百万或数十亿的数据点,我们只需要随机挑选几千或几百个即可。

该技术不能提供准确的结果,但是它可以被用于对一个大型数据集获得高水平的理解。

定标技术

你真地把关系数据库的处理发挥到极致了吗? 

在你真去探索其他技术之前,我希望你去看看关系型数据库是否能够处理它。人们使用关系数据库已经很久了,已经托管了一些几T字节大小的数据仓库。在你决定进入hadoop之前,你可以对关系数据库尝试以下方法。

数据分区

数据分区就是逻辑上和/或物理上把数据划分成一些更容易维护或访问的部分的过程。分区支持最流行的开放源代码关系数据库(MySQL 分区 和 Postgres 分区 )。

对关系数据库尝试数据库分片的方法

数据库分片可以作为对关系数据库的处理速度发挥到极限的最后一个手段。这种方法可以应用于你可以逻辑上分离数据到不同的节点,并在你的分析中有更少的交叉节点连接的时候。在web应用程序中,一个常见的分片方法是基于把用户和所有与一个用户相关的信息存储在一个节点上来确保最佳的速度。

分片并不容易,如果你有很多复杂的关系,并且没有简单的方法来分离数据到不同的节点上,这个方案可能不适合你。如果你的应用需要有很多交叉节点连接,分片的打算可能会失败。

结论

我曾在不同的公司被高层管理人员要求把Hadoop作为一个可选项去做某些事。要说服他们总是很难,但是当我把这个信息告诉他们后,他们不得不三思而后行。我很幸运,能为我工作的这些公司节省一些钱。

如果你发现为了扩大你的关系数据库,你已经尝试了所有可能的选项,这才是你应该开始考虑建立一个Hadoop集群的时候。

首先,您可能应该使用cloudera提供的虚拟机镜像。它们对于在你现有的基础设施上使用hadoop做快速的概念证明真的是很方便。

你对大数据有何经验?请在评论部分与我们分享。

实时处理日均 50 亿会话,解析 Twitter Answers 的架构

英文:Twitter

译者:伯乐在线-刘志成

链接:http://blog.jobbole.com/87067/

去年我们发布了Answers,至今移动社区产生了惊人的使用量,让我们感到兴奋不已。现在Answers每天处理50亿次会话,并且这个数量在持续增加。上亿设备每秒向Answers端点发送数以百万计的请求。在你已经阅读到此处的这段时间里,Answers后台收到并处理了一千万次分析事件。

其中的挑战是如何利用这些信息向移动开发者提供可靠的、实时的、有实际价值的洞见(视角)去了解他们的移动应用。

在高层,我们依靠 组件解耦、异步通信、在应对灾难性故障时优雅地服务降级等原则来帮助架构决策。我们使用Lambda架构将数据完整性和实时数据更新结合起来。

在实践过程中,我们需要设计一个能够接收并保存事件、执行离线和实时计算且能将上述两种计算结果整合成相关信息的系统。这些行为全部都要以百万次每秒的规模执行。

让我们从第一个挑战开始:接受并处理这些事件。

事件接收

在设计设备-服务器通信的时候,我们的目标是:减少对电池和网络使用的影响;确保数据的可靠性;接近实时地获取数据。为了减少对设备的影响,我们批量地发送分析数据并且在发送前对数据进行压缩。为了保证这些宝贵的数据始终能够到达我们的服务器,在传输失败随机退避后以及达到设备存储达到上限时,设备会进行重传。为了确保数据能够尽快到达服务器,我们设置来多个触发器来使设备尝试发送:当程序运行于前台的时候,事件触发器每分钟触发一次;一个消息数量触发器和程序转入后台触发器。

这样的通信协议导致设备每秒发送来数以万计压缩过的有效载荷。每一个载荷都包含数十条事件。为了能够可靠的、易于线性伸缩的方式去处理载荷,接收事件的服务必须极度简单。

这个服务使用GO语言编写,这个服务使用了亚马逊弹性负载均衡器(ELB),并将每一个消息负荷放入一个持久化的Kafka队列。

存储

Kafka是一个持久存储器,因为它把收到的消息写入磁盘并且每个消息都有多份冗余。因此一旦我们知道信息到了Kafka队列,我们就可以通过延迟处理、再处理来容忍下游延迟和下游失败。然而,Kafka不是我们历史数据的永久真理之源——按照上文提到的速度,仅仅是几天的数据,我们也需要数以百计的box来存储。因此我们把Kafka集群配置为将消息只保留几个小时(这些时间足够我们处理不期而至的重大故障)并且将数据尽快地存入永久存储——亚马逊简易存储服务(Amazon S3)。

我们广泛地使用Storm来进行实时数据处理,第一个相关的Topology就是从Kafka读取信息并存储到Amazon S3上。

批量计算

一旦这些数据存到了S3上,我们可以使用亚马逊弹性MapReduce(Amazon EMR)来计算我们的数据能够计算的任何东西。这既包括要展示在客户的仪表盘上的数据,也包括我们为了开发新功能而开发的实验性的任务。

我们使用Cascading框架编写、Amazon EMR执行MapReduce程序。 Amazon EMR将我们存储到S3上的数据作为输入,处理完毕后,再将结果存入S3。我们通过运行在Storm上的调度topology来探测程序执行完毕,并将结果灌入Cassandra集群,这样结果就能用于亚秒级查询API。

实时计算

迄今,我们描述的是一个能够执行分析计算的持久的容错的框架。然而,存在一个显眼的问题——这个框架不是实时的。一些计算每小时计算一次,有的计算需要一整天的数据作为输入。计算时间从几分钟到几小时不等,把S3上的输出导入到服务层也需要这么多时间。因此,在最好情况下,我们的数据也总是拖后几个小时,显然不能满足实时和可操作的目标。

为了达成实时的目标,数据涌入后进行存档的同时,我们对数据进行流式计算。

就像我们的存储Topology读取数据一样,一个独立的Storm Topology实时地从Kafka Topic中读取数据然后进行实时计算,计算的逻辑和MapReduce任务一样。这些实时计算的结果放在另一个独立的Cassandra集群里以供实时查询。

为了弥补我们在时间以及在资源方面可能的不足,我们没有在批量处理层中而是在实时计算层中使用了一些概率算法,如布隆过滤器、HyperLogLog(也有一些自己开发的算法)。相对于那些蛮力替代品,这些算法在空间和时间复杂度上有数量级的优势,同时只有可忽略的精确度损失。

合并

现在我们拥有两个独立生产出的数据集(批处理和实时处理),我们怎么将二者合并才能得到一个一致的结果?

我们在API的逻辑中,根据特定的情况分别使用两个数据集然后合并它们。

因为批量计算是可重现的,且相对于实时计算来说更容错,我们的API总是倾向于使用批量产生的数据。例如,API接到了一个三十天的时间序列的日活跃用户数量数据请求,它首先会到批量数据Cassandra集群里查询全范围的数据。如果这是一个历史数据检索,所有的数据都已经得到。然而,查询的请求更可能会包含当天,批量产生的数据填充了大部分结果,只有近一两天的数据会被实时数据填充。

错误处理

让我们来温习几个失效的场景,看一下这样的架构在处理错误的时候, 是如何避免宕机或者损失数据,取之以优雅地降级。

我们在上文中已经讨论过设备上的回退重试策略。在设备端网络中断、服务器端短时无服务情况下,重试保证数据最终能够到达服务器。随机回退确保设备不会在某区域网络中断或者后端服务器短时间不可用之后,不会压垮(DDos攻击)服务器。

当实时处理层失效时,会发生什么?我们待命的工程师会受到通知并去解决问题。因为实时处理层的输入是存储在持久化的Kafka集群里,所以没有数据会丢失;等实时处理恢复之后,它会赶上处理那些停机期间应该处理的数据。

因为实时处理和批处理是完全解耦的,批处理层完全不会受到影响。因此唯一的影响就是实时处理层失效期间,对数据点实时更新的延迟。

如果批处理层有问题或者严重延迟的话,会发生什么?我们的API会无缝地多获取实时处理的数据。一个时间序列数据的查询,可能先前只取一天的实时处理结果,现在就需要查询两到三天的实时处理结果。因为实时处理和批处理是完全解耦的,实时处理不受影响继续运行。同时,我们的待命工程师会得到消息并且解决批处理层的问题。一旦批处理层恢复正常,它会执行那些延迟的数据处理任务,API也会无缝切换到使用现在可以得到的批处理的结果。

我们系统后端架构由四大组件构成:事件接收,事件存储,实时计算和批量计算。各个组件之间的持久化队列确保任意组件的失效不会扩散到其他组件,并且后续可以从中断中恢复。API可以在计算层延迟或者失效时无缝地优雅降级,在服务恢复后重新恢复;这些都是由API内部的检索逻辑来保证的。

Answer的目标是创建一个仪表盘,这个仪表盘能够把了解你的用户群变得非常简单。因此你可以将时间花费在打造令人惊叹的用户体验上,而不是用来掘穿数据。

非常感谢致力于将此架构实现(付诸现实)的Answers团队。还有《Big Data》这本书的作者Nathan Marz。