重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章给大家介绍Nutch如何解析Html文档,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、虚拟空间、营销软件、网站建设、剑阁网站维护、网站推广。
解析Html文档 MapReduce任务描述
一、主程序调用
ParseSegment parseSegment = new ParseSegment(getConf());
if (!Fetcher.isParsing(job)) {
parseSegment.parse(segs[0]); // parse it, if needed
}
(1)、isParseing方法的实现
public static boolean isParsing(Configuration conf) {
return conf.getBoolean("fetcher.parse", true);
}
(2)、参数segs[0]
Path[] segs = generator.generate(
crawlDb,
segments,
-1,
topN,
System.currentTimeMillis());
generate方法中 generatedSegments的生成过程。
这里疑点比较多,先放一放
// read the subdirectories generated in the temp 读取temp中生成的子文件夹
// output and turn them into segments 输出并且把他们转到segments中
//1、创建Path的集合对象
List
//2、
FileStatus[] status = fs.listStatus(tempDir);// 这里读取上面生成的多个fetchlist的segment
try {
for (FileStatus stat : status) {
Path subfetchlist = stat.getPath();
if (!subfetchlist.getName().startsWith("fetchlist-"))
continue;// 过滤不是以fetchlist-开头的文件
// start a new partition job for this segment
Path newSeg = partitionSegment(fs, segments, subfetchlist,
numLists);
// 对segment进行Partition操作,产生一个新的目录
generatedSegments.add(newSeg);
}
} catch (Exception e) {
LOG.warn("Generator: exception while partitioning segments, exiting ...");
fs.delete(tempDir, true);
return null;
}
二、job任务配置
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(ParseSegment.class);
job.setReducerClass(ParseSegment.class);
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(ParseOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ParseImpl.class);
JobClient.runJob(job);
三、Map、reduce任务的输入和输出
map任务输入和输出
输入: WritableComparable/ Content
输出: Text/ ParseImpl
public void map(WritableComparable> key, Content content,
OutputCollector
reduce任务输入和输出
输入: Text/Iterator
输出: Text/Writable
public void reduce(Text key, Iterator
OutputCollector
四、job任务输入类SequenceFileInputFormat
protected FileStatus[] listStatus(JobConf job) throws IOException {
FileStatus[] files = super.listStatus(job);
for (int i = 0; i < files.length; i++) {
FileStatus file = files[i];
if (file.isDir()) { // it's a MapFile
Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
FileSystem fs = file.getPath().getFileSystem(job);
// use the data file
files[i] = fs.getFileStatus(dataFile);
}
}
return files;
}
public RecordReader
JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(split.toString());
return new SequenceFileRecordReader
}
五、map()方法和reduce()方法中的实现
(1)、map任务
org.apache.nutch.parse.ParseSegment
public void map(WritableComparable> key, Content content,
OutputCollector
throws IOException {
// convert on the fly from old UTF8 keys
if (key instanceof Text) {
newKey.set(key.toString());
key = newKey;
}
//2、 获取抓取状态,
//Nutch.FETCH_STATUS_KEY)——> _fst_
int status =
Integer.parseInt(content.getMetadata().get(Nutch.FETCH_STATUS_KEY));
//3、如果成功抓取,如果没有抓取成功,就跳过这条记录
if (status != CrawlDatum.STATUS_FETCH_SUCCESS) {
// content not fetched successfully, skip document
LOG.debug("Skipping " + key + " as content is not fetched successfully");
return;
}
//4、判断是否试过截断,文档中是否被截断,如果要跳过截断,且文档是被截断了,也跳过这条记录
if (skipTruncated && isTruncated(content)) {
return;
}
ParseResult parseResult = null;
try {
//5、创建一个ParseUtil对象,调用解析方法parse,并返回一个解析结果ParseResult
parseResult = new ParseUtil(getConf()).parse(content);
} catch (Exception e) {
LOG.warn("Error parsing: " + key + ": " +StringUtils.stringifyException(e));
return;
}
//以上主要是解析,一下是对解析的处理
//————————————————————————————————————————————————————————————————————————————————
//6、遍历上一步解析得到的解析结果,
for (Entry
//7、获取键和值
Text url = entry.getKey();
Parse parse = entry.getValue();
//8、获取状态
ParseStatus parseStatus = parse.getData().getStatus();
long start = System.currentTimeMillis();
//9、计数器+1
reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[parseStatus.getMajorCode()], 1);
//10、如果解析不成功,将parse持有的所有对象都置为null。
if (!parseStatus.isSuccess()) {
LOG.warn("Error parsing: " + key + ": " + parseStatus);
parse = parseStatus.getEmptyParse(getConf());
}
// pass segment name to parse data
//11、将segment名称赋值给parse data
parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
getConf().get(Nutch.SEGMENT_NAME_KEY));
// compute the new signature
//12、计算新的分值
byte[] signature =
SignatureFactory.getSignature(getConf()).calculate(content, parse);
//13、设置digest的值
parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
StringUtil.toHexString(signature));
try {
scfilters.passScoreAfterParsing(url, content, parse);
} catch (ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error passing score: "+ url +": "+e.getMessage());
}
}
long end = System.currentTimeMillis();
LOG.info("Parsed (" + Long.toString(end - start) + "ms):" + url);
output.collect(url, new ParseImpl(new ParseText(parse.getText()),
parse.getData(), parse.isCanonical()));
}
}
ParseResult对象的特点
a、实现了Iterable接口 ,可迭代;迭代对象是Entry,entry的key是Text,值是Parse,如下
public class ParseResult implements Iterable
迭代方法如下,
public Iterator
return parseMap.entrySet().iterator();
}
可以看到是使用了Map结合的迭代器
b、持有一个HashMap用于存放解析结果,另持有当前的url,代码如下
private Map
private String originalUrl;
Parse和ParseImpl的说明
Parse是一个接口,其中的3个方法如下
/** The textual(正文) content of the page. This is indexed, searched, and used when generating snippets.*/
//网页正文内容,将被索引,搜索,生成快照
String getText();
/** Other data extracted from the page. */
//冲网页中提取的其他数据
ParseData getData();
/** Indicates if the parse is coming from a url or a sub-url */
//标识这个Parse是否来自于url或者一个子url
boolean isCanonical();//Canonical:正规的
ParseImpl实现了Parse和Writable接口
ParseImpl中有3个字段,如下所示,其中isCanonical是在构造的时候传入的,默认是true
private ParseText text;
private ParseData data;
private boolean isCanonical;//规则
用于去重的digest的计算
//12、计算新的分值
byte[] signature =
SignatureFactory.getSignature(getConf()).calculate(content, parse);
//13、设置digest的值
parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
StringUtil.toHexString(signature));
Signature:n.签名; 署名; 识别标志,鲜明特征; [医]药的用法说明;
SignatureFactory类中的getSignature方法
该方法看ObjectCache中有没有Signature的实现,如果没有就利用反射创建一个并返回。
/** Return the default Signature implementation. */
public static Signature getSignature(Configuration conf) {
String clazz = conf.get("db.signature.class", MD5Signature.class.getName());
ObjectCache objectCache = ObjectCache.get(conf);
Signature impl = (Signature)objectCache.getObject(clazz);
if (impl == null) {
try {
if (LOG.isInfoEnabled()) {
LOG.info("Using Signature impl: " + clazz);
}
Class> implClass = Class.forName(clazz);
impl = (Signature)implClass.newInstance();
impl.setConf(conf);
objectCache.setObject(clazz, impl);
} catch (Exception e) {
throw new RuntimeException("Couldn't create " + clazz, e);
}
}
return impl;
}
***** 重要
计算网页特征,最后是调用了Signature的calculate方法,以下是Signature实现类MD5Signaure的类代码
/**
* Default implementation of a page signature.
默认的Signature的实现类
* It calculates an MD5 hash of the raw binary content of a page.
它计算page内容的原始的二进制代码的md5 hash值
* In case there is no content,
* it calculates a hash from the page's URL.
*
* @author Andrzej Bialecki <ab@getopt.org>
*/
public class MD5Signature extends Signature {
public byte[] calculate(Content content, Parse parse) {
byte[] data = content.getContent();
if (data == null) data = content.getUrl().getBytes();
return MD5Hash.digest(data).getDigest();
}
}
(2)、reduce任务
未完待续
多线程解析
//解析是多线程地
private ParseResult runParser(Parser p, Content content) {
ParseCallable pc = new ParseCallable(p, content);
Future
ParseResult res = null;
try {
res = task.get(maxParseTime, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error parsing " + content.getUrl() + " with " + p, e);
task.cancel(true);
} finally {
pc = null;
}
return res;
}
关于Nutch如何解析Html文档就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。