这篇文章主要介绍nutch中如何实现索引去重,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
一、主程序调用
SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();dedup.setConf(getConf());dedup.dedup(solrUrl);
二、job任务配置
JobConf job = new NutchJob(getConf());
job.setInputFormat(SolrInputFormat.class);job.setMapperClass(IdentityMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(SolrRecord.class);
job.setReducerClass(SolrDeleteDuplicates.class);job.setOutputFormat(NullOutputFormat.class);
JobClient.runJob(job);
三、Map、reduce任务的输入和输出Map任务输入、输出public void map(K key, V val, OutputCollector
reduce任务输入、输出输入:Text/Iterator
public void reduce(Text key, Iterator
四、job任务输入类SolrInputFormat
getSplits方法将所有的文档按照数量平均分片getRecordReader方法中利用solrserver查询了当前分片包含的所有doc记录,solrrecord返回了的当前的RecordReader
(1)、SolrInputFormat的getSplits方法
1、根据job对象的参数,获取solrserver对象。2、构建并执行查询(查询参数:[*:*、id、setRow(1)] ),获取响应对象3、根据响应对象获取索引总数,除以分片数,得到每一片分配多少个索引4、根据分片数创建 SolrInputSplit数组对象,5、根据solr输入分片的开始和结束位置,实例化SolrInputSplit对象
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD); solrQuery.setRows(1);
QueryResponse response; try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); }
int numResults = (int)response.getResults().getNumFound(); int numDocsPerSplit = (numResults / numSplits); int currentDoc = 0; SolrInputSplit[] splits = new SolrInputSplit[numSplits]; for (int i = 0; i
splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit); currentDoc += numDocsPerSplit; } splits[splits.length – 1] = new SolrInputSplit(currentDoc, numResults – currentDoc);
return splits; }
(2)、SolrInputFormat的getRecordReader()方法
1、获取solrserver对象2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest, SolrInputSplit中的开始位置,文档总数 ])。4、根据响应对象,获取结果集5、对匿名内部内RecordReader做了实现,并且返回
public RecordReader
//1、获取solrserver对象 SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
//2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数SolrInputSplit solrSplit = (SolrInputSplit) split; final int numDocs = solrSplit.getNumDocs(); //3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest,SolrInputSplit中的开始位置,文档总数]) SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD, SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD); solrQuery.setStart(solrSplit.getDocBegin()); solrQuery.setRows(numDocs);
QueryResponse response;
try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); }
//4、根据响应对象,获取结果集 final SolrDocumentList solrDocs = response.getResults();
return new RecordReader
public void close() throws IOException { }
public Text createKey() { return new Text(); }
public SolrRecord createValue() { return new SolrRecord(); }//获取当前的指针 public long getPos() throws IOException { return currentDoc; }//获取进度 public float getProgress() throws IOException { return currentDoc / (float) numDocs; }//获取下一个 public boolean next(Text key, SolrRecord value) throws IOException { if (currentDoc >= numDocs) { return false; }// SolrDocument doc = solrDocs.get(currentDoc);
//获取摘要 String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);//把摘要作为key key.set(digest);//value(SolrRecord)//赋值:通过doc给solrrecord的id,tstamp,boost 3个字段赋值 value.readSolrDocument(doc);//指针加自增1 currentDoc++; return true; } }; }
五、map()方法和reduce()方法中的实现
(1)、map任务(2)、reduce任务
去重逻辑:
reduce任务会遍历每一个record,并执行reduce()方法中的代码reduce()方法中,会遍历处于当前文档之后的所有文档,如果分值和时间都比当前的小,会调用solrj删除这个文档,如果比当前的大,会删除当前的,并把当前的替换成这个大的。
public void reduce(Text key, Iterator
//2、遍历了SolrRecord while (values.hasNext()) {// SolrRecord solrRecord = values.next();
//boost、tstamp参与比较//如果当前的分值, 比保持的分支高,并且时间比保持的新,就根据id删除这条索引, if (solrRecord.getBoost() > recordToKeep.getBoost() || (solrRecord.getBoost() == recordToKeep.getBoost() && solrRecord.getTstamp() > recordToKeep.getTstamp())) { updateRequest.deleteById(recordToKeep.id); recordToKeep = new SolrRecord(solrRecord); } else { updateRequest.deleteById(solrRecord.id); } numDeletes++; reporter.incrCounter(“SolrDedupStatus”, “Deleted documents”, 1); if (numDeletes >= NUM_MAX_DELETE_REQUEST) { try { LOG.info(“SolrDeleteDuplicates: deleting ” + numDeletes + ” duplicates”); updateRequest.process(solr); } catch (SolrServerException e) { throw new IOException(e); } updateRequest = new UpdateRequest(); numDeletes = 0; } } }
六、关于digest
doc中的digest字段,是在IndexerMapReduce类中的reduce方法中加入的// add digest, used by dedupdoc.add(“digest”, metadata.get(Nutch.SIGNATURE_KEY));
Metadata中包含了一个HashMapfinal Metadata metadata = parseData.getContentMeta();以上是“nutch中如何实现索引去重”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注开发云行业资讯频道!
本篇内容主要讲解“MySQL中order by的实现原理是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MySQL中order by的实现原理是什么”吧!全字段排序MySQL会给每个线程分配一块内存用于排序,称…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。