黑马大数据学习笔记(四)—— 实时计算Storm基础

黑马大数据学习笔记(四)—— 实时计算Storm基础

一、Storm基础

1.背景知识

我们先比较一下分布式电商网站的两种架构,及其日志收集方式

1)webservice 解耦:

将服务分成不同的职能部门,并且分别进行处理:

注册中心 —–>10实例,负载均衡,tomcat —–> tomcat.log 10 agent—>日志收集服务器
商品中心 —–>20实例,负载均衡,tomcat —–> tomcat.log 20 agent—>日志收集服务器
订单中心 —–>10实例,负载均衡,tomcat —–> tomcat.log 10 agent—>日志收集服务器
支付中心 —–>20实例,负载均衡,tomcat —–> tomcat.log 20 agent—>日志收集服务器
物流中心 —–>10实例,负载均衡,tomcat —–> tomcat.log 10 agent—>日志收集服务器
评价中心 —–>20实例,负载均衡,tomcat —–> tomcat.log 20 agent—>日志收集服务器

 

2)点击流日志收集系统(重要渠道):

JS 跨域JSONP—>提供对外的服务—>20实例,负载均衡,nginx

 

——————————————————————————

流式计算课程就是基于上面2)的方式,内容包括:
实时数据产生、实时数据传输、实时数据计算、实时数据存储(Reids)、报表展示

 

2.离线计算的特点

离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示

代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、Kettle任务调度

3.流式计算的特点

流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示

代表技术:Flume实时获取数据、Kafka实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存

一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果

4.流式计算与实时计算的区别

最大区别:实时收集、实时计算、实时展示

5.Storm是什么

Storm用来实时处理数据,特点:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。

6.Storm与Hadoop的区别

  • Storm用于实时计算,Hadoop用于离线计算
  • Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。
  • Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。
  • Storm与Hadoop的编程模型相似我们以一个部门的人员进行类比:
    Job:任务名称
    JobTracker:项目经理
    TaskTracker:开发组长、产品经理
    Child:负责开发的人员
    Mapper/Reducer:开发人员中的两种角色,一种是服务器开发、一种是客户端开发Topology:任务名称
    Nimbus:项目经理
    Supervisor:开发组长、产品经理
    Worker:负责开发的人员
    Spout/Bout:开发人员中的两种角色,一种是服务器开发、一种是客户端开发

7.Storm运用场景及行业案例

Storm用来实时计算源源不断产生的数据,如同流水线生产。

运用场景:

  • 日志分析
    海量日志中分析出特定的数据,并且将分析结果存入外部存储器用来辅佐决策。
  • 管道系统
    将一个数据从一个系统传输到另一个系统,比如将数据库同步到Hadoop。
  • 消息转化器
    将接受到的消息按照某种格式进行转化,存储到另一个系统如消息中间件。

典型案例:

  • 一淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎
    最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。
  • 携程-网站性能监控:实时分析系统监控携程网的网站性能
    利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。
  • 阿里妈妈-用户画像:实时计算用户的兴趣数据
    为了更加精准的投放广告,阿里妈妈后台计算引擎需要维护每个用户的兴趣点(理想状态是,你对什么感兴趣,就向你投放哪类广告)。用户兴趣主要基于用户的历史行为、用户的实时查询、用户的实时点击、用户的地理位置信息而得,其中实时查询、实时点击等用户行为都是实时数据。考虑到系统的实时性,阿里妈妈使用Storm维护用户兴趣数据,并在此基础上进行受众定向的广告投放。

8.Storm核心组件(重要)

  • Nimbus:负责资源分配和任务调度
  • Supervisor:负责接收nimbus分配的任务,启动和停止属于自己管理的worker进程。
  • Worker:运行具体处理组件逻辑的进程。
  • Task:worker中每一个spout/bolt的线程称为一个task。在storm0.8以后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。

9.Storm编程模型(重要)

  • Topology:Storm中运行的一个实时应用程序的名称。(拓扑)
  • Spout:在一个topology中获取源数据流的组件。
    通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。
  • Bolt:接收数据然后执行处理的组件,用户可以在其中执行自己想要的操作。
  • Tuple:一次消息传递的基本单元,理解为一组消息就是一个Tuple。
  • Stream:表示数据的流向

Storm里面有7种类型的stream grouping:

  • Shuffle Grouping:随机分组,随机派发stream里面的Tuple,保证每个bolt接收到的Tuple数目大致相同。
  • Fields Grouping:按字段分组,比如按userID来分组,具有同样userID的Tuple会被分到相同的bolts的一个task,而不同的userID则会被分配到不同的bolts里的task。
  • All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
  • Global Grouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给ID值最低的那个task。
  • Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  • Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接受者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的ID(OutputCollector.emit方法也会返回task的ID)。
  • Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。

10.流式计算一般架构图(重要)

网站系统 <– Flume集群 –> Kafka集群 –> Storm集群 –> Redis集群

  • 其中Flume用来获取数据
  • Kafka用来临时保存数据
  • Storm用来计算数据
  • Redis是个内存数据库,用来保存数据

 

二、Storm集群的部署

1.添加hosts

这里我们使用之前搭建的Hadoop集群来进行安装,为了方便管理,我们仍需要为每一台机器添加新的域名storm**。

(如果是在新的集群上面部署,需要做的还有:关闭防火墙、配置ssh免密码传输、修改hostname)

2.Zookeeper的部署

Storm的部署依赖ZooKeeper,它是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
ZooKeeper包含一个简单的原语集,提供Java和C的接口。

 

下载Zookeeper:

华中科技大学开源镜像站:http://mirror.hust.edu.cn/apache/zookeeper/stable/

下载之后,我们将其安装到/usr/zookeeper路径:

 

配置环境变量:

接下来依然需要配置环境变量,

 

配置Zookeeper:

首先创建配置文件:

接下来需要设定:

  • data和log文件夹的路径
  • 每个server的编号、地址、通信端口,其形式为 server.A = B:C:D ,A代表的是服务器的编号,B代表的是服务器的IP或域名,C是服务器与Leader通信的端口号,D是在Leader挂掉之后用来选举Leader的通信端口号。

接下来需要人为地建立data和log文件夹

将profile、hosts、Zookeeper通过ssh传输到集群中的其他机器上:

最后一步,对每一个机器,刷新其hosts、profile,并且在$Zookeeper_HOME/data文件夹下创建myid文件,文件中的内容为当前机器的server编号,需要修改

之后,对每一台机器,都要启动Zookeeper:

都启动之后,可以查看当前机器是leader还是follower:

 

3.Storm的部署

下载并安装Storm:

华中科技大学开源镜像站:http://mirror.hust.edu.cn/apache/storm/

配置环境变量:

配置Storm:

进入$STORM_HOME/conf中配置:

在其中配置如下信息:

  • Zookeeper服务器地址
  • Zookeeper服务器端口
  • Storm的缓存文件存放路径data
  • 可以作为nimbus的候选机器
  • 每个supervisor的开放端口,一个端口对应一个worker

之后需要手动创建data文件夹

再将profile、Storm通过ssh传输到集群中的其他机器上:

然后就可以启动storm了:

对于一号机,同时启动nimbus和ui:

对于其他机器,启动supervisor:

 

1.nohup

用途:不挂断地运行命令。

语法:nohup Command [ Arg … ] [ & ]

无论是否将 nohup 命令的输出重定向到终端,输出都将附加到当前目录的 nohup.out 文件中。

如果当前目录的 nohup.out 文件不可写,输出重定向到 $HOME/nohup.out 文件中。

如果没有文件能创建或打开以用于追加,那么 Command 参数指定的命令不可调用。

退出状态:该命令返回下列出口值:
126 可以查找但不能调用 Command 参数指定的命令。
127 nohup 命令发生错误或不能查找由 Command 参数指定的命令。
否则,nohup 命令的退出状态是 Command 参数指定命令的退出状态。
2.&

用途:在后台运行

一般两个一起用

nohup command &

之后用jps查看一下进程,

然后在浏览器访问一下UI验证storm集群的情况:

 

三、WordCount分析(重点掌握)

1.功能说明

设计一个topology,来实现对文档里面的单词出现的频率进行统计。整个topology分为三个部分:

  • RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。
  • SplitSentenceBolt:负责将单行文本记录(句子)切分成单词
  • WordCountBolt:负责对单词的频率进行累加

2.项目主要流程

首先是驱动类:

接下来是源数据流Spout:

 

接下来是划分的Bolt,splitSentenceSpout:

接下来是统计的Bolt,WordCountBolt:

 

3.Storm常用命令操作与自带的WordCountTopology

我们先运行一下系统自带的例子。

可以看到我们运行了jar包里的WordCountTopology这个topology,并将其命名为wordcount

Storm常用操作命令

1、任务提交命令:storm jar jar路径 拓扑包名.拓扑类名 拓扑名称

与hadoop不同的是:不需要指定输入输出路径

2、杀死任务命令:storm kill 拓扑名称 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)

3、停用任务命令:storm deactive 拓扑名称

我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

4、启用任务命令:storm activate 拓扑名称

5、重新部署任务命令:storm rebalance 拓扑名称

再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。

我们提交任务后,可以通过storm01:8080访问UI,查看相关信息。

并且在worker机器里的logs下追踪对应的Log输出,例如:

 

四、补充

1.任务分配分解

2.任务提交流程

1、client:用户提交任务
2、nimbus:分析用户配置文件
Submitting topology wordcount in distributed mode with conf {“topology.workers”:3,”topology.debug”:true}
worker数量, 有哪些task,给task进行编号
object = new RandomSpout() —->序列化后得到object的序列化文件
3、nimbus:指定哪些task启动在哪些supervisor上,
找supervisor上空闲的worker 通过端口号查询
4、nimubs:分配那个task运行在哪些worker
5、nimubus:将任务信息存放到zookeeper上
6、supervisor:监听zookeeper上的任务信息 可能是zookeeper上的/storms 因为storms目录存储正在运行的topology信息。
7、supervisor: 通过分析任务信息,确定自己要启动哪些worker。启动worker。
8、worker:分析任务配置文件,确定自己启动哪些task。
object instance of component(Spout/Bolt)
反序列化对象之后,通过线程池提交任务
Exectors.newxxxxxPool(num)
submit.newThread( 执行业务逻辑的对象 )

创建一个对象有几种方式?
1、new
2、反射
3、克隆
4、序列化反序列化

 

 

3.Storm总结

Storm组件
Nimbus:任务管理及分配
Supervisor:管理worker,启动或停止worker。
Worker:负责具体的任务执行,启动task。
Task:拓扑设置的任务信息,包括两种Spout/Bolt
Spout.open() Spout.netTuple()
Bolt.prepare() Bolt.execute(tuple)
Tuple:元组,storm中用来传输消息的最基本单元。
Storm 任务提交流程
1、客户端其提交任务
2、nimbus分配任务信息保存到zk上。
3、supervior根据topology任务信息,启动worker
4、worker根据topology任务信息,启动tasks。
5、任务一旦启动,永不停止。
Storm 通信机制
1、内部通信机制是disruptor的无锁队列,每一个task有incoming queue,outqueue。
2、外部通信机制是netty,每一个worker 有revQueue、outQueue。
Storm消息容错 ack-fail 可以选择是否开启
1、tuple树,每个消息创建的时候发送一个messageId,应答时发送一个messageId。
2、对tuple树上的所有messageID,进行异或计算。最后得到0,为成功。
3、影响ack-fail的因素
—->默认超时时间30秒
—->Config.TOPOLOGY_ACKERS设置为0
Storm并发机制设置
1、先设置worker数量,setNumWorkers()
2、每个component的并发度设置尽可能的是worker数的倍数
3、并发度的多少是根据业务量的。
Storm 目录树
1、本地目录树
2、zookeeper目录树

 

 

 

 

Tags:


Leave a Reply

Your email address will not be published.

*
*
*

Insert math as
Block
Inline
Additional settings
Formula color
Text color
#333333
Type math using LaTeX
Preview
\({}\)
Nothing to preview
Insert