Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。
Apache Storm继续成为实时数据分析的领导者。Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。
基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。下表比较了Storm和Hadoop的属性。
Storm | Hadoop |
---|---|
实时流处理 | 批量处理 |
无状态 | 有状态 |
主/从架构与基于ZooKeeper的协调。主节点称为nimbus,从属节点是主管。 | 具有/不具有基于ZooKeeper的协调的主 - 从结构。主节点是作业跟踪器,从节点是任务跟踪器。 |
Storm流过程在集群上每秒可以访问数万条消息。 | Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。 |
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 | MapReduce作业按顺序执行并最终完成。 |
两者都是分布式和容错的 | |
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 | 如果JobTracker死机,所有正在运行的作业都会丢失。 |
Apache Storm对于实时大数据流处理非常有名。因此,大多数公司都将Storm用作其系统的一个组成部分。一些值得注意的例子如下 -
Twitter - Twitter正在使用Apache Storm作为其“发布商分析产品”。 “发布商分析产品”处理Twitter平台中的每个tweets和点击。 Apache Storm与Twitter基础架构深度集成。
NaviSite - NaviSite正在使用Storm进行事件日志监控/审计系统。系统中生成的每个日志都将通过Storm。Storm将根据配置的正则表达式集检查消息,如果存在匹配,那么该特定消息将保存到数据库。
Wego - Wego是位于新加坡的旅行元搜索引擎。旅行相关数据来自世界各地的许多来源,时间不同。Storm帮助Wego搜索实时数据,解决并发问题,并为最终用户找到最佳匹配。
下面是Apache Storm提供的好处列表:
Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
允许实时流处理。
Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
Storm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高度可扩展的。
Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具有非常低的延迟。
Storm有操作智能。
Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。
Apache Storm从一端读取实时数据的原始流,并将其传递通过一系列小处理单元,并在另一端输出处理/有用的信息。
下图描述了Apache Storm的核心概念。
现在让我们仔细看看Apache Storm的组件 -
组件 | 描述 |
---|---|
Tuple | Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。 |
Stream | 流是元组的无序序列。 |
Spouts | 流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。 |
Bolts | Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。 |
让我们来看一个“Twitter分析”的实时示例,看看如何在Apache Storm中建模。下图描述了结构。
“Twitter分析”的输入来自Twitter Streaming API。Spout将使用Twitter Streaming API读取用户的tweets,并作为元组流输出。来自spout的单个元组将具有twitter用户名和单个tweet作为逗号分隔值。然后,这个元组的蒸汽将被转发到Bolt,并且Bolt将tweet拆分成单个字,计算字数,并将信息保存到配置的数据源。现在,我们可以通过查询数据源轻松获得结果。
Spouts和Bolts连接在一起,形成拓扑结构。实时应用程序逻辑在Storm拓扑中指定。简单地说,拓扑是有向图,其中顶点是计算,边缘是数据流。
简单拓扑从spouts开始。Spouts将数据发射到一个或多个Bolts。Bolt表示拓扑中具有最小处理逻辑的节点,并且Bolts的输出可以发射到另一个Bolts作为输入。
Storm保持拓扑始终运行,直到您终止拓扑。Apache Storm的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。
现在你有一个关于Spouts和Bolts的基本想法。它们是拓扑的最小逻辑单元,并且使用单个Spout和Bolt阵列构建拓扑。应以特定顺序正确执行它们,以使拓扑成功运行。Storm执行的每个Spout和Bolt称为“任务”。简单来说,任务是Spouts或Bolts的执行。在给定时间,每个Spout和Bolt可以具有在多个单独的螺纹中运行的多个实例。
拓扑在多个工作节点上以分布式方式运行。Storm将所有工作节点上的任务均匀分布。工作节点的角色是监听作业,并在新作业到达时启动或停止进程。
数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts。流分组控制元组在拓扑中的路由方式,并帮助我们了解拓扑中的元组流。有四个内置分组,如下所述。
在随机分组中,相等数量的元组随机分布在执行Bolts的所有工人中。下图描述了结构。
元组中具有相同值的字段组合在一起,其余的元组保存在外部。然后,具有相同字段值的元组被向前发送到执行Bolts的同一进程。例如,如果流由字段“字”分组,则具有相同字符串“Hello”的元组将移动到相同的工作者。下图显示了字段分组的工作原理。
所有流可以分组并向前到一个Bolts。此分组将源的所有实例生成的元组发送到单个目标实例(具体来说,选择具有最低ID的工作程序)。
所有分组将每个元组的单个副本发送到接收Bolts的所有实例。这种分组用于向Bolts发送信号。所有分组对于连接操作都很有用。
Apache Storm的主要亮点是,它是一个容错,快速,没有“单点故障”(SPOF)分布式应用程序。我们可以根据需要在多个系统中安装Apache Storm,以增加应用程序的容量。
让我们看看Apache Storm集群如何设计和其内部架构。下图描述了集群设计。
Apache Storm有两种类型的节点,Nimbus(主节点)和Supervisor(工作节点)。Nimbus是Apache Storm的核心组件。Nimbus的主要工作是运行Storm拓扑。Nimbus分析拓扑并收集要执行的任务。然后,它将任务分配给可用的supervisor。
Supervisor将有一个或多个工作进程。Supervisor将任务委派给工作进程。工作进程将根据需要产生尽可能多的执行器并运行任务。Apache Storm使用内部分布式消息传递系统来进行Nimbus和管理程序之间的通信。
组件 | 描述 |
---|---|
Nimbus(主节点) | Nimbus是Storm集群的主节点。集群中的所有其他节点称为工作节点。主节点负责在所有工作节点之间分发数据,向工作节点分配任务和监视故障。 |
Supervisor(工作节点) | 遵循指令的节点被称为Supervisors。Supervisor有多个工作进程,它管理工作进程以完成由nimbus分配的任务。 |
Worker process(工作进程) | 工作进程将执行与特定拓扑相关的任务。工作进程不会自己运行任务,而是创建执行器并要求他们执行特定的任务。工作进程将有多个执行器。 |
Executor(执行者) | 执行器只是工作进程产生的单个线程。执行器运行一个或多个任务,但仅用于特定的spout或bolt。 |
Task(任务) | 任务执行实际的数据处理。所以,它是一个spout或bolt。 |
ZooKeeper framework(ZooKeeper框架) | Apache的ZooKeeper的是使用群集(节点组)自己和维护具有强大的同步技术共享数据之间进行协调的服务。Nimbus是无状态的,所以它依赖于ZooKeeper来监视工作节点的状态。 ZooKeeper的帮助supervisor与nimbus交互。它负责维持nimbus,supervisor的状态。 |
Storm是无状态的。即使无状态性质有它自己的缺点,它实际上帮助Storm以最好的可能和最快的方式处理实时数据。
Storm虽然不是完全无状态的。它将其状态存储在Apache ZooKeeper中。由于状态在Apache ZooKeeper中可用,故障的网络可以重新启动,并从它离开的地方工作。通常,像monit这样的服务监视工具将监视Nimbus,并在出现任何故障时重新启动它。
Apache Storm还有一个称为Trident拓扑的高级拓扑,它具有状态维护,并且还提供了一个高级API,如Pig。我们将在接下来的章节中讨论所有这些功能。
一个工作的Storm集群应该有一个Nimbus和一个或多个supervisors。另一个重要的节点是Apache ZooKeeper,它将用于nimbus和supervisors之间的协调。
现在让我们仔细看看Apache Storm的工作流程 −
最初,nimbus将等待“Storm拓扑”提交给它。
一旦提交拓扑,它将处理拓扑并收集要执行的所有任务和任务将被执行的顺序。
然后,nimbus将任务均匀分配给所有可用的supervisors。
在特定的时间间隔,所有supervisor将向nimbus发送心跳以通知它们仍然运行着。
当supervisor终止并且不向心跳发送心跳时,则nimbus将任务分配给另一个supervisor。
当nimbus本身终止时,supervisor将在没有任何问题的情况下对已经分配的任务进行工作。
一旦所有的任务都完成后,supervisor将等待新的任务进去。
同时,终止nimbus将由服务监控工具自动重新启动。
重新启动的网络将从停止的地方继续。同样,终止supervisor也可以自动重新启动。由于网络管理程序和supervisor都可以自动重新启动,并且两者将像以前一样继续,因此Storm保证至少处理所有任务一次。
一旦处理了所有拓扑,则网络管理器等待新的拓扑到达,并且类似地,管理器等待新的任务。
默认情况下,Storm集群中有两种模式:
本地模式 -此模式用于开发,测试和调试,因为它是查看所有拓扑组件协同工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的Storm配置环境中运行。在本地模式下,storm拓扑在本地机器上在单个JVM中运行。
生产模式 -在这种模式下,我们将拓扑提交到工作Storm集群,该集群由许多进程组成,通常运行在不同的机器上。如在storm的工作流中所讨论的,工作集群将无限地运行,直到它被关闭。
Apache Storm处理实时数据,并且输入通常来自消息排队系统。外部分布式消息系统将提供实时计算所需的输入。Spout将从消息系统读取数据,并将其转换为元组并输入到Apache Storm中。有趣的是,Apache Storm在内部使用其自己的分布式消息传递系统,用于其nimbus和主管之间的通信。
分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息系统之间异步排队。分布式消息传递系统提供可靠性,可扩展性和持久性的好处。
大多数消息模式遵循发布 - 订阅模型(简称发布 - 订阅),其中消息的发送者称为发布者,而想要接收消息的那些被称为订阅者。
一旦消息已经被发送者发布,订阅者可以在过滤选项的帮助下接收所选择的消息。通常我们有两种类型的过滤,一种是基于主题的过滤,另一种是基于内容的过滤。
需要注意的是,pub-sub模型只能通过消息进行通信。它是一个非常松散耦合的架构;甚至发件人不知道他们的订阅者是谁。许多消息模式使消息代理能够交换发布消息以便由许多订户及时访问。一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。
下表描述了一些流行的高吞吐量消息传递系统 -
分布式消息系统 | 描述 |
---|---|
Apache Kafka | Kafka是在LinkedIn公司开发的,后来它成为Apache的一个子项目。 Apache Kafka基于brokerenabled的,持久的,分布式的发布订阅模型。 Kafka是快速,可扩展和高效的。 |
RabbitMQ | RabbitMQ是一个开源的分布式鲁棒消息应用程序。它易于使用并在所有平台上运行。 |
JMS(Java Message Service) | JMS是一个开源API,支持创建,读取和从一个应用程序向另一个应用程序发送消息。它提供有保证的消息传递并遵循发布 - 订阅模型。 |
ActiveMQ | ActiveMQ消息系统是JMS的开源API。 |
ZeroMQ | ZeroMQ是无代理的对等体消息处理。它提供推拉,路由器 - 经销商消息模式。 |
Kestrel | Kestrel是一个快速,可靠,简单的分布式消息队列。 |
Thrift在Facebook上构建,用于跨语言服务开发和远程过程调用(RPC)。后来,它成为一个开源的Apache项目。Apache Thrift是一种接口定义语言,允许以容易的方式在定义的数据类型之上定义新的数据类型和服务实现。
Apache Thrift也是一个支持嵌入式系统,移动应用程序,Web应用程序和许多其他编程语言的通信框架。与Apache Thrift相关的一些关键功能是它的模块化,灵活性和高性能。此外,它可以在分布式应用程序中执行流式处理,消息传递和RPC。
Storm广泛使用Thrift协议进行内部通信和数据定义。Storm拓扑只是Thrift Structs。在Apache Storm中运行拓扑的Storm Nimbus是一个Thrift服务。
现在,让我们来看看如何在你的机器上安装Apache Storm框架。这里有三个步骤 -
使用以下命令检查系统上是否已安装Java。
$ java -version
如果Java已经存在,那么你会看到它的版本号。否则,下载最新版本的JDK。
使用以下链接 - www.oracle.com下载最新版本的JDK
最新版本为JDK 8u 60,文件为“jdk-8u60-linux-x64.tar.gz”。在您的机器上下载文件。
通常文件被下载到下载文件夹。使用以下命令解压tar设置。
$ cd /go/to/download/path$ tar -zxf jdk-8u60-linux-x64.gz
要使Java对所有用户可用,请将提取的Java内容移动到“/ usr / local / java”文件夹。
$ supassword: (type password of root user)$ mkdir /opt/jdk$ mv jdk-1.8.0_60 /opt/jdk/
要设置路径和JAVA_HOME变量,请将以下命令添加到〜/ .bashrc文件。
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60export PATH=$PATH:$JAVA_HOME/bin
现在将所有更改应用到当前运行的系统。
$ source ~/.bashrc
使用以下命令更改Java替代项。
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
现在使用第1步中解释的验证命令(java -version)验证Java安装。
要在您的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper http://zookeeper.apache.org/releases.html
到目前为止,最新版本的ZooKeeper是3.4.6(ZooKeeper-3.4.6.tar.gz)。
使用以下命令解压tar文件 -
$ cd opt/$ tar -zxf zookeeper-3.4.6.tar.gz$ cd zookeeper-3.4.6$ mkdir data
使用命令“vi conf / zoo.cfg”打开名为“conf / zoo.cfg”的配置文件,并将所有以下参数设置为起点。
$ vi conf/zoo.cfgtickTime=2000dataDir=/path/to/zookeeper/dataclientPort=2181initLimit=5syncLimit=2
配置文件保存成功后,可以启动ZooKeeper服务器。
使用以下命令启动ZooKeeper服务器。
$ bin/zkServer.sh start
执行此命令后,您将收到一个响应如下 -
$ JMX enabled by default$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg$ Starting zookeeper ... STARTED
使用以下命令启动CLI。
$ bin/zkCli.sh
执行上述命令后,您将连接到ZooKeeper服务器并获得以下响应。
Connecting to localhost:2181................................................Welcome to ZooKeeper!................................WATCHER::WatchedEvent state:SyncConnected type: None path:null[zk: localhost:2181(CONNECTED) 0]
连接服务器并执行所有操作后,可以使用以下命令停止ZooKeeper服务器。
bin/zkServer.sh stop
您已成功在计算机上安装Java和ZooKeeper。现在让我们看看安装Apache Storm框架的步骤。
要在您的计算机上安装Storm框架,请访问以下链接并下载最新版本的Storm http://storm.apache.org/downloads.html
到目前为止,最新版本的Storm是“apache-storm-0.9.5.tar.gz”。
使用以下命令解压tar文件
$ cd opt/$ tar -zxf apache-storm-0.9.5.tar.gz$ cd apache-storm-0.9.5$ mkdir data
当前版本的Storm在“conf / storm.yaml”中包含一个配置Storm守护程序的文件。将以下信息添加到该文件。
$ vi conf/storm.yamlstorm.zookeeper.servers: - "localhost"storm.local.dir: “/path/to/storm/data(any path)”nimbus.host: "localhost"supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
应用所有更改后,保存并返回到终端。
$ bin/storm nimbus
$ bin/storm supervisor
$ bin/storm ui
启动Storm用户界面应用程序后,在您最喜欢的浏览器中键入URL http:// localhost:8080,您可以看到Storm群集信息及其运行的拓扑。该页面应类似于以下屏幕截图。
我们已经经历了Apache Storm的核心技术细节,现在是时候编写一些简单的场景。
移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。
Spout是用于数据生成的组件。基本上,一个spout将实现一个IRichSpout接口。 “IRichSpout”接口有以下重要方法 -
open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。
nextTuple -通过收集器发出生成的数据。
close -当spout将要关闭时调用此方法。
declareOutputFields -声明元组的输出模式。
ack -确认处理了特定元组。
fail -指定不处理和不重新处理特定元组。
open方法的签名如下 -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - 为此spout提供storm配置。
context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。
collector - 使我们能够发出将由bolts处理的元组。
nextTuple方法的签名如下 -
nextTuple()
nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。
close方法的签名如下-
close()
declareOutputFields方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
declarer -它用于声明输出流id,输出字段等
此方法用于指定元组的输出模式。
ack方法的签名如下 -
ack(Object msgId)
该方法确认已经处理了特定元组。
nextTuple方法的签名如下-
ack(Object msgId)
此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组。
在我们的场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含。
由于我们没有呼叫日志的实时信息,我们将生成假呼叫日志。假信息将使用Random类创建。完整的程序代码如下。
import java.util.*;//import storm tuple packagesimport backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import Spout interface packagesimport backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; }}
Bolt是一个使用元组作为输入,处理元组,并产生新的元组作为输出的组件。Bolts将实现IRichBolt接口。在此程序中,使用两个Bolts
类CallLogCreatorBolt和CallLogCounterBolt来执行操作。
IRichBolt接口有以下方法 -
prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。
execute -处理单个元组的输入
cleanup -当spout要关闭时调用。
declareOutputFields -声明元组的输出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf -为此bolt提供Storm配置。
context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。
collector -使我们能够发出处理的元组。
execute方法的签名如下-
execute(Tuple tuple)
这里的元组是要处理的输入元组。
execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。
cleanup方法的签名如下 -
cleanup()
declareOutputFields方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
这里的参数declarer用于声明输出流id,输出字段等。
此方法用于指定元组的输出模式。
呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。
//import util packagesimport java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;//import Storm IRichBolt packageimport backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//Create a class CallLogCreatorBolt which implement IRichBolt interfacepublic class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; }}
呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。
import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 -
TopologyBuilder builder = new TopologyBuilder();builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout");builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法有助于为spout和bolts设置流分组。
为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 -
import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import storm configuration packagesimport backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;//Create main class LogAnalyserStorm submit topology.public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); }}
完整的应用程序有四个Java代码。它们是 -
应用程序可以使用以下命令构建 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
应用程序可以使用以下命令运行 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
一旦应用程序启动,它将输出有关集群启动过程,spout和螺栓处理的完整详细信息,最后是集群关闭过程。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上如下 -
1234123402 - 1234123401 : 781234123402 - 1234123404 : 881234123402 - 1234123403 : 1051234123401 - 1234123404 : 741234123401 - 1234123403 : 811234123401 - 1234123402 : 811234123403 - 1234123404 : 861234123404 - 1234123401 : 631234123404 - 1234123402 : 821234123403 - 1234123402 : 831234123404 - 1234123403 : 861234123403 - 1234123401 : 93
Storm拓扑通过Thrift接口实现,这使得轻松地提交任何语言的拓扑。Storm支持Ruby,Python和许多其他语言。让我们来看看python绑定。
Python是一种通用的解释,交互,面向对象和高级编程语言。Storm支持Python实现其拓扑。Python支持发射,锚定,acking和日志操作。
如你所知,bolt可以用任何语言定义。用另一种语言编写的bolt作为子进程执行,Storm通过stdin / stdout与JSON消息进行通信。首先拿一个支持python绑定的样例bolt WordCount。
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}
这里的类WordCount实现IRichBolt接口和运行与python实现指定超级方法参数“splitword.py”。现在创建一个名为“splitword.py”的python实现。
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word])WordCountBolt().run()
这是Python的示例实现,它计算给定句子中的单词。同样,您也可以与其他支持语言绑定。
Trident是Storm的延伸。像Storm,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm上提供高级抽象,以及状态流处理和低延迟分布式查询。
Trident使用spout和bolt,但是这些低级组件在执行之前由Trident自动生成。 Trident具有函数,过滤器,联接,分组和聚合。
Trident将流处理为一系列批次,称为事务。通常,这些小批量的大小将是大约数千或数百万个元组,这取决于输入流。这样,Trident不同于Storm,它执行元组一元组处理。
批处理概念非常类似于数据库事务。每个事务都分配了一个事务ID。该事务被认为是成功的,一旦其所有的处理完成。然而,处理事务的元组中的一个的失败将导致整个事务被重传。对于每个批次,Trident将在事务开始时调用beginCommit,并在结束时提交。
Trident API公开了一个简单的选项,使用“TridentTopology”类创建Trident拓扑。基本上,Trident拓扑从流出接收输入流,并对流上执行有序的操作序列(滤波,聚合,分组等)。Storm元组被替换为Trident元组,bolt被操作替换。一个简单的Trident拓扑可以创建如下 -
TridentTopology topology = new TridentTopology();
Trident Tuples是一个命名的值列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是可由Trident拓扑处理的数据的基本单位。
Trident spout与类似于Storm spout,附加选项使用Trident的功能。实际上,我们仍然可以使用IRichSpout,我们在Storm拓扑中使用它,但它本质上是非事务性的,我们将无法使用Trident提供的优点。
具有使用Trident的特征的所有功能的基本spout是“ITridentSpout”。它支持事务和不透明的事务语义。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。
除了这些通用spouts,Trident有许多样品实施trident spout。其中之一是FeederBatchSpout输出,我们可以使用它发送trident tuples的命名列表,而不必担心批处理,并行性等。
FeederBatchSpout创建和数据馈送可以如下所示完成 -
TridentTopology topology = new TridentTopology();FeederBatchSpout testSpout = new FeederBatchSpout( ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));topology.newStream("fixed-batch-spout", testSpout)testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident依靠“Trident操作”来处理trident tuples的输入流。Trident API具有多个内置操作来处理简单到复杂的流处理。这些操作的范围从简单验证到复杂的trident tuples分组和聚合。让我们经历最重要和经常使用的操作。
过滤器是用于执行输入验证任务的对象。Trident过滤器获取trident tuples字段的子集作为输入,并根据是否满足某些条件返回真或假。如果返回true,则该元组保存在输出流中;否则,从流中移除元组。过滤器将基本上继承自BaseFilter类并实现isKeep方法。这里是一个滤波器操作的示例实现 -
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(1) % 2 == 0; }}input[1, 2][1, 3][1, 4]output[1, 2][1, 4]
可以使用“each”方法在拓扑中调用过滤器功能。“Fields”类可以用于指定输入(trident tuple的子集)。示例代码如下 -
TridentTopology topology = new TridentTopology();topology.newStream("spout", spout).each(new Fields("a", "b"), new MyFilter())
函数是用于对单个trident tuple执行简单操作的对象。它需要一个trident tuple字段的子集,并发出零个或多个新的trident tuple字段。
函数基本上从BaseFunction类继承并实现execute方法。下面给出了一个示例实现:
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int a = tuple.getInteger(0); int b = tuple.getInteger(1); collector.emit(new Values(a + b)); }}input[1, 2][1, 3][1, 4]output[1, 2, 3][1, 3, 4][1, 4, 5]
与过滤操作类似,可以使用每个方法在拓扑中调用函数操作。示例代码如下 -
TridentTopology topology = new TridentTopology();topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
聚合是用于对输入批处理或分区或流执行聚合操作的对象。Trident有三种类型的聚合。他们如下 -
aggregate -单独聚合每批trident tuple。在聚合过程期间,首先使用全局分组将元组重新分区,以将同一批次的所有分区组合到单个分区中。
partitionAggregate -聚合每个分区,而不是整个trident tuple。分区集合的输出完全替换输入元组。分区集合的输出包含单个字段元组。
persistentaggregate -聚合所有批次中的所有trident tuple,并将结果存储在内存或数据库中。
TridentTopology topology = new TridentTopology();// aggregate operationtopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .aggregate(new Count(), new Fields(“count”)) // partitionAggregate operationtopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .partitionAggregate(new Count(), new Fields(“count")) // persistentAggregate - saving the count to memorytopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“计数”聚合器是内置聚合器之一,它使用“CombinerAggregator”实现,实现如下 -
public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; }}
分组操作是一个内置操作,可以由groupBy方法调用。groupBy方法通过在指定字段上执行partitionBy来重新分区流,然后在每个分区中,它将组字段相等的元组组合在一起。通常,我们使用“groupBy”以及“persistentAggregate”来获得分组聚合。示例代码如下 -
TridentTopology topology = new TridentTopology();// persistentAggregate - saving the count to memorytopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .groupBy(new Fields(“d”) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
合并和连接可以分别通过使用“合并”和“连接”方法来完成。合并组合一个或多个流。加入类似于合并,除了加入使用来自两边的trident tuple字段来检查和连接两个流的事实。此外,加入将只在批量级别工作。示例代码如下 -
TridentTopology topology = new TridentTopology();topology.merge(stream1, stream2, stream3);topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
Trident提供了状态维护的机制。状态信息可以存储在拓扑本身中,否则也可以将其存储在单独的数据库中。原因是维护一个状态,如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定此元组的状态是否已在之前更新过。如果在更新状态之前元组已经失败,则重试该元组将使状态稳定。然而,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息仅处理一次 -
小批量处理元组。
为每个批次分配唯一的ID。如果重试批次,则给予相同的唯一ID。
状态更新在批次之间排序。例如,第二批次的状态更新将不可能,直到第一批次的状态更新完成为止。
分布式RPC用于查询和检索Trident拓扑结果。 Storm有一个内置的分布式RPC服务器。分布式RPC服务器从客户端接收RPC请求并将其传递到拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,分布式RPC服务器将其重定向到客户端。Trident的分布式RPC查询像正常的RPC查询一样执行,除了这些查询并行运行的事实。
在许多使用情况下,如果要求是只处理一次查询,我们可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下将难以实现精确的一次处理。因此,Trident将对那些需要一次处理的用例有用。Trident不适用于所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理状态。
我们将把上一节中制定的呼叫日志分析器应用程序转换为Trident框架。由于其高级API,Trident应用程序将比普通风暴更容易。Storm基本上需要执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。
FormatCall类的目的是格式化包括“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下 -
import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class FormatCall extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String fromMobileNumber = tuple.getString(0); String toMobileNumber = tuple.getString(1); collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber)); }}
CSVSplit类的目的是基于“comma(,)”拆分输入字符串,并发出字符串中的每个字。此函数用于解析分布式查询的输入参数。完整的代码如下 -
import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class CSVSplit extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for(String word: tuple.getString(0).split(",")) { if(word.length() > 0) { collector.emit(new Values(word)); } } }}
这是主要的应用程序。最初,应用程序将使用FeederBatchSpout初始化TridentTopology并提供调用者信息。Trident拓扑流可以使用TridentTopology类的newStream方法创建。类似地,Trident拓扑DRPC流可以使用TridentTopology类的newDRCPStream方法创建。可以使用LocalDRPC类创建一个简单的DRCP服务器。LocalDRPC有execute方法来搜索一些关键字。完整的代码如下。
import java.util.*;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.utils.DRPCClient;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import storm.trident.TridentState;import storm.trident.TridentTopology;import storm.trident.tuple.TridentTuple;import storm.trident.operation.builtin.FilterNull;import storm.trident.operation.builtin.Count;import storm.trident.operation.builtin.Sum;import storm.trident.operation.builtin.MapGet;import storm.trident.operation.builtin.Debug;import storm.trident.operation.BaseFilter;import storm.trident.testing.FixedBatchSpout;import storm.trident.testing.FeederBatchSpout;import storm.trident.testing.Split;import storm.trident.testing.MemoryMapState;import com.google.common.collect.ImmutableList;public class LogAnalyserTrident { public static void main(String[] args) throws Exception { System.out.println("Log Analyser Trident"); TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration")); TridentState callCounts = topology .newStream("fixed-batch-spout", testSpout) .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call")) .groupBy(new Fields("call")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); LocalDRPC drpc = new LocalDRPC(); topology.newDRPCStream("call_count", drpc) .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count")); topology.newDRPCStream("multiple_call_count", drpc) .each(new Fields("args"), new CSVSplit(), new Fields("call")) .groupBy(new Fields("call")) .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count")) .each(new Fields("call", "count"), new Debug()) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("trident", conf, topology.build()); Random randomGenerator = new Random(); int idx = 0; while(idx < 10) { testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60)))); idx = idx + 1; } System.out.println("DRPC : Query starts"); System.out.println(drpc.execute("call_count","1234123401 - 1234123402")); System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403")); System.out.println("DRPC : Query ends"); cluster.shutdown(); drpc.shutdown(); // DRPCClient client = new DRPCClient("drpc.server.location", 3772); }}
完整的应用程序有三个Java代码。他们如下 -
可以使用以下命令构建应用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令运行应用程序 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
一旦应用程序启动,应用程序将输出有关集群启动过程,操作处理,DRPC服务器和客户端信息的完整详细信息,以及最后的集群关闭过程。此输出将显示在控制台上,如下所示。
DRPC : Query starts[["1234123401 - 1234123402",10]]DEBUG: [1234123401 - 1234123402, 10]DEBUG: [1234123401 - 1234123403, 10][[20]]DRPC : Query ends
在本章中,我们将讨论Apache Storm的实时应用程序。我们将看到Storm如何在Twitter中使用。
Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台。注册用户可以阅读和发布tweet,但未注册的用户只能阅读tweets。 Hashtag用于按关键字在相关关键字之前附加#来对tweet进行分类。现在让我们来看一个实时场景,找到每个主题使用最多的hashtag。
spout的目的是尽快收到人们提交的tweets。Twitter提供了“Twitter Streaming API”,一个基于Web服务的工具,用于实时检索人们提交的tweets。Twitter Streaming API可以使用任何编程语言访问。
twitter4j是一个开源的非官方Java库,它提供了一个基于Java的模块,可以轻松访问Twitter Streaming API。twitter4j提供了一个基于监听器的框架来访问tweet。要访问Twitter Streaming API,我们需要登录Twitter开发人员帐户,并获取以下OAuth身份验证详细信息。
Storm在其入门套件中提供了一个twitter spout,TwitterSampleSpout。我们将使用它来检索tweet。该邮件需要OAuth身份验证详细信息和至少一个关键字。该spout将发出基于关键字的实时tweet。完整的程序代码如下。
import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import twitter4j.FilterQuery;import twitter4j.StallWarning;import twitter4j.Status;import twitter4j.StatusDeletionNotice;import twitter4j.StatusListener;import twitter4j.TwitterStream;import twitter4j.TwitterStreamFactory;import twitter4j.auth.AccessToken;import twitter4j.conf.ConfigurationBuilder;import backtype.storm.Config;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;@SuppressWarnings("serial")public class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; public TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } public TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) {} @Override public void onTrackLimitationNotice(int i) {} @Override public void onScrubGeo(long l, long l1) {} @Override public void onException(Exception ex) {} @Override public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(listener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override public void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override public void close() { _twitterStream.shutdown(); } @Override public Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallelism(1); return ret; } @Override public void ack(Object id) {} @Override public void fail(Object id) {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); }}
由spout发出的tweet将被转发到HashtagReaderBolt,它将处理该tweet并发出所有可用的hashtag。HashtagReaderBolt使用twitter4j提供的getHashTagEntities方法。getHashTagEntities读取tweet并返回hashtag的列表。完整的程序代码如下 -
import java.util.HashMap;import java.util.Map;import twitter4j.*;import twitter4j.conf.*;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
发出的hashtag将被转发到HashtagCounterBolt。这个bolt将处理所有的hashtags,并使用Java Map对象将每个hashtags及其计数保存在内存中。完整的程序代码如下。
import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
提交拓扑是主要应用程序。Twitter拓扑由TwitterSampleSpout,HashtagReaderBolt和HashtagCounterBolt组成。以下程序代码显示如何提交拓扑。
import java.util.*;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;public class TwitterHashtagStorm { public static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); }}
完整的应用程序有四个Java代码。他们如下 -
您可以使用以下命令编译应用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
使用以下命令执行应用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret><keyword1> <keyword2> … <keywordN>
应用程序将打印当前可用的主题标签及其计数。输出应类似于以下内容 -
Result: jazztastic : 1Result: foodie : 1Result: Redskins : 1Result: Recipe : 1Result: cook : 1Result: android : 1Result: food : 2Result: NoToxicHorseMeat : 1Result: Purrs4Peace : 1Result: livemusic : 1Result: VIPremium : 1Result: Frome : 1Result: SundayRoast : 1Result: Millennials : 1Result: HealthWithKier : 1Result: LPs30DaysofGratitude : 1Result: cooking : 1Result: gameinsight : 1Result: Countryfile : 1Result: androidgames : 1
雅虎财经是互联网领先的商业新闻和金融数据网站。它是雅虎的一部分,并提供有关金融新闻,市场统计,国际市场数据和其他任何人都可以访问的财务资源信息。
如果您是注册的Yahoo!用户,那么您可以自定义Yahoo! Finance以利用其特定产品。Yahoo! Finance API用于从Yahoo!查询财务数据。
此API显示实时延迟15分钟的数据,并每1分钟更新其数据库,以访问当前股票相关信息。现在让我们看一家公司的实时情景,看看当公司的股票价值低于100时如何提高警报。
spout的目的是获得公司的详细信息,并发出价格spout。您可以使用以下程序代码创建spout。
import java.util.*;import java.io.*;import java.math.BigDecimal;//import yahoofinace packagesimport yahoofinance.YahooFinance;import yahoofinance.Stock;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;public class YahooFinanceSpout implements IRichSpout { private SpoutOutputCollector collector; private boolean completed = false; private TopologyContext context; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){ this.context = context; this.collector = collector; } @Override public void nextTuple() { try { Stock stock = YahooFinance.get("INTC"); BigDecimal price = stock.getQuote().getPrice(); this.collector.emit(new Values("INTC", price.doubleValue())); stock = YahooFinance.get("GOOGL"); price = stock.getQuote().getPrice(); this.collector.emit(new Values("GOOGL", price.doubleValue())); stock = YahooFinance.get("AAPL"); price = stock.getQuote().getPrice(); this.collector.emit(new Values("AAPL", price.doubleValue())); } catch(Exception e) {} } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("company", "price")); } @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
这里的的目的是当价格低于100时处理给定公司的价格。它使用Java Map对象在股价低于100时设置截止价格限制警报为真;否则为false。完整的程序代码如下 -
import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class PriceCutOffBolt implements IRichBolt { Map<String, Integer> cutOffMap; Map<String, Boolean> resultMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.cutOffMap = new HashMap <String, Integer>(); this.cutOffMap.put("INTC", 100); this.cutOffMap.put("AAPL", 100); this.cutOffMap.put("GOOGL", 100); this.resultMap = new HashMap<String, Boolean>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String company = tuple.getString(0); Double price = tuple.getDouble(1); if(this.cutOffMap.containsKey(company)){ Integer cutOffPrice = this.cutOffMap.get(company); if(price < cutOffPrice) { this.resultMap.put(company, true); } else { this.resultMap.put(company, false); } } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("cut_off_price")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
这是YahooFinanceSpout.java和PriceCutOffBolt.java连接在一起并生成拓扑的主要应用程序。以下程序代码显示了如何提交拓扑。
import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;public class YahooFinanceStorm { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout()); builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt()) .fieldsGrouping("yahoo-finance-spout", new Fields("company")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); }}
完整的应用程序有三个Java代码。他们如下 -
应用程序可以使用以下命令构建 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java
应用程序可以使用以下命令运行 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.YahooFinanceStorm
输出将类似于以下内容 -
GOOGL : falseAAPL : falseINTC : true
Apache Storm框架支持许多当今最好的工业应用程序。我们将在本章中简要介绍Storm的一些最显着的应用。
Klout是一个应用程序,它使用社交媒体分析,根据在线社交影响力通过Klout得分,这是一个介于1和100之间的数值对用户排名。Klout使用Apache Storm的内置Trident抽象来创建流数据的复杂拓扑。
天气频道使用Storm拓扑来获取天气数据。它绑定了Twitter,以在Twitter和移动应用程序启用天气知道的广告。OpenSignal是一家专门从事无线覆盖制图的公司。StormTag和WeatherSignal是由OpenSignal创建的基于天气的项目。StormTag是一个蓝牙气象站,连接到钥匙串。由设备收集的天气数据发送到WeatherSignal应用程序和OpenSignal服务器。
电信提供商每秒处理数百万的电话呼叫。他们对掉话和低音质进行取证。呼叫详细记录以每秒百万的速率流入,Apache Storm实时处理这些流并识别任何令人不安的模式。Storm分析可以用来不断提高通话质量。
Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。
Apache Storm继续成为实时数据分析的领导者。Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。
基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。下表比较了Storm和Hadoop的属性。
Storm | Hadoop |
---|---|
实时流处理 | 批量处理 |
无状态 | 有状态 |
主/从架构与基于ZooKeeper的协调。主节点称为nimbus,从属节点是主管。 | 具有/不具有基于ZooKeeper的协调的主 - 从结构。主节点是作业跟踪器,从节点是任务跟踪器。 |
Storm流过程在集群上每秒可以访问数万条消息。 | Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。 |
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 | MapReduce作业按顺序执行并最终完成。 |
两者都是分布式和容错的 | |
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 | 如果JobTracker死机,所有正在运行的作业都会丢失。 |
Apache Storm对于实时大数据流处理非常有名。因此,大多数公司都将Storm用作其系统的一个组成部分。一些值得注意的例子如下 -
Twitter - Twitter正在使用Apache Storm作为其“发布商分析产品”。 “发布商分析产品”处理Twitter平台中的每个tweets和点击。 Apache Storm与Twitter基础架构深度集成。
NaviSite - NaviSite正在使用Storm进行事件日志监控/审计系统。系统中生成的每个日志都将通过Storm。Storm将根据配置的正则表达式集检查消息,如果存在匹配,那么该特定消息将保存到数据库。
Wego - Wego是位于新加坡的旅行元搜索引擎。旅行相关数据来自世界各地的许多来源,时间不同。Storm帮助Wego搜索实时数据,解决并发问题,并为最终用户找到最佳匹配。
下面是Apache Storm提供的好处列表:
Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
允许实时流处理。
Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
Storm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高度可扩展的。
Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具有非常低的延迟。
Storm有操作智能。
Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。
Apache Storm从一端读取实时数据的原始流,并将其传递通过一系列小处理单元,并在另一端输出处理/有用的信息。
下图描述了Apache Storm的核心概念。
现在让我们仔细看看Apache Storm的组件 -
组件 | 描述 |
---|---|
Tuple | Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。 |
Stream | 流是元组的无序序列。 |
Spouts | 流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。 |
Bolts | Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。 |
让我们来看一个“Twitter分析”的实时示例,看看如何在Apache Storm中建模。下图描述了结构。
“Twitter分析”的输入来自Twitter Streaming API。Spout将使用Twitter Streaming API读取用户的tweets,并作为元组流输出。来自spout的单个元组将具有twitter用户名和单个tweet作为逗号分隔值。然后,这个元组的蒸汽将被转发到Bolt,并且Bolt将tweet拆分成单个字,计算字数,并将信息保存到配置的数据源。现在,我们可以通过查询数据源轻松获得结果。
Spouts和Bolts连接在一起,形成拓扑结构。实时应用程序逻辑在Storm拓扑中指定。简单地说,拓扑是有向图,其中顶点是计算,边缘是数据流。
简单拓扑从spouts开始。Spouts将数据发射到一个或多个Bolts。Bolt表示拓扑中具有最小处理逻辑的节点,并且Bolts的输出可以发射到另一个Bolts作为输入。
Storm保持拓扑始终运行,直到您终止拓扑。Apache Storm的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。
现在你有一个关于Spouts和Bolts的基本想法。它们是拓扑的最小逻辑单元,并且使用单个Spout和Bolt阵列构建拓扑。应以特定顺序正确执行它们,以使拓扑成功运行。Storm执行的每个Spout和Bolt称为“任务”。简单来说,任务是Spouts或Bolts的执行。在给定时间,每个Spout和Bolt可以具有在多个单独的螺纹中运行的多个实例。
拓扑在多个工作节点上以分布式方式运行。Storm将所有工作节点上的任务均匀分布。工作节点的角色是监听作业,并在新作业到达时启动或停止进程。
数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts。流分组控制元组在拓扑中的路由方式,并帮助我们了解拓扑中的元组流。有四个内置分组,如下所述。
在随机分组中,相等数量的元组随机分布在执行Bolts的所有工人中。下图描述了结构。
元组中具有相同值的字段组合在一起,其余的元组保存在外部。然后,具有相同字段值的元组被向前发送到执行Bolts的同一进程。例如,如果流由字段“字”分组,则具有相同字符串“Hello”的元组将移动到相同的工作者。下图显示了字段分组的工作原理。
所有流可以分组并向前到一个Bolts。此分组将源的所有实例生成的元组发送到单个目标实例(具体来说,选择具有最低ID的工作程序)。
所有分组将每个元组的单个副本发送到接收Bolts的所有实例。这种分组用于向Bolts发送信号。所有分组对于连接操作都很有用。
Apache Storm的主要亮点是,它是一个容错,快速,没有“单点故障”(SPOF)分布式应用程序。我们可以根据需要在多个系统中安装Apache Storm,以增加应用程序的容量。
让我们看看Apache Storm集群如何设计和其内部架构。下图描述了集群设计。
Apache Storm有两种类型的节点,Nimbus(主节点)和Supervisor(工作节点)。Nimbus是Apache Storm的核心组件。Nimbus的主要工作是运行Storm拓扑。Nimbus分析拓扑并收集要执行的任务。然后,它将任务分配给可用的supervisor。
Supervisor将有一个或多个工作进程。Supervisor将任务委派给工作进程。工作进程将根据需要产生尽可能多的执行器并运行任务。Apache Storm使用内部分布式消息传递系统来进行Nimbus和管理程序之间的通信。
组件 | 描述 |
---|---|
Nimbus(主节点) | Nimbus是Storm集群的主节点。集群中的所有其他节点称为工作节点。主节点负责在所有工作节点之间分发数据,向工作节点分配任务和监视故障。 |
Supervisor(工作节点) | 遵循指令的节点被称为Supervisors。Supervisor有多个工作进程,它管理工作进程以完成由nimbus分配的任务。 |
Worker process(工作进程) | 工作进程将执行与特定拓扑相关的任务。工作进程不会自己运行任务,而是创建执行器并要求他们执行特定的任务。工作进程将有多个执行器。 |
Executor(执行者) | 执行器只是工作进程产生的单个线程。执行器运行一个或多个任务,但仅用于特定的spout或bolt。 |
Task(任务) | 任务执行实际的数据处理。所以,它是一个spout或bolt。 |
ZooKeeper framework(ZooKeeper框架) | Apache的ZooKeeper的是使用群集(节点组)自己和维护具有强大的同步技术共享数据之间进行协调的服务。Nimbus是无状态的,所以它依赖于ZooKeeper来监视工作节点的状态。 ZooKeeper的帮助supervisor与nimbus交互。它负责维持nimbus,supervisor的状态。 |
Storm是无状态的。即使无状态性质有它自己的缺点,它实际上帮助Storm以最好的可能和最快的方式处理实时数据。
Storm虽然不是完全无状态的。它将其状态存储在Apache ZooKeeper中。由于状态在Apache ZooKeeper中可用,故障的网络可以重新启动,并从它离开的地方工作。通常,像monit这样的服务监视工具将监视Nimbus,并在出现任何故障时重新启动它。
Apache Storm还有一个称为Trident拓扑的高级拓扑,它具有状态维护,并且还提供了一个高级API,如Pig。我们将在接下来的章节中讨论所有这些功能。
一个工作的Storm集群应该有一个Nimbus和一个或多个supervisors。另一个重要的节点是Apache ZooKeeper,它将用于nimbus和supervisors之间的协调。
现在让我们仔细看看Apache Storm的工作流程 −
最初,nimbus将等待“Storm拓扑”提交给它。
一旦提交拓扑,它将处理拓扑并收集要执行的所有任务和任务将被执行的顺序。
然后,nimbus将任务均匀分配给所有可用的supervisors。
在特定的时间间隔,所有supervisor将向nimbus发送心跳以通知它们仍然运行着。
当supervisor终止并且不向心跳发送心跳时,则nimbus将任务分配给另一个supervisor。
当nimbus本身终止时,supervisor将在没有任何问题的情况下对已经分配的任务进行工作。
一旦所有的任务都完成后,supervisor将等待新的任务进去。
同时,终止nimbus将由服务监控工具自动重新启动。
重新启动的网络将从停止的地方继续。同样,终止supervisor也可以自动重新启动。由于网络管理程序和supervisor都可以自动重新启动,并且两者将像以前一样继续,因此Storm保证至少处理所有任务一次。
一旦处理了所有拓扑,则网络管理器等待新的拓扑到达,并且类似地,管理器等待新的任务。
默认情况下,Storm集群中有两种模式:
本地模式 -此模式用于开发,测试和调试,因为它是查看所有拓扑组件协同工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的Storm配置环境中运行。在本地模式下,storm拓扑在本地机器上在单个JVM中运行。
生产模式 -在这种模式下,我们将拓扑提交到工作Storm集群,该集群由许多进程组成,通常运行在不同的机器上。如在storm的工作流中所讨论的,工作集群将无限地运行,直到它被关闭。
Apache Storm处理实时数据,并且输入通常来自消息排队系统。外部分布式消息系统将提供实时计算所需的输入。Spout将从消息系统读取数据,并将其转换为元组并输入到Apache Storm中。有趣的是,Apache Storm在内部使用其自己的分布式消息传递系统,用于其nimbus和主管之间的通信。
分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息系统之间异步排队。分布式消息传递系统提供可靠性,可扩展性和持久性的好处。
大多数消息模式遵循发布 - 订阅模型(简称发布 - 订阅),其中消息的发送者称为发布者,而想要接收消息的那些被称为订阅者。
一旦消息已经被发送者发布,订阅者可以在过滤选项的帮助下接收所选择的消息。通常我们有两种类型的过滤,一种是基于主题的过滤,另一种是基于内容的过滤。
需要注意的是,pub-sub模型只能通过消息进行通信。它是一个非常松散耦合的架构;甚至发件人不知道他们的订阅者是谁。许多消息模式使消息代理能够交换发布消息以便由许多订户及时访问。一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。
下表描述了一些流行的高吞吐量消息传递系统 -
分布式消息系统 | 描述 |
---|---|
Apache Kafka | Kafka是在LinkedIn公司开发的,后来它成为Apache的一个子项目。 Apache Kafka基于brokerenabled的,持久的,分布式的发布订阅模型。 Kafka是快速,可扩展和高效的。 |
RabbitMQ | RabbitMQ是一个开源的分布式鲁棒消息应用程序。它易于使用并在所有平台上运行。 |
JMS(Java Message Service) | JMS是一个开源API,支持创建,读取和从一个应用程序向另一个应用程序发送消息。它提供有保证的消息传递并遵循发布 - 订阅模型。 |
ActiveMQ | ActiveMQ消息系统是JMS的开源API。 |
ZeroMQ | ZeroMQ是无代理的对等体消息处理。它提供推拉,路由器 - 经销商消息模式。 |
Kestrel | Kestrel是一个快速,可靠,简单的分布式消息队列。 |
Thrift在Facebook上构建,用于跨语言服务开发和远程过程调用(RPC)。后来,它成为一个开源的Apache项目。Apache Thrift是一种接口定义语言,允许以容易的方式在定义的数据类型之上定义新的数据类型和服务实现。
Apache Thrift也是一个支持嵌入式系统,移动应用程序,Web应用程序和许多其他编程语言的通信框架。与Apache Thrift相关的一些关键功能是它的模块化,灵活性和高性能。此外,它可以在分布式应用程序中执行流式处理,消息传递和RPC。
Storm广泛使用Thrift协议进行内部通信和数据定义。Storm拓扑只是Thrift Structs。在Apache Storm中运行拓扑的Storm Nimbus是一个Thrift服务。
现在,让我们来看看如何在你的机器上安装Apache Storm框架。这里有三个步骤 -
使用以下命令检查系统上是否已安装Java。
$ java -version
如果Java已经存在,那么你会看到它的版本号。否则,下载最新版本的JDK。
使用以下链接 - www.oracle.com下载最新版本的JDK
最新版本为JDK 8u 60,文件为“jdk-8u60-linux-x64.tar.gz”。在您的机器上下载文件。
通常文件被下载到下载文件夹。使用以下命令解压tar设置。
$ cd /go/to/download/path$ tar -zxf jdk-8u60-linux-x64.gz
要使Java对所有用户可用,请将提取的Java内容移动到“/ usr / local / java”文件夹。
$ supassword: (type password of root user)$ mkdir /opt/jdk$ mv jdk-1.8.0_60 /opt/jdk/
要设置路径和JAVA_HOME变量,请将以下命令添加到〜/ .bashrc文件。
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60export PATH=$PATH:$JAVA_HOME/bin
现在将所有更改应用到当前运行的系统。
$ source ~/.bashrc
使用以下命令更改Java替代项。
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
现在使用第1步中解释的验证命令(java -version)验证Java安装。
要在您的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper http://zookeeper.apache.org/releases.html
到目前为止,最新版本的ZooKeeper是3.4.6(ZooKeeper-3.4.6.tar.gz)。
使用以下命令解压tar文件 -
$ cd opt/$ tar -zxf zookeeper-3.4.6.tar.gz$ cd zookeeper-3.4.6$ mkdir data
使用命令“vi conf / zoo.cfg”打开名为“conf / zoo.cfg”的配置文件,并将所有以下参数设置为起点。
$ vi conf/zoo.cfgtickTime=2000dataDir=/path/to/zookeeper/dataclientPort=2181initLimit=5syncLimit=2
配置文件保存成功后,可以启动ZooKeeper服务器。
使用以下命令启动ZooKeeper服务器。
$ bin/zkServer.sh start
执行此命令后,您将收到一个响应如下 -
$ JMX enabled by default$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg$ Starting zookeeper ... STARTED
使用以下命令启动CLI。
$ bin/zkCli.sh
执行上述命令后,您将连接到ZooKeeper服务器并获得以下响应。
Connecting to localhost:2181................................................Welcome to ZooKeeper!................................WATCHER::WatchedEvent state:SyncConnected type: None path:null[zk: localhost:2181(CONNECTED) 0]
连接服务器并执行所有操作后,可以使用以下命令停止ZooKeeper服务器。
bin/zkServer.sh stop
您已成功在计算机上安装Java和ZooKeeper。现在让我们看看安装Apache Storm框架的步骤。
要在您的计算机上安装Storm框架,请访问以下链接并下载最新版本的Storm http://storm.apache.org/downloads.html
到目前为止,最新版本的Storm是“apache-storm-0.9.5.tar.gz”。
使用以下命令解压tar文件
$ cd opt/$ tar -zxf apache-storm-0.9.5.tar.gz$ cd apache-storm-0.9.5$ mkdir data
当前版本的Storm在“conf / storm.yaml”中包含一个配置Storm守护程序的文件。将以下信息添加到该文件。
$ vi conf/storm.yamlstorm.zookeeper.servers: - "localhost"storm.local.dir: “/path/to/storm/data(any path)”nimbus.host: "localhost"supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
应用所有更改后,保存并返回到终端。
$ bin/storm nimbus
$ bin/storm supervisor
$ bin/storm ui
启动Storm用户界面应用程序后,在您最喜欢的浏览器中键入URL http:// localhost:8080,您可以看到Storm群集信息及其运行的拓扑。该页面应类似于以下屏幕截图。
我们已经经历了Apache Storm的核心技术细节,现在是时候编写一些简单的场景。
移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。
Spout是用于数据生成的组件。基本上,一个spout将实现一个IRichSpout接口。 “IRichSpout”接口有以下重要方法 -
open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。
nextTuple -通过收集器发出生成的数据。
close -当spout将要关闭时调用此方法。
declareOutputFields -声明元组的输出模式。
ack -确认处理了特定元组。
fail -指定不处理和不重新处理特定元组。
open方法的签名如下 -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - 为此spout提供storm配置。
context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。
collector - 使我们能够发出将由bolts处理的元组。
nextTuple方法的签名如下 -
nextTuple()
nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。
close方法的签名如下-
close()
declareOutputFields方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
declarer -它用于声明输出流id,输出字段等
此方法用于指定元组的输出模式。
ack方法的签名如下 -
ack(Object msgId)
该方法确认已经处理了特定元组。
nextTuple方法的签名如下-
ack(Object msgId)
此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组。
在我们的场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含。
由于我们没有呼叫日志的实时信息,我们将生成假呼叫日志。假信息将使用Random类创建。完整的程序代码如下。
import java.util.*;//import storm tuple packagesimport backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import Spout interface packagesimport backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; }}
Bolt是一个使用元组作为输入,处理元组,并产生新的元组作为输出的组件。Bolts将实现IRichBolt接口。在此程序中,使用两个Bolts
类CallLogCreatorBolt和CallLogCounterBolt来执行操作。
IRichBolt接口有以下方法 -
prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。
execute -处理单个元组的输入
cleanup -当spout要关闭时调用。
declareOutputFields -声明元组的输出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf -为此bolt提供Storm配置。
context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。
collector -使我们能够发出处理的元组。
execute方法的签名如下-
execute(Tuple tuple)
这里的元组是要处理的输入元组。
execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。
cleanup方法的签名如下 -
cleanup()
declareOutputFields方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
这里的参数declarer用于声明输出流id,输出字段等。
此方法用于指定元组的输出模式。
呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。
//import util packagesimport java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;//import Storm IRichBolt packageimport backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//Create a class CallLogCreatorBolt which implement IRichBolt interfacepublic class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; }}
呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。
import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 -
TopologyBuilder builder = new TopologyBuilder();builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout");builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法有助于为spout和bolts设置流分组。
为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 -
import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import storm configuration packagesimport backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;//Create main class LogAnalyserStorm submit topology.public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); }}
完整的应用程序有四个Java代码。它们是 -
应用程序可以使用以下命令构建 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
应用程序可以使用以下命令运行 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
一旦应用程序启动,它将输出有关集群启动过程,spout和螺栓处理的完整详细信息,最后是集群关闭过程。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上如下 -
1234123402 - 1234123401 : 781234123402 - 1234123404 : 881234123402 - 1234123403 : 1051234123401 - 1234123404 : 741234123401 - 1234123403 : 811234123401 - 1234123402 : 811234123403 - 1234123404 : 861234123404 - 1234123401 : 631234123404 - 1234123402 : 821234123403 - 1234123402 : 831234123404 - 1234123403 : 861234123403 - 1234123401 : 93
Storm拓扑通过Thrift接口实现,这使得轻松地提交任何语言的拓扑。Storm支持Ruby,Python和许多其他语言。让我们来看看python绑定。
Python是一种通用的解释,交互,面向对象和高级编程语言。Storm支持Python实现其拓扑。Python支持发射,锚定,acking和日志操作。
如你所知,bolt可以用任何语言定义。用另一种语言编写的bolt作为子进程执行,Storm通过stdin / stdout与JSON消息进行通信。首先拿一个支持python绑定的样例bolt WordCount。
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}
这里的类WordCount实现IRichBolt接口和运行与python实现指定超级方法参数“splitword.py”。现在创建一个名为“splitword.py”的python实现。
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word])WordCountBolt().run()
这是Python的示例实现,它计算给定句子中的单词。同样,您也可以与其他支持语言绑定。
Trident是Storm的延伸。像Storm,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm上提供高级抽象,以及状态流处理和低延迟分布式查询。
Trident使用spout和bolt,但是这些低级组件在执行之前由Trident自动生成。 Trident具有函数,过滤器,联接,分组和聚合。
Trident将流处理为一系列批次,称为事务。通常,这些小批量的大小将是大约数千或数百万个元组,这取决于输入流。这样,Trident不同于Storm,它执行元组一元组处理。
批处理概念非常类似于数据库事务。每个事务都分配了一个事务ID。该事务被认为是成功的,一旦其所有的处理完成。然而,处理事务的元组中的一个的失败将导致整个事务被重传。对于每个批次,Trident将在事务开始时调用beginCommit,并在结束时提交。
Trident API公开了一个简单的选项,使用“TridentTopology”类创建Trident拓扑。基本上,Trident拓扑从流出接收输入流,并对流上执行有序的操作序列(滤波,聚合,分组等)。Storm元组被替换为Trident元组,bolt被操作替换。一个简单的Trident拓扑可以创建如下 -
TridentTopology topology = new TridentTopology();
Trident Tuples是一个命名的值列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是可由Trident拓扑处理的数据的基本单位。
Trident spout与类似于Storm spout,附加选项使用Trident的功能。实际上,我们仍然可以使用IRichSpout,我们在Storm拓扑中使用它,但它本质上是非事务性的,我们将无法使用Trident提供的优点。
具有使用Trident的特征的所有功能的基本spout是“ITridentSpout”。它支持事务和不透明的事务语义。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。
除了这些通用spouts,Trident有许多样品实施trident spout。其中之一是FeederBatchSpout输出,我们可以使用它发送trident tuples的命名列表,而不必担心批处理,并行性等。
FeederBatchSpout创建和数据馈送可以如下所示完成 -
TridentTopology topology = new TridentTopology();FeederBatchSpout testSpout = new FeederBatchSpout( ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));topology.newStream("fixed-batch-spout", testSpout)testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident依靠“Trident操作”来处理trident tuples的输入流。Trident API具有多个内置操作来处理简单到复杂的流处理。这些操作的范围从简单验证到复杂的trident tuples分组和聚合。让我们经历最重要和经常使用的操作。
过滤器是用于执行输入验证任务的对象。Trident过滤器获取trident tuples字段的子集作为输入,并根据是否满足某些条件返回真或假。如果返回true,则该元组保存在输出流中;否则,从流中移除元组。过滤器将基本上继承自BaseFilter类并实现isKeep方法。这里是一个滤波器操作的示例实现 -
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(1) % 2 == 0; }}input[1, 2][1, 3][1, 4]output[1, 2][1, 4]
可以使用“each”方法在拓扑中调用过滤器功能。“Fields”类可以用于指定输入(trident tuple的子集)。示例代码如下 -
TridentTopology topology = new TridentTopology();topology.newStream("spout", spout).each(new Fields("a", "b"), new MyFilter())
函数是用于对单个trident tuple执行简单操作的对象。它需要一个trident tuple字段的子集,并发出零个或多个新的trident tuple字段。
函数基本上从BaseFunction类继承并实现execute方法。下面给出了一个示例实现:
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int a = tuple.getInteger(0); int b = tuple.getInteger(1); collector.emit(new Values(a + b)); }}input[1, 2][1, 3][1, 4]output[1, 2, 3][1, 3, 4][1, 4, 5]
与过滤操作类似,可以使用每个方法在拓扑中调用函数操作。示例代码如下 -
TridentTopology topology = new TridentTopology();topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
聚合是用于对输入批处理或分区或流执行聚合操作的对象。Trident有三种类型的聚合。他们如下 -
aggregate -单独聚合每批trident tuple。在聚合过程期间,首先使用全局分组将元组重新分区,以将同一批次的所有分区组合到单个分区中。
partitionAggregate -聚合每个分区,而不是整个trident tuple。分区集合的输出完全替换输入元组。分区集合的输出包含单个字段元组。
persistentaggregate -聚合所有批次中的所有trident tuple,并将结果存储在内存或数据库中。
TridentTopology topology = new TridentTopology();// aggregate operationtopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .aggregate(new Count(), new Fields(“count”)) // partitionAggregate operationtopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .partitionAggregate(new Count(), new Fields(“count")) // persistentAggregate - saving the count to memorytopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“计数”聚合器是内置聚合器之一,它使用“CombinerAggregator”实现,实现如下 -
public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; }}
分组操作是一个内置操作,可以由groupBy方法调用。groupBy方法通过在指定字段上执行partitionBy来重新分区流,然后在每个分区中,它将组字段相等的元组组合在一起。通常,我们使用“groupBy”以及“persistentAggregate”来获得分组聚合。示例代码如下 -
TridentTopology topology = new TridentTopology();// persistentAggregate - saving the count to memorytopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .groupBy(new Fields(“d”) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
合并和连接可以分别通过使用“合并”和“连接”方法来完成。合并组合一个或多个流。加入类似于合并,除了加入使用来自两边的trident tuple字段来检查和连接两个流的事实。此外,加入将只在批量级别工作。示例代码如下 -
TridentTopology topology = new TridentTopology();topology.merge(stream1, stream2, stream3);topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
Trident提供了状态维护的机制。状态信息可以存储在拓扑本身中,否则也可以将其存储在单独的数据库中。原因是维护一个状态,如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定此元组的状态是否已在之前更新过。如果在更新状态之前元组已经失败,则重试该元组将使状态稳定。然而,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息仅处理一次 -
小批量处理元组。
为每个批次分配唯一的ID。如果重试批次,则给予相同的唯一ID。
状态更新在批次之间排序。例如,第二批次的状态更新将不可能,直到第一批次的状态更新完成为止。
分布式RPC用于查询和检索Trident拓扑结果。 Storm有一个内置的分布式RPC服务器。分布式RPC服务器从客户端接收RPC请求并将其传递到拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,分布式RPC服务器将其重定向到客户端。Trident的分布式RPC查询像正常的RPC查询一样执行,除了这些查询并行运行的事实。
在许多使用情况下,如果要求是只处理一次查询,我们可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下将难以实现精确的一次处理。因此,Trident将对那些需要一次处理的用例有用。Trident不适用于所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理状态。
我们将把上一节中制定的呼叫日志分析器应用程序转换为Trident框架。由于其高级API,Trident应用程序将比普通风暴更容易。Storm基本上需要执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。
FormatCall类的目的是格式化包括“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下 -
import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class FormatCall extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String fromMobileNumber = tuple.getString(0); String toMobileNumber = tuple.getString(1); collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber)); }}
CSVSplit类的目的是基于“comma(,)”拆分输入字符串,并发出字符串中的每个字。此函数用于解析分布式查询的输入参数。完整的代码如下 -
import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class CSVSplit extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for(String word: tuple.getString(0).split(",")) { if(word.length() > 0) { collector.emit(new Values(word)); } } }}
这是主要的应用程序。最初,应用程序将使用FeederBatchSpout初始化TridentTopology并提供调用者信息。Trident拓扑流可以使用TridentTopology类的newStream方法创建。类似地,Trident拓扑DRPC流可以使用TridentTopology类的newDRCPStream方法创建。可以使用LocalDRPC类创建一个简单的DRCP服务器。LocalDRPC有execute方法来搜索一些关键字。完整的代码如下。
import java.util.*;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.utils.DRPCClient;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import storm.trident.TridentState;import storm.trident.TridentTopology;import storm.trident.tuple.TridentTuple;import storm.trident.operation.builtin.FilterNull;import storm.trident.operation.builtin.Count;import storm.trident.operation.builtin.Sum;import storm.trident.operation.builtin.MapGet;import storm.trident.operation.builtin.Debug;import storm.trident.operation.BaseFilter;import storm.trident.testing.FixedBatchSpout;import storm.trident.testing.FeederBatchSpout;import storm.trident.testing.Split;import storm.trident.testing.MemoryMapState;import com.google.common.collect.ImmutableList;public class LogAnalyserTrident { public static void main(String[] args) throws Exception { System.out.println("Log Analyser Trident"); TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration")); TridentState callCounts = topology .newStream("fixed-batch-spout", testSpout) .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call")) .groupBy(new Fields("call")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); LocalDRPC drpc = new LocalDRPC(); topology.newDRPCStream("call_count", drpc) .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count")); topology.newDRPCStream("multiple_call_count", drpc) .each(new Fields("args"), new CSVSplit(), new Fields("call")) .groupBy(new Fields("call")) .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count")) .each(new Fields("call", "count"), new Debug()) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("trident", conf, topology.build()); Random randomGenerator = new Random(); int idx = 0; while(idx < 10) { testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60)))); idx = idx + 1; } System.out.println("DRPC : Query starts"); System.out.println(drpc.execute("call_count","1234123401 - 1234123402")); System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403")); System.out.println("DRPC : Query ends"); cluster.shutdown(); drpc.shutdown(); // DRPCClient client = new DRPCClient("drpc.server.location", 3772); }}
完整的应用程序有三个Java代码。他们如下 -
可以使用以下命令构建应用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令运行应用程序 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
一旦应用程序启动,应用程序将输出有关集群启动过程,操作处理,DRPC服务器和客户端信息的完整详细信息,以及最后的集群关闭过程。此输出将显示在控制台上,如下所示。
DRPC : Query starts[["1234123401 - 1234123402",10]]DEBUG: [1234123401 - 1234123402, 10]DEBUG: [1234123401 - 1234123403, 10][[20]]DRPC : Query ends
在本章中,我们将讨论Apache Storm的实时应用程序。我们将看到Storm如何在Twitter中使用。
Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台。注册用户可以阅读和发布tweet,但未注册的用户只能阅读tweets。 Hashtag用于按关键字在相关关键字之前附加#来对tweet进行分类。现在让我们来看一个实时场景,找到每个主题使用最多的hashtag。
spout的目的是尽快收到人们提交的tweets。Twitter提供了“Twitter Streaming API”,一个基于Web服务的工具,用于实时检索人们提交的tweets。Twitter Streaming API可以使用任何编程语言访问。
twitter4j是一个开源的非官方Java库,它提供了一个基于Java的模块,可以轻松访问Twitter Streaming API。twitter4j提供了一个基于监听器的框架来访问tweet。要访问Twitter Streaming API,我们需要登录Twitter开发人员帐户,并获取以下OAuth身份验证详细信息。
Storm在其入门套件中提供了一个twitter spout,TwitterSampleSpout。我们将使用它来检索tweet。该邮件需要OAuth身份验证详细信息和至少一个关键字。该spout将发出基于关键字的实时tweet。完整的程序代码如下。
import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import twitter4j.FilterQuery;import twitter4j.StallWarning;import twitter4j.Status;import twitter4j.StatusDeletionNotice;import twitter4j.StatusListener;import twitter4j.TwitterStream;import twitter4j.TwitterStreamFactory;import twitter4j.auth.AccessToken;import twitter4j.conf.ConfigurationBuilder;import backtype.storm.Config;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;@SuppressWarnings("serial")public class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; public TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } public TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) {} @Override public void onTrackLimitationNotice(int i) {} @Override public void onScrubGeo(long l, long l1) {} @Override public void onException(Exception ex) {} @Override public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(listener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override public void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override public void close() { _twitterStream.shutdown(); } @Override public Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallelism(1); return ret; } @Override public void ack(Object id) {} @Override public void fail(Object id) {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); }}
由spout发出的tweet将被转发到HashtagReaderBolt,它将处理该tweet并发出所有可用的hashtag。HashtagReaderBolt使用twitter4j提供的getHashTagEntities方法。getHashTagEntities读取tweet并返回hashtag的列表。完整的程序代码如下 -
import java.util.HashMap;import java.util.Map;import twitter4j.*;import twitter4j.conf.*;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
发出的hashtag将被转发到HashtagCounterBolt。这个bolt将处理所有的hashtags,并使用Java Map对象将每个hashtags及其计数保存在内存中。完整的程序代码如下。
import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
提交拓扑是主要应用程序。Twitter拓扑由TwitterSampleSpout,HashtagReaderBolt和HashtagCounterBolt组成。以下程序代码显示如何提交拓扑。
import java.util.*;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;public class TwitterHashtagStorm { public static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); }}
完整的应用程序有四个Java代码。他们如下 -
您可以使用以下命令编译应用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
使用以下命令执行应用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret><keyword1> <keyword2> … <keywordN>
应用程序将打印当前可用的主题标签及其计数。输出应类似于以下内容 -
Result: jazztastic : 1Result: foodie : 1Result: Redskins : 1Result: Recipe : 1Result: cook : 1Result: android : 1Result: food : 2Result: NoToxicHorseMeat : 1Result: Purrs4Peace : 1Result: livemusic : 1Result: VIPremium : 1Result: Frome : 1Result: SundayRoast : 1Result: Millennials : 1Result: HealthWithKier : 1Result: LPs30DaysofGratitude : 1Result: cooking : 1Result: gameinsight : 1Result: Countryfile : 1Result: androidgames : 1
雅虎财经是互联网领先的商业新闻和金融数据网站。它是雅虎的一部分,并提供有关金融新闻,市场统计,国际市场数据和其他任何人都可以访问的财务资源信息。
如果您是注册的Yahoo!用户,那么您可以自定义Yahoo! Finance以利用其特定产品。Yahoo! Finance API用于从Yahoo!查询财务数据。
此API显示实时延迟15分钟的数据,并每1分钟更新其数据库,以访问当前股票相关信息。现在让我们看一家公司的实时情景,看看当公司的股票价值低于100时如何提高警报。
spout的目的是获得公司的详细信息,并发出价格spout。您可以使用以下程序代码创建spout。
import java.util.*;import java.io.*;import java.math.BigDecimal;//import yahoofinace packagesimport yahoofinance.YahooFinance;import yahoofinance.Stock;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;public class YahooFinanceSpout implements IRichSpout { private SpoutOutputCollector collector; private boolean completed = false; private TopologyContext context; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){ this.context = context; this.collector = collector; } @Override public void nextTuple() { try { Stock stock = YahooFinance.get("INTC"); BigDecimal price = stock.getQuote().getPrice(); this.collector.emit(new Values("INTC", price.doubleValue())); stock = YahooFinance.get("GOOGL"); price = stock.getQuote().getPrice(); this.collector.emit(new Values("GOOGL", price.doubleValue())); stock = YahooFinance.get("AAPL"); price = stock.getQuote().getPrice(); this.collector.emit(new Values("AAPL", price.doubleValue())); } catch(Exception e) {} } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("company", "price")); } @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
这里的的目的是当价格低于100时处理给定公司的价格。它使用Java Map对象在股价低于100时设置截止价格限制警报为真;否则为false。完整的程序代码如下 -
import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class PriceCutOffBolt implements IRichBolt { Map<String, Integer> cutOffMap; Map<String, Boolean> resultMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.cutOffMap = new HashMap <String, Integer>(); this.cutOffMap.put("INTC", 100); this.cutOffMap.put("AAPL", 100); this.cutOffMap.put("GOOGL", 100); this.resultMap = new HashMap<String, Boolean>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String company = tuple.getString(0); Double price = tuple.getDouble(1); if(this.cutOffMap.containsKey(company)){ Integer cutOffPrice = this.cutOffMap.get(company); if(price < cutOffPrice) { this.resultMap.put(company, true); } else { this.resultMap.put(company, false); } } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("cut_off_price")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
这是YahooFinanceSpout.java和PriceCutOffBolt.java连接在一起并生成拓扑的主要应用程序。以下程序代码显示了如何提交拓扑。
import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;public class YahooFinanceStorm { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout()); builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt()) .fieldsGrouping("yahoo-finance-spout", new Fields("company")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); }}
完整的应用程序有三个Java代码。他们如下 -
应用程序可以使用以下命令构建 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java
应用程序可以使用以下命令运行 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.YahooFinanceStorm
输出将类似于以下内容 -
GOOGL : falseAAPL : falseINTC : true
Apache Storm框架支持许多当今最好的工业应用程序。我们将在本章中简要介绍Storm的一些最显着的应用。
Klout是一个应用程序,它使用社交媒体分析,根据在线社交影响力通过Klout得分,这是一个介于1和100之间的数值对用户排名。Klout使用Apache Storm的内置Trident抽象来创建流数据的复杂拓扑。
天气频道使用Storm拓扑来获取天气数据。它绑定了Twitter,以在Twitter和移动应用程序启用天气知道的广告。OpenSignal是一家专门从事无线覆盖制图的公司。StormTag和WeatherSignal是由OpenSignal创建的基于天气的项目。StormTag是一个蓝牙气象站,连接到钥匙串。由设备收集的天气数据发送到WeatherSignal应用程序和OpenSignal服务器。
电信提供商每秒处理数百万的电话呼叫。他们对掉话和低音质进行取证。呼叫详细记录以每秒百万的速率流入,Apache Storm实时处理这些流并识别任何令人不安的模式。Storm分析可以用来不断提高通话质量。