From d3e7f0f17f461ca00ebc45108c60c52977266dc8 Mon Sep 17 00:00:00 2001
From: Norwin Roosen
Date: Thu, 6 Sep 2018 14:54:50 +0200
Subject: [PATCH] avoid stalling of crawler #32

Disabling es.status.reset.fetchdate.after somehow caused the crawler to
stop returning new results. Another issue remains: a small percentage of
pages is crawled again and again, but never indexed or updated to FETCHED.
---
 crawler/es-conf.flux                          |   2 +-
 crawler/es-crawler.flux                       |   2 +-
 .../org/n52/webcrawl/CollapsingSpout.java     | 247 ++++++++++++++++++
 3 files changed, 249 insertions(+), 2 deletions(-)
 create mode 100644 crawler/src/main/java/org/n52/webcrawl/CollapsingSpout.java

diff --git a/crawler/es-conf.flux b/crawler/es-conf.flux
index b1047c5..5d44e57 100644
--- a/crawler/es-conf.flux
+++ b/crawler/es-conf.flux
@@ -52,7 +52,7 @@ config:
   es.status.global.sort.field: "metadata.depth"
 
   # Delay since previous query date (in secs) after which the nextFetchDate value will be reset
-  #es.status.reset.fetchdate.after: 180
+  es.status.reset.fetchdate.after: 30
 
   # CollapsingSpout : limits the deep paging by resetting the start offset for the ES query
   es.status.max.start.offset: 500
diff --git a/crawler/es-crawler.flux b/crawler/es-crawler.flux
index 89992dd..77266a9 100644
--- a/crawler/es-crawler.flux
+++ b/crawler/es-crawler.flux
@@ -15,7 +15,7 @@ includes:
 
 spouts:
   - id: "spout"
-    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.CollapsingSpout"
+    className: "org.n52.webcrawl.CollapsingSpout"
     parallelism: 10
 
 bolts:
diff --git a/crawler/src/main/java/org/n52/webcrawl/CollapsingSpout.java b/crawler/src/main/java/org/n52/webcrawl/CollapsingSpout.java
new file mode 100644
index 0000000..dd810c0
--- /dev/null
+++ b/crawler/src/main/java/org/n52/webcrawl/CollapsingSpout.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to DigitalPebble Ltd under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.n52.webcrawl;
+
+import com.digitalpebble.stormcrawler.Metadata;
+import com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractSpout;
+import com.digitalpebble.stormcrawler.util.ConfUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.collapse.CollapseBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+
+/**
+ * Copy of stormcrawler ES 1.9 class, with patch for
+ * https://github.com/DigitalPebble/storm-crawler/issues/595
+ **/
+public class CollapsingSpout extends AbstractSpout implements
+        ActionListener<SearchResponse> {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(CollapsingSpout.class);
+
+    /** Used to avoid deep paging **/
+    private static final String ESMaxStartOffsetParamName = "es.status.max.start.offset";
+
+    private int lastStartOffset = 0;
+    private int maxStartOffset = -1;
+
+    @Override
+    public void open(Map stormConf, TopologyContext context,
+            SpoutOutputCollector collector) {
+        maxStartOffset = ConfUtils.getInt(stormConf, ESMaxStartOffsetParamName,
+                -1);
+        super.open(stormConf, context, collector);
+    }
+
+    @Override
+    protected void populateBuffer() {
+        // not used yet or returned empty results
+        if (lastDate == null) {
+            lastDate = new Date();
+            lastStartOffset = 0;
+        }
+        // been running same query for too long and paging deep?
+        else if (maxStartOffset != -1 && lastStartOffset > maxStartOffset) {
+            LOG.info("Reached max start offset {}", lastStartOffset);
+            lastStartOffset = 0;
+        }
+
+        String formattedLastDate = ISODateTimeFormat.dateTimeNoMillis().print(
+                lastDate.getTime());
+
+        LOG.info("{} Populating buffer with nextFetchDate <= {}", logIdprefix,
+                formattedLastDate);
+
+        QueryBuilder queryBuilder = QueryBuilders.rangeQuery("nextFetchDate")
+                .lte(formattedLastDate);
+
+        if (filterQuery != null) {
+            queryBuilder = boolQuery().must(queryBuilder).filter(
+                    QueryBuilders.queryStringQuery(filterQuery));
+        }
+
+        SearchRequest request = new SearchRequest(indexName).types(docType)
+                .searchType(SearchType.QUERY_THEN_FETCH);
+
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+        sourceBuilder.query(queryBuilder);
+        sourceBuilder.from(lastStartOffset);
+        sourceBuilder.size(maxBucketNum);
+        sourceBuilder.explain(false);
+        sourceBuilder.trackTotalHits(false);
+
+        // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html
+        // _shards:2,3
+        if (shardID != -1) {
+            request.preference("_shards:" + shardID);
+        }
+
+        if (StringUtils.isNotBlank(totalSortField)) {
+            sourceBuilder.sort(new FieldSortBuilder(totalSortField)
+                    .order(SortOrder.ASC));
+        }
+
+        CollapseBuilder collapse = new CollapseBuilder(partitionField);
+
+        // group expansion -> sends sub queries for each bucket
+        if (maxURLsPerBucket > 1) {
+            InnerHitBuilder ihb = new InnerHitBuilder();
+            ihb.setSize(maxURLsPerBucket);
+            ihb.setName("urls_per_bucket");
+            // sort within a bucket
+            if (StringUtils.isNotBlank(bucketSortField)) {
+                List<SortBuilder<?>> sorts = new LinkedList<>();
+                FieldSortBuilder bucketsorter = SortBuilders.fieldSort(
+                        bucketSortField).order(SortOrder.ASC);
+                sorts.add(bucketsorter);
+                ihb.setSorts(sorts);
+            }
+            collapse.setInnerHits(ihb);
+        }
+
+        sourceBuilder.collapse(collapse);
+
+        request.source(sourceBuilder);
+
+        // dump query to log
+        LOG.debug("{} ES query {}", logIdprefix, request.toString());
+
+        timeStartESQuery = System.currentTimeMillis();
+        isInESQuery.set(true);
+        client.searchAsync(request, this);
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        LOG.error("{} Exception with ES query", logIdprefix, e);
+        isInESQuery.set(false);
+    }
+
+    @Override
+    public void onResponse(SearchResponse response) {
+        long timeTaken = System.currentTimeMillis() - timeStartESQuery;
+
+        SearchHit[] hits = response.getHits().getHits();
+        int numBuckets = hits.length;
+
+        // reset the value for next fetch date if the previous one is too old
+        if (resetFetchDateAfterNSecs != -1) {
+            Calendar diffCal = Calendar.getInstance();
+            diffCal.setTime(lastDate);
+            diffCal.add(Calendar.SECOND, resetFetchDateAfterNSecs);
+            // compare to now
+            if (diffCal.before(Calendar.getInstance())) {
+                LOG.info(
+                        "{} lastDate set to null based on resetFetchDateAfterNSecs {}",
+                        logIdprefix, resetFetchDateAfterNSecs);
+                lastDate = null;
+                lastStartOffset = 0;
+            }
+        }
+
+        int alreadyprocessed = 0;
+        int numDocs = 0;
+
+        synchronized (buffer) {
+            for (SearchHit hit : hits) {
+                Map<String, SearchHits> innerHits = hit.getInnerHits();
+                // wanted just one per bucket : no inner hits
+                if (innerHits == null) {
+                    numDocs++;
+                    if (!addHitToBuffer(hit)) {
+                        alreadyprocessed++;
+                    }
+                    continue;
+                }
+                // more than one per bucket
+                SearchHits inMyBucket = innerHits.get("urls_per_bucket");
+                for (SearchHit subHit : inMyBucket.getHits()) {
+                    numDocs++;
+                    if (!addHitToBuffer(subHit)) {
+                        alreadyprocessed++;
+                    }
+                }
+            }
+
+            // Shuffle the URLs so that we don't get blocks of URLs from the
+            // same host or domain
+            if (numBuckets != numDocs) {
+                Collections.shuffle((List) buffer);
+            }
+        }
+
+        esQueryTimes.addMeasurement(timeTaken);
+        // could be derived from the count of query times above
+        eventCounter.scope("ES_queries").incrBy(1);
+        eventCounter.scope("ES_docs").incrBy(numDocs);
+        eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
+
+        LOG.info(
+                "{} ES query returned {} hits from {} buckets in {} msec with {} already being processed",
+                logIdprefix, numDocs, numBuckets, timeTaken, alreadyprocessed);
+
+        // no more results?
+        if (numBuckets == 0) {
+            lastDate = null;
+            lastStartOffset = 0;
+        }
+        // still got some results but paging won't help
+        else if (numBuckets < maxBucketNum) {
+            lastStartOffset = 0;
+        } else {
+            lastStartOffset += numBuckets;
+        }
+
+        // remove lock
+        isInESQuery.set(false);
+    }
+
+    private final boolean addHitToBuffer(SearchHit hit) {
+        Map<String, Object> keyValues = hit.getSourceAsMap();
+        String url = (String) keyValues.get("url");
+        // is already being processed - skip it!
+        if (beingProcessed.containsKey(url)) {
+            return false;
+        }
+        Metadata metadata = fromKeyValues(keyValues);
+        return buffer.add(new Values(url, metadata));
+    }
+
+}
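
Editor's note (not part of the patch): the places where lastStartOffset is reset
together with lastDate appear to be the #595 fix; upstream 1.9 reset only the
date, so after a date reset the spout could keep paging from a stale deep
offset. Below is a minimal, self-contained sketch of that offset/date state
machine under this assumption. The class name and the simulation in main() are
hypothetical; only the reset rules mirror the spout above, and no ES queries
are issued.

    import java.util.Calendar;
    import java.util.Date;

    // Hypothetical stand-alone model of the paging bookkeeping in the
    // patched CollapsingSpout; it performs no Elasticsearch I/O.
    public class PagingResetSketch {

        static final int MAX_START_OFFSET = 500;     // es.status.max.start.offset
        static final int RESET_FETCHDATE_AFTER = 30; // es.status.reset.fetchdate.after (secs)

        Date lastDate = null;
        int lastStartOffset = 0;

        // mirrors the top of populateBuffer()
        void beforeQuery() {
            if (lastDate == null) {
                lastDate = new Date();
                lastStartOffset = 0; // patched: offset starts over with a fresh date
            } else if (lastStartOffset > MAX_START_OFFSET) {
                lastStartOffset = 0; // cap deep paging
            }
        }

        // mirrors the bookkeeping in onResponse(); numBuckets is what ES returned
        void afterQuery(int numBuckets, int maxBucketNum) {
            Calendar expiry = Calendar.getInstance();
            expiry.setTime(lastDate);
            expiry.add(Calendar.SECOND, RESET_FETCHDATE_AFTER);
            if (expiry.before(Calendar.getInstance())) {
                lastDate = null;     // query date considered stale
                lastStartOffset = 0; // patched: otherwise paging resumes from a stale offset
            }
            if (numBuckets == 0) {
                lastDate = null;     // exhausted: re-query with a fresh date next round
                lastStartOffset = 0;
            } else if (numBuckets < maxBucketNum) {
                lastStartOffset = 0; // partial page: paging deeper cannot return more
            } else {
                lastStartOffset += numBuckets; // full page: keep paging
            }
        }

        public static void main(String[] args) {
            PagingResetSketch s = new PagingResetSketch();
            // simulate three full pages of 10 buckets, then an empty result
            for (int n : new int[] { 10, 10, 10, 0 }) {
                s.beforeQuery();
                System.out.println("querying from offset " + s.lastStartOffset);
                s.afterQuery(n, 10);
            }
        }
    }

As in the real spout, a stale-date reset can still be overwritten by the
"full page" branch in the same round; the next beforeQuery() call then sees
lastDate == null and zeroes both fields, so the state self-corrects.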