写在前面
关于WebMagic,这应该是最后一篇博文了,这一篇也相对简单一些。
Pipeline & PageProcessor
这两部分是应该由程序员自己实现的部分,因为PageProcessor决定如何解析页面,而Pipeline负责存储结果。推荐使用OOSpider,也就是注解式编程。
Downloader
/**
 * Component of the spider responsible for fetching pages.
 */
public interface Downloader {

    /**
     * Downloads the web page described by {@code request} and stores it in a {@link Page}.
     *
     * @param request the request to fetch
     * @param task    the task the request belongs to
     * @return the downloaded page
     */
    Page download(Request request, Task task);

    /**
     * Tells the downloader how many threads the spider uses.
     *
     * @param threadNum number of spider threads
     */
    void setThread(int threadNum);
}
主要的实现类有3个,这里只重点介绍HttpClientDownloader,有兴趣的可以自己看看其他实现的源码。
@ThreadSafe
public class HttpClientDownloader extends AbstractDownloader {
private Logger logger = LoggerFactory.getLogger(getClass());
private final Map<String,CloseableHttpClient> httpClients = new HashMap<String,CloseableHttpClient>();
private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();
private CloseableHttpClient getHttpClient(Site site,Proxy proxy) {
if (site == null) {
return httpClientGenerator.getClient(null,proxy);
}
String domain = site.getDomain();
CloseableHttpClient httpClient = httpClients.get(domain);
if (httpClient == null) {
synchronized (this) {
httpClient = httpClients.get(domain);
if (httpClient == null) {
httpClient = httpClientGenerator.getClient(site,proxy);
httpClients.put(domain,httpClient);
}
}
}
return httpClient;
}
@Override
public Page download(Request request,Task task) {
Site site = null;
if (task != null) {
site = task.getSite();
}
Set<Integer> acceptStatCode;
String charset = null;
Map<String,String> headers = null;
if (site != null) {
acceptStatCode = site.getAcceptStatCode();
charset = site.getCharset();
headers = site.getHeaders();
} else {
acceptStatCode = Sets.newHashSet(200);
}
logger.info("downloading page {}",request.getUrl());
CloseableHttpResponse httpResponse = null;
int statusCode=0;
try {
HttpHost proxyHost = null;
Proxy proxy = null; //TODO
if (site.getHttpProxyPool() != null && site.getHttpProxyPool().isEnable()) {
proxy = site.getHttpProxyFromPool();
proxyHost = proxy.getHttpHost();
} else if(site.getHttpProxy()!= null){
proxyHost = site.getHttpProxy();
}
HttpUriRequest httpUriRequest = getHttpUriRequest(request,site,headers,proxyHost);
httpResponse = getHttpClient(site,proxy).execute(httpUriRequest);��֤
statusCode = httpResponse.getStatusLine().getStatusCode();
request.putExtra(Request.STATUS_CODE,statusCode);
if (statusAccept(acceptStatCode,statusCode)) {
Page page = handleResponse(request,charset,httpResponse,task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
}
} catch (IOException e) {
logger.warn("download page " + request.getUrl() + " error",e);
if (site.getCycleRetryTimes() > 0) {
return addToCycleRetry(request,site);
}
onError(request);
return null;
} finally {
request.putExtra(Request.STATUS_CODE,statusCode);
if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY),(Integer) request
.getExtra(Request.STATUS_CODE));
}
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
EntityUtils.consume(httpResponse.getEntity());
}
} catch (IOException e) {
logger.warn("close response fail",e);
}
}
}
@Override
public void setThread(int thread) {
httpClientGenerator.setPoolSize(thread);
}
protected boolean statusAccept(Set<Integer> acceptStatCode,int statusCode) {
return acceptStatCode.contains(statusCode);
}
protected HttpUriRequest getHttpUriRequest(Request request,Site site,Map<String,String> headers,HttpHost proxy) {
RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
if (headers != null) {
for (Map.Entry<String,String> headerEntry : headers.entrySet()) {
requestBuilder.addHeader(headerEntry.getKey(),headerEntry.getValue());
}
}
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectionRequestTimeout(site.getTimeOut())
.setSocketTimeout(site.getTimeOut())
.setConnectTimeout(site.getTimeOut())
.setCookieSpec(CookieSpecs.BEST_MATCH);
if (proxy !=null) {
requestConfigBuilder.setProxy(proxy);
request.putExtra(Request.PROXY,proxy);
}
requestBuilder.setConfig(requestConfigBuilder.build());
return requestBuilder.build();
}
protected RequestBuilder selectRequestMethod(Request request) {
String method = request.getMethod();
if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
//default get
return RequestBuilder.get();
} else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
RequestBuilder requestBuilder = RequestBuilder.post();
NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
if (nameValuePair != null && nameValuePair.length > 0) {
requestBuilder.addParameters(nameValuePair);
}
return requestBuilder;
} else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
return RequestBuilder.head();
} else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
return RequestBuilder.put();
} else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
return RequestBuilder.delete();
} else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
return RequestBuilder.trace();
}
throw new IllegalArgumentException("Illegal HTTP Method " + method);
}
protected Page handleResponse(Request request,String charset,HttpResponse httpResponse,Task task) throws IOException {
String content = getContent(charset,httpResponse);
Page page = new Page();
page.setRawText(content);
page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request);
page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
return page;
}
protected String getContent(String charset,HttpResponse httpResponse) throws IOException {
if (charset == null) {
byte[] contentBytes = IoUtils.toByteArray(httpResponse.getEntity().getContent());
String htmlCharset = getHtmlCharset(httpResponse,contentBytes);
if (htmlCharset != null) {
return new String(contentBytes,htmlCharset);
} else {
logger.warn("Charset autodetect Failed,use {} as charset. Please specify charset in Site.setCharset()",Charset.defaultCharset());
return new String(contentBytes);
}
} else {
return IoUtils.toString(httpResponse.getEntity().getContent(),charset);
}
}
protected String getHtmlCharset(HttpResponse httpResponse,byte[] contentBytes) throws IOException {
String charset;
// charset
// 1、encoding in http header Content-Type
String value = httpResponse.getEntity().getContentType().getValue();
charset = UrlUtils.getCharset(value);
if (StringUtils.isNotBlank(charset)) {
logger.debug("Auto get charset: {}",charset);
return charset;
}
// use default charset to decode first time
Charset defaultCharset = Charset.defaultCharset();
String content = new String(contentBytes,defaultCharset.name());
// 2、charset in Meta
if (StringUtils.isNotEmpty(content)) {
Document document = Jsoup.parse(content);
Elements links = document.select("Meta");
for (Element link : links) {
// 2.1、html4.01 <Meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
String MetaContent = link.attr("content");
String MetaCharset = link.attr("charset");
if (MetaContent.indexOf("charset") != -1) {
MetaContent = MetaContent.substring(MetaContent.indexOf("charset"),MetaContent.length());
charset = MetaContent.split("=")[1];
break;
}
// 2.2、html5 <Meta charset="UTF-8" />
else if (StringUtils.isNotEmpty(MetaCharset)) {
charset = MetaCharset;
break;
}
}
}
logger.debug("Auto get charset: {}",charset);
// 3、todo use tools as cpdetector for content decode
return charset;
}
}
其中包括了添加HTTP代理(http proxy)的部分,这部分官方文档没有介绍,如果需要就自行阅读源码吧 - -b
再看代码中的这部分:
// Success only when the status code is in acceptStatCode (just 200 when there is no Site).
if (statusAccept(acceptStatCode,statusCode)) {
    Page page = handleResponse(request,charset,httpResponse,task);
    onSuccess(request);
    return page;
} else {
    // Any other status: logged and dropped — no onError, no retry, download returns null.
    logger.warn("code error " + statusCode + "\t" + request.getUrl());
    return null;
}
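acceptStatCode默认只包含200,如果返回其他状态码,download会直接return null,页面被静默丢弃,既不会调用onError,也不会进入重试。需要注意的是,Java中try块里的return并不会跳过finally:下面的finally块依然会执行,EntityUtils.consume会把连接释放回连接池,所以这里并不存在HttpClient资源泄漏。严格说这算不上资源方面的bug,真正值得注意的是非200的页面被静默丢弃;如果需要处理这类页面,可以在Site中配置acceptStatCode。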
finally {
    // Runs after every return or exception in the try block above.
    request.putExtra(Request.STATUS_CODE,statusCode);
    if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
        // hand the proxy used for this request back to the pool together with the status code
        site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY),(Integer) request
                .getExtra(Request.STATUS_CODE));
    }
    try {
        if (httpResponse != null) {
            //ensure the connection is released back to pool
            EntityUtils.consume(httpResponse.getEntity());
        }
    } catch (IOException e) {
        logger.warn("close response fail",e);
    }
}
最后
到此为止,WebMagic主体部分的源码都已介绍完毕。如果只是使用,目前的知识已经足够;如果遇到bug,还是需要自行修改,好在WebMagic提供了替换组件的接口,使用起来还是很方便的。