本篇内容主要讲解“Storm可靠性acker案例分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Storm可靠性acker案例分析”吧!worker进程死掉worker进程挂掉,storm集群会在重新启动一个worker进程。supervisor进程死掉supervisor进程挂掉,不会影响之前已经提交的topology,只是后期不能向这个节点分配任务,因为这个节点已经不是集群的一员了。nimbus进程死掉(存在HA的问题)快速失败nimbus进程挂掉,也不会影响之前已经提交的topology,只是后期不能向集群再提交新的topology了。1.0以下的版本存在HA的问题,1.0之后已经修复了这个问题,可以有多个备选nimbus。节点宕机ack/fail消息确认机制(确保一个tuple被完全处理)在spout中发射tuple的时候需要同时发送messageid,这样才相当于开启了消息确认机制如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。通过config.setNumAckers(num)来设置一个topology里面的acker的数量,默认值是1。注意: acker用了特殊的算法,使得对于追踪每个spout tuple的状态所需要的内存量是恒定的(20 bytes) (可以了解一下其算法,目前暂时不做这个算法的深入理解,百度storm acker就能找到相关的分析文章)注意:如果一个tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默认值为30秒)时间内没有被成功处理,那么这个tuple会被认为处理失败了。完全处理tuple在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所衍生的所有的tuple都被成功处理。前面也提到了,如果希望使用qck/fail
确认机制,则需要做下面的事情:根据上面的说明,程序代码如下,注意其中体现的这几点:
*Storm组件:Spout、Bolt、数据是Tuple,使用main中的Topology将spout和bolt进行关联
*MapReduce的组件:Mapper和Reducer、数据是Writable,通过一个main中的job将二者关联
*
*适配器模式(Adapter):BaseRichSpout,其对继承接口中一些没必要的方法进行了重写,但其重写的代码没有实现任何功能。
*我们称这为适配器模式
*
*storm消息确认机制—可靠性分析
*acker
*fail
*/
publicclassAckerSumTopology{
/**
*数据源
*/
staticclassOrderSpoutextendsBaseRichSpout{
privateMapconf;//当前组件配置信息
priva 香港云主机teTopologyContextcontext;//当前组件上下文对象
privateSpoutOutputCollectorcollector;//发送tuple的组件
@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
this.conf=conf;
this.context=context;
this.collector=collector;
}
privatelongnum=0;
/**
*接收数据的核心方法
*/
@Override
publicvoidnextTuple(){
StringmessageId=UUID.randomUUID().toString().replaceAll(“-“,””).toLowerCase();
//while(true){
num++;
StormUtil.sleep(1000);
System.out.println(“当前时间”+StormUtil.df_yyyyMMddHHmmss.format(newDate())+”产生的订单金额:”+num);
this.collector.emit(newValues(num),messageId);
//}
}
/**
*是对发送出去的数据的描述schema
*/
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
declarer.declare(newFields(“order_cost”));
}
@Override
publicvoidack(ObjectmsgId){
System.out.println(msgId+”对应的消息被处理成功了”);
}
@Override
publicvoidfail(ObjectmsgId){
System.out.println(msgId+”—->对应的消息被处理失败了”);
}
}
/**
*计算和的Bolt节点
*/
staticclassSumBoltextendsBaseRichBolt{
privateMapconf;//当前组件配置信息
privateTopologyContextcontext;//当前组件上下文对象
privateOutputCollectorcollector;//发送tuple的组件
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.conf=conf;
this.context=context;
this.collector=collector;
}
privateLongsumOrderCost=0L;
/**
*处理数据的核心方法
*/
@Override
publicvoidexecute(Tupleinput){
LongorderCost=input.getLongByField(“order_cost”);
sumOrderCost+=orderCost;
if(orderCost%10==1){//每10次模拟消息失败一次
collector.fail(input);
}else{
System.out.println(“线程ID:”+Thread.currentThread().getId()+”,商城网站到目前”+StormUtil.df_yyyyMMddHHmmss.format(newDate())+”的商品总交易额”+sumOrderCost);
collector.ack(input);
}
StormUtil.sleep(1000);
}
/**
*如果当前bolt为最后一个处理单元,该方法可以不用管
*/
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
}
}
/**
*构建拓扑,相当于在MapReduce中构建Job
*/
publicstaticvoidmain(String[]args)throwsException{
TopologyBuilderbuilder=newTopologyBuilder();
/**
*设置spout和bolt的dag(有向无环图)
*/
builder.setSpout(“id_order_spout”,newOrderSpout());
builder.setBolt(“id_sum_bolt”,newSumBolt(),1)
.shuffleGrouping(“id_order_spout”);//通过不同的数据流转方式,来指定数据的上游组件
//使用builder构建topology
StormTopologytopology=builder.createTopology();
StringtopologyName=AckerSumTopology.class.getSimpleName();//拓扑的名称
Configconfig=newConfig();//Config()对象继承自HashMap,但本身封装了一些基本的配置
//启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
if(args==null||args.length
运行后(本地运行或上传到集群上提交作业),输出如下:一般的业务数据存储,最终还是要落地,存储到RDBMS,但是RDBMS无法达到高访问量,能力达不到实时处理,或者说处理能力是有限的,会造成连接中断等问题,为了数据落地,我们可以采取迂回方式,可以采用比如说先缓存到高速内存数据库(如redis),然后再将内存数据库中的数据定时同步到rdbms中,而且可以定期定时来做。可以每隔指定的时间将数据整合一次存入数据库。或者每隔指定的时间执行一些可以在storm中使用定时任务来实现这些定时数据落地的功能,不过需要先了解storm定时任务。在main中设置但是我们一般都会对特定的bolt设置定时任务,而没有必要对全局每一个bolt都发送系统的tuple,这样非常的耗费资源,所以就有了局部定时任务,也是我们常用的。注意:storm会按照用户设置的时间间隔给拓扑中的所有bolt发送系统级别的tuple。在main函数中设置定时器,storm会定时给拓扑中的所有bolt都发送系统级别的tuple,如果只需要给某一个bolt设置定时功能的话,只需要在这个bolt中覆盖getComponentConfiguration方法,里面设置定时间隔即可。测试代码如下:输出:在bolt中使用下面代码判断是否是触发用的bolt如果为true,则执行定时任务需要执行的代码,最后return,如果为false,则执行正常的tuple处理的业务逻辑。即对于需要进行数据落地的bolt,我们可以只给该bolt设置定时任务,这样系统会定时给该bolt发送系统级别的tuple,在我们该bolt的代码中进行判断,如果接收到的是系统级别的bolt,则进行数据落地的操作,比如将数据写入数据库或其它操作等,否则就按照正常的逻辑来执行我们的业务代码。工作中常用这一种方式进行操作。测试程序如下:输出如下:deactive:未激活(暂停)emitted: emitted tuple数transferred: transferred tuple数emitted的区别:如果一个task,emitted一个tuple到2个task中,则 transferred tuple数是emitted tuple数的两倍complete latency: spout emitting 一个tuple到spout ack这个tuple的平均时间(可以认为是tuple以及该tuple树的整个处理时间)process latency: bolt收到一个tuple到bolt ack这个tuple的平均时间,如果没有启动acker机制,那么值为0execute latency:bolt处理一个tuple的平均时间,不包含acker操作,单位是毫秒(也就是bolt 执行 execute 方法的平均时间)capacity:这个值越接近1,说明bolt或者spout基本一直在调用execute方法,说明并行度不够,需要扩展这个组件的executor数量。到此,相信大家对“Storm可靠性acker案例分析”有了更深的了解,不妨来实际操作一番吧!这里是开发云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
这篇文章主要讲解了“Go RWMutex并发怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Go RWMutex并发怎么使用”吧!RWMutex表示读写锁:1.它允许任意读操作同时进行,主要用于读多写少的…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。