Storm MongoDB接口怎么使用


本篇内容介绍了“Storm MongoDB接口怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!整体的Storn接口分为以下的几个class1:MongoBolt.java2 : MongoSpout.java3 : MongoTailableCursorTopology.java4 : SimpleMongoBolt.java看代码说话:12 :
*WARNING:Youcanonlyusetailablecursorsoncappedcollections.
*
*@authorDanBeaulieu
*
*/

//在这里,抽象的过程中,依旧保持了第一层的Spout为一个抽象类,MongoSpout为abstract的一个抽象类,子类在继承这//个类的过程之中实现特定的方法即可
//这里还有一个类似Cursor的操作。

publicabstractclassMongoSpoutextendsBaseRichSpout{

privateSpoutOutputCollectorcollector;

privateLinkedBlockingQueuequeue;
privatefinalAtomicBooleanopened=newAtomicBoolean(false);

privateDBmongoDB;
privatefinalDBObjectquery;

privatefinalStringmongoHost;
privatefinalintmongoPort;
privatefinalStringmongoDbName;
privatefinalStringmongoCollectionName;

publicMongoSpout(StringmongoHost,intmongoPort,StringmongoDbName,StringmongoCollectionName,DBObjectquery){

this.mongoHost=mongoHost;
this.mongoPort=mongoPort;
this.mongoDbName=mongoDbName;
this.mongoCollectionName=mongoCollectionName;
this.query=que开发云主机域名ry;
}

classTailableCursorThreadextendsThread{

//内部类TailableCursorThread线程

//注意在其中我们使用了LinkedBlockingQueue的对象,有关java高并发的集合类,请参考本ID的【Java集合类型的博文】博文。
LinkedBlockingQueuequeue;
StringmongoCollectionName;
DBmongoDB;
DBObjectquery;

publicTailableCursorThread(LinkedBlockingQueuequeue,DBmongoDB,StringmongoCollectionName,DBObjectquery){

this.queue=queue;
this.mongoDB=mongoDB;
this.mongoCollectionName=mongoCollectionName;
this.query=query;
}

publicvoidrun(){

while(opened.get()){
try{
//createthecursor
mongoDB.requestStart();
finalDBCursorcursor=mongoDB.getCollection(mongoCollectionName)
.find(query)
.sort(newBasicDBObject(“$natural”,1))
.addOption(Bytes.QUERYOPTION_TAILABLE)
.addOption(Bytes.QUERYOPTION_AWAITDATA);
try{
while(opened.get()&&cursor.hasNext()){
finalDBObjectdoc=cursor.next();

if(doc==null)break;

queue.put(doc);
}
}finally{
try{
if(cursor!=null)cursor.close();
}catch(finalThrowablet){}
try{
mongoDB.requestDone();
}catch(finalThrowablet){}
}

Utils.sleep(500);
}catch(finalMongoException.CursorNotFoundcnf){
//rethrowonlyifsomethingwentwrongwhileweexpectthecursortobeopen.
if(opened.get()){
throwcnf;
}
}catch(InterruptedExceptione){break;}
}
};
}

@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){

this.collector=collector;
this.queue=newLinkedBlockingQueue(1000);
try{
this.mongoDB=newMongoClient(this.mongoHost,this.mongoPort).getDB(this.mongoDbName);
}catch(Exceptione){
thrownewRuntimeException(e);
}

TailableCursorThreadlistener=newTailableCursorThread(this.queue,this.mongoDB,this.mongoCollectionName,this.query);
this.opened.set(true);
listener.start();
}

@Override
publicvoidclose(){
this.opened.set(false);
}

@Override
publicvoidnextTuple(){

DBObjectdbo=this.queue.poll();
if(dbo==null){
Utils.sleep(50);
}else{
this.collector.emit(dbObjectToStormTuple(dbo));
}
}

@Override
publicvoidack(ObjectmsgId){
//TODOAuto-generatedmethodstub
}

@Override
publicvoidfail(ObjectmsgId){
//TODOAuto-generatedmethodstub
}

publicabstractListdbObjectToStormTuple(DBObjectmessage);

}

“Storm MongoDB接口怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注开发云网站,小编将为大家输出更多高质量的实用文章!

“Storm MongoDB接口怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注开发云网站,小编将为大家输出更多高质量的实用文章!

相关推荐: 虚拟主机为什么没有web

虚拟主机为什么没有web?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。web是虚拟主机的网站根目录,购买虚拟主机后,通过FTP上传工具连接虚拟主机,将网站代码传上web文件夹就可以…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

Like (0)
Donate 微信扫一扫 微信扫一扫
Previous 05/17 16:26
Next 05/17 16:26

相关推荐