本篇内容介绍了“Storm MongoDB接口怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!整体的Storn接口分为以下的几个class1:MongoBolt.java2 : MongoSpout.java3 : MongoTailableCursorTopology.java4 : SimpleMongoBolt.java看代码说话:12 :
*WARNING:Youcanonlyusetailablecursorsoncappedcollections.
*
*@authorDanBeaulieu
*
*/
//在这里,抽象的过程中,依旧保持了第一层的Spout为一个抽象类,MongoSpout为abstract的一个抽象类,子类在继承这//个类的过程之中实现特定的方法即可
//这里还有一个类似Cursor的操作。
publicabstractclassMongoSpoutextendsBaseRichSpout{
privateSpoutOutputCollectorcollector;
privateLinkedBlockingQueue
privatefinalAtomicBooleanopened=newAtomicBoolean(false);
privateDBmongoDB;
privatefinalDBObjectquery;
privatefinalStringmongoHost;
privatefinalintmongoPort;
privatefinalStringmongoDbName;
privatefinalStringmongoCollectionName;
publicMongoSpout(StringmongoHost,intmongoPort,StringmongoDbName,StringmongoCollectionName,DBObjectquery){
this.mongoHost=mongoHost;
this.mongoPort=mongoPort;
this.mongoDbName=mongoDbName;
this.mongoCollectionName=mongoCollectionName;
this.query=que开发云主机域名ry;
}
classTailableCursorThreadextendsThread{
//内部类TailableCursorThread线程
//注意在其中我们使用了LinkedBlockingQueue的对象,有关java高并发的集合类,请参考本ID的【Java集合类型的博文】博文。
LinkedBlockingQueue
StringmongoCollectionName;
DBmongoDB;
DBObjectquery;
publicTailableCursorThread(LinkedBlockingQueue
this.queue=queue;
this.mongoDB=mongoDB;
this.mongoCollectionName=mongoCollectionName;
this.query=query;
}
publicvoidrun(){
while(opened.get()){
try{
//createthecursor
mongoDB.requestStart();
finalDBCursorcursor=mongoDB.getCollection(mongoCollectionName)
.find(query)
.sort(newBasicDBObject(“$natural”,1))
.addOption(Bytes.QUERYOPTION_TAILABLE)
.addOption(Bytes.QUERYOPTION_AWAITDATA);
try{
while(opened.get()&&cursor.hasNext()){
finalDBObjectdoc=cursor.next();
if(doc==null)break;
queue.put(doc);
}
}finally{
try{
if(cursor!=null)cursor.close();
}catch(finalThrowablet){}
try{
mongoDB.requestDone();
}catch(finalThrowablet){}
}
Utils.sleep(500);
}catch(finalMongoException.CursorNotFoundcnf){
//rethrowonlyifsomethingwentwrongwhileweexpectthecursortobeopen.
if(opened.get()){
throwcnf;
}
}catch(InterruptedExceptione){break;}
}
};
}
@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
this.collector=collector;
this.queue=newLinkedBlockingQueue
try{
this.mongoDB=newMongoClient(this.mongoHost,this.mongoPort).getDB(this.mongoDbName);
}catch(Exceptione){
thrownewRuntimeException(e);
}
TailableCursorThreadlistener=newTailableCursorThread(this.queue,this.mongoDB,this.mongoCollectionName,this.query);
this.opened.set(true);
listener.start();
}
@Override
publicvoidclose(){
this.opened.set(false);
}
@Override
publicvoidnextTuple(){
DBObjectdbo=this.queue.poll();
if(dbo==null){
Utils.sleep(50);
}else{
this.collector.emit(dbObjectToStormTuple(dbo));
}
}
@Override
publicvoidack(ObjectmsgId){
//TODOAuto-generatedmethodstub
}
@Override
publicvoidfail(ObjectmsgId){
//TODOAuto-generatedmethodstub
}
publicabstractList
}
“Storm MongoDB接口怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注开发云网站,小编将为大家输出更多高质量的实用文章!
“Storm MongoDB接口怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注开发云网站,小编将为大家输出更多高质量的实用文章!虚拟主机为什么没有web?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。web是虚拟主机的网站根目录,购买虚拟主机后,通过FTP上传工具连接虚拟主机,将网站代码传上web文件夹就可以…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。