ApacheStrom
Hadoop, Storm, S4, HBase, Hive, Sqoop, Flume, ElasticSearch, Machine Learning, Kafka, Spring, Java, and J2EE.
Chapter 1 Setting up Storm on a Single Machine
batching processing front, Hadoop形成作为go-to框架处理大数据。知道最近,当一个人寻找框架构建实时流处理应用,是空的。这样的应用是很多商业的一个完整的部分因为它允许他们能快速反应事件,适应不同改变的情况。例子包括监控社交媒体来分析公众的反应。
Apache Strom演变成为平台对于工业leaders开发这样的分布式,实时,数据处理平台。它提供一系列原语能用来开发应用实时处理大规模数据在一个高度可scalable方式。
Strom是用于实时处理,Hadoop是用于块处理。Strom可用于以下情况:
- 流处理Stream processing:Storm用于处理流数据,实时更新一系列数据库。这过程发生实时地,处理速度需要跟上输入数据速度。
- 连续计算continuous computation:Strom能在数据流上连续计算,实时输送结果到客户端。这可能要求处理每个消息当它到达或创建small batches在很短时间内。一个连续计算的例子是streaming 趋势主题在Twitter到浏览器中。
- Distributed RPC:Storm 能并行一个任务繁重的查询这样就能实时计算。
- 实时分析:Storm能分析和相应数据从不同数据源如果它们实时发生。
在这个章节,会覆盖以下主题:
- Storm的特征
- Storm cluster的各种成分
- 什么是Storm 拓扑
- 本地和远程操作模式来执行storm拓扑
- 设置一个开发环境来开发storm拓扑
- 开发一个样本拓扑
- 设置一个单节点storm簇和它的要求
- 部署样本拓扑
Storm特征
- 快速:Storm处理1,000,000数组每秒每个节点
- 水平可缩放:快速是一个必要特征来构建一个大体积/大速度数据处理平台,但是一个单独节点有一个上限它每秒能处理多少数据。一个节点代表一个机器在设置storm应用中。Storm,作为一个分布式平台,允许你增加更多节点到storm cluster中,增加应用处理能力。同时,它也是线性可扩展的,意味着你可以双倍处理能力通过加倍节点数目。
- 容错:work 单元执行由work 进程在storm簇中。当一个worker死了,storm会重启那个worker。如果允许worker的node死了,storm会在其它节点运行这个worker。worker process的描述在第二章的配置拓扑的并行性中提到,setting up a storm cluster.
- 保证的数据处理:Storm保证每个消息传递到处理器会被至少处理一次。在失败的情况下,storm会重新播放丢失的数组。也可以配置每个消息只被处理一次。
- 容易操作:Storm简单部署和管理。一旦cluster被部署,要求很少的维护。
- 编程语言不限:虽然Storm运行在Java Virtual Machine上,在其上运行的应用可以用任意编程语言写,能读取和写入到标准输入输出流中。
Storm Components
Storm cluster遵从master-slave模型,master和slave进程通过ZooKeeper协调。以下是Storm cluster的组成部分:
- Nimbus:Nimbus节点是master节点在Storm cluster中。负责分布应用代码在不同worker节点中,赋予任务到不同机器,监管任务的任何失败情况,重启机器如果需要的话。
Nimbus是无状态的,存储所有数据在ZooKeeper中。只有一个单独的Nimbus在storm cluster中。它是设计变得快速fail,当Nimbus死亡,能快速重启没有任何负面效应在运行任务的worker节点上。不像Hadoop,如果JobTracker死了,所有运行的任务在一个不一致的状态中,需要被再次执行。 - Supervisor Nodes:监管节点是在storm cluster中的工作节点。每个监管节点运行一个监管守护进程负责创建,开始和停止工作进程来执行分配给那个节点的任务。类似Nimbus,a supervisor daemon也是快速fail,存储所有状态在ZooKeeper这样能重启而没有状态丢失。一个单独的supervisor daemon通常处理多个工作进程运行在那个机器上的。
The Zookeeper Cluster
在任意分布式应用中,各种进程需要和对方互相协调共享一些配置信息。ZooKeeper是一个应用提供这些服务在一种可靠的方式中。Storm作为一个分布式应用,用ZooKeeper cluster来协调各种应用。和cluster相关的状态和各种任务提交给Storm存储在ZooKeeper中。Nimbus和监管节点不直接和对方交流而是通过ZooKeeper。所有数据都存储在ZooKeeper中,Nimbus和supervisor daemon都可以很粗暴被杀,而不会有害影响cluster。
The Storm Data Model
数据的基本单元能被storm应用处理的叫做tuple。每个tuple包含一些预先定义的fields的lists。每个field的值可以是一个比特,字符,整数,等等。Storm也能提供API自定义数据类型,可以被序列化为fields在tuple中。
一个tuple可以被动态类型化,就是说,你需要定义fields的名字在tuple中而不是它们的数据类型。动态类型转换的选择帮助简化API。同时,由于一个处理单元在Storm中能处理多个tuple的类型,不可能预先定义filed 类型。
每个field在tuple中能被获取通过它的名字 getValueByField(string)或者位置索引。
Storm 拓扑的定义
在Storm术语中,一个拓扑是一个抽象定义计算的图。你能创建一个Storm拓扑,部署它在一个Storm cluster来处理数据。一个拓扑可以被代表为一个直接有向图,每个节点处理一些工作,转发到流中的下一个节点。
storm topology的成分如下:
- Stream:Storm的关键抽象是stream。一个stream是一个没有边界的数组能被storm并行处理。每个流能被一个或多个bolt(storm的处理单元)。因此,Storm可以被认为是一个平台转换stream。在上述图中,stream代表为箭头。
每个流在Storm应用中被给予一个ID,bolts能够产生和消费这些tuples从这些stream基于它们的ID。每个stream也有一个关联的策略对于tuple流经它们。 Spout:一个spout是tuples的源在storm拓扑中。它负责于读取或监听数据从外部源中,举例来说,通过从logfile读取或者队列中的新消息和发布它们——释放,在Storm的术语中——到流中。一个spout能释放多个流,每个不同的策略。举例来说,它能从一个logfile读取10-field,释放它们作为不同的流7-tuple和4-tuple。 无论何时spout释放一个stream,storm会追踪所有产生的tuple,当这个spout产生的所有tuple都执行完毕,storm会发送一个确认信息到spout。这个追踪发生当一个消息ID被提供当释放tuple时候。如果message ID 为null,追踪不会发生。
一个tuple处理超时可以被定义在一个拓扑内,如果一个tuple没有在timeout时间内被处理,失败消息会被发送到spout。这也只在messageID不为空情况下。小的性能增益可悲从storm提取以数据丢失为代价通过取消消息确认,可以通过跳过设置messageID当释放tuple时。
spout的重要函数:- nextTuple(),这个方法被storm调用来得到输入源的下一个tuple。
- ack(Object msgId)
- fail(Object msgId)
- open()这个方法只会被调用一次————当spout被初始化。如果需要连接外部数据源对于输入数据,定义连接外部数据源的逻辑在open方法中,获取外部数据源的数据释放它们在nextTuple方法中。
另一个需要注意的是写spout时没有任意一个方法应该阻塞,因为Storm调用所有方法在相同线程中。每个spout有一个内部缓存来追踪所有被释放的tuple。spout会保持这些tuple在缓存中指到它们被确定或者失败了。Storm会调用nexttuple当缓存不是满的时候。
- Bolt:一个bolt是Storm拓扑的处理powerhouse,负责转换流。理想调价下,每个bolt在拓扑中应该做tuple的简单转换,很多这样的bolt能结合起来做一个复杂的转换。
一个bolt可以订阅其他成分的多个流——可以是spouts或者bolt——相似的可以释放到多个流。
bolt执行的重要方法有:- execute(Tuple input):这个方法执行每个tuple从订阅的输入流。你可以做任何操作。没有要求立即处理tuple当这个函数被调用。tuple能被hold直到被要求。
- prepare(Map stormConf, TopologyContext context, OutputCollector collector): 一个bolt能被很多workers执行在一个拓扑中。一个bolt的实例在客户端机器中被创建,然后序列化提交到Nimbus。当Nimbus创建worker实例对于这个拓扑,发送序列化bolt到workers。这工作会去序列化bolt,调用prepare方法。在这个方法,必须确保bolt是正确配置来执行tuples。任何状态你想要维持能被存储作为实例变量对于bolt能被序列化和反序列化。
Operation Mode
操作模式显示topology如何在storm中部署。Storm支持两种操作模式来执行storm topology。
- The local mode:Storm运行在本地机器在一个单独的JVM中。这个模式模拟一个storm cluster在一个单独JVM中,用来测试和调试一个topology。
- The remote mode:在远程模式中,用storm客户端提交topology到master和所有包括的必须代码来执行topology。Nimbus会负责分布你的代码。
Setting up your development environment
- Java SDK 6
- Maven: Apache Maven是一个软件依赖管理工具用来管理项目的build,report和文档化。用这个我们就不需要手动下载所有依赖。
- Git:Distributed version control
- Spring Tool Suite:IDE
部署一个样本topology
样本topology覆盖了如何创建一个基本的storm topology,包括一个spout,bolt,编译它,执行它。
chapter 2 Setting up a Storm Cluster
上一章,我们看到如何写一个最小的storm拓扑运行在本地模式,一个单节点的storm cluste。这一章节,我们会做:
- 如何运行样本topology在分布式storm cluster中
- 如何配置topology的并行性
- 如何划分stream用不同stream分组
建议用偶数个ZooKeeper节点数目。ZooKeeper cluster继续工作只要绝大多数节点在运行。
我们会部署三节点的ZooKeeper集合处理一个节点的失败。
在ZooKeeper集合中,一个节点在cluster工作作为leader,其余事followers。如果leader 死了,选举新的leader在留下的节点中。所有写操作到leader节点,follower node只处理读操作。我们不能提升写操作的性能通过增加节点数目,因为写操作只去leader节点。
setting up a distributed storm cluster
设置三节点的storm cluster。一个节点是master节点,其余两个是worker节点(supervisors)。
Configuring the parallelism of a topology
在storm topology中有一些成分。topology的吞吐(处理速度)由每个成分并行运行的实例数目所决定。这是topology的并行性。
- The worker process:
Chapter 4 Storm and Kafka Integration
Apache Kafka是一个高贯穿的,分布的,容错的,和复制的消息系统首先在linkedin开发。Kafka使用情况包括log 聚集到流处理到替代其它消息系统。 Kafka形成为实时处理的关键组成部分,和Storm结合后。Kafka能作为一个缓冲器或者需要被Storm处理的消息。接下来,会讨论:
- Apache Kafka的综述和它与其它传统消息平台区别。
- 设置单节点和多借点kafka cluster.
- 产生数据到一个kafka partition
- 用kafka Spout在Storm 拓扑中消费从kafka来的数据
kafka的重要组成部分:
- The producer:
In kafka,messages are published by a producer to named entities called topics. A topic is a queue that can be consumed by multiple consumers. For parallelism, a Kafka topic can have multiple partitions. Reads and writes can happen to each partition in parallel.
在kafka,消息发布通过一个生产者到命名的实体叫做topics。一个主题是一个队列能被多个消费者消费。对于并行,一个kafka主题能有多个划分。读写能发生对于每个partition并行地。一个topic的每个partition存储在磁盘的不同目录。每个目录可以在不同磁盘,允许我们客服I/O限制在一个单独的磁盘中。一个单独topic的不同partitions能分配给不同的broker,因此增加吞吐作为每个partition,都是独立的。消息在一个partition有一个独特的sequence number关联它叫做offset。
- Replication:Kafka 支持topic的partitions的复制来容错。它自动处理partition的赋值,partition的复制被分配给不同的broker。Kafka选择一个broker作为partition的leader,所有的读写必须走向leader partitio。
- Consumer:一个消费者从一个broker读取消息的一个范围。group ID与每个consumer相关联。所有的消费者有相同的group ID看上去是一个单独的逻辑消费者。topic的每个消息发送给一个消费者从一个消费者组中。一个topic的不同消费者组能处理消息在他们自定义频率,因为消息不会从topics移除一旦他们被读取。事实上,是消费者的责任追踪有多少消息被消费。
- Broker:一个broker从一个生产者接收消息(push机制),发送消息到消费者(poll机制)。一个broker也处理消息在磁盘上的一致性。对于每个topic,会创建一个目录在磁盘上。这个目录会包含多个文件。kafka broker是很轻量级的,只打开文件句柄对于partitions来一致性消息和管理TCP连接。
- Data retention:每个人topic在kafka有一个关联的数据留存时间能在配置文件中控制。
Chapter 5 Exploring High-level abstraction in Storm with Trident
这章会包括:
- 介绍Trident
- Trident的数据模型
- Trident functions, filters, and projections
- Trident repartitioning操作
- Trident 总计 ...
介绍Trident
Trident是建立在Storm上的高级抽象。Trident支持有状态流处理,纯粹的Storm是一个无状态的处理框架。用Trident的主要优势在于每个消息进入拓扑只会被处理一次,再Vanilla Storm是难以达到的。Trident的概念和高层次块处理工具比如Cascading和Pig由Hadoop开发。Trident处理输入流作为小的块来达到处理一次在Storm。
Chapter 7 Integrating Storm with JMX,Fanglia,HBase, and Redis
Integrating Storm with HBase
Storm用于实时数据处理。在绝大多数例子,需要存储处理的数据在data store可以用存储的数据进一步分析,能执行分析查询在数据仓库中。如何存储处理过的数据在HBase中。
HBase是NoSQL,多维,稀疏,水平可扩展的数据模型建模在GoogleTable后。HBase建立在Hadoop上,它依赖于Hadoop,和MapReduce框架集成很好。Hadoop提供一下优势给HBase:
- 一个分布式数据仓库运行在日用硬件的顶部
- 容错fault tolerance
Integrating Storm with Redis
Redis is a key value data store. The key values can be strings, lists, sets, hashes, and so on. It is extremely fast because the entire dataset is stored in the memory.