diff --git a/app/models/place/PlaceImporter.scala b/app/models/place/PlaceImporter.scala index 4399d28bf..0c96f3258 100644 --- a/app/models/place/PlaceImporter.scala +++ b/app/models/place/PlaceImporter.scala @@ -12,6 +12,8 @@ trait PlaceImporter { self: PlaceStore with GeoTagStore => // Maximum number of times a gazetteer record or place link update will be retried in case of failure private def MAX_RETRIES = 5 + + private val BACKOFF_MS = 1000 /** Retrieves all places in the store that will be affected from adding the record **/ private[place] def getAffectedPlaces(normalizedRecord: GazetteerRecord)(implicit context: ExecutionContext): Future[Seq[(Place, Long)]] = { @@ -147,7 +149,40 @@ trait PlaceImporter { self: PlaceStore with GeoTagStore => } } - /** TODO chain the Futures properly instead of using Await! **/ + def importRecords(records: Seq[GazetteerRecord], retries: Int = MAX_RETRIES)(implicit ctx: ExecutionContext): Future[Seq[GazetteerRecord]] = + records.foldLeft(Future.successful(Seq.empty[GazetteerRecord])) { case (f, next) => + f.flatMap { failed => + importRecord(next).map { success => + if (success) failed + else next +: failed + } + } + } flatMap { failedRecords => + Logger.info(s"Imported ${(records.size - failedRecords.size)} records") + if (failedRecords.size > 0 && retries > 0) { + Logger.warn(s"${failedRecords.size} gazetteer records failed to import - retrying") + + // Start first retry immediately and then increases wait time for each subsequent retry + val backoff = (MAX_RETRIES - retries) * BACKOFF_MS + if (backoff > 0) { + Logger.info(s"Waiting... ${backoff}ms") + Thread.sleep(backoff) + } + + Logger.debug("Retrying now.") + importRecords(failedRecords, retries - 1) + } else { + if (failedRecords.size > 0) { + Logger.error(s"${failedRecords.size} gazetteer records failed without recovery") + failedRecords.foreach(record => Logger.error(record.toString)) + } else { + Logger.info("No failed imports") + } + Future.successful(failedRecords) + } + } + + /** TODO chain the Futures properly instead of using Await! ** def importRecords(records: Seq[GazetteerRecord], retries: Int = MAX_RETRIES)(implicit context: ExecutionContext): Future[Seq[GazetteerRecord]] = Future { records.map { record => @@ -172,6 +207,6 @@ trait PlaceImporter { self: PlaceStore with GeoTagStore => Future.successful(failedRecords) } - } + }*/ }