Skip to content

Commit

Permalink
Upgraded batch importer to Peripleo-style version
Browse files Browse the repository at this point in the history
  • Loading branch information
Rainer Simon committed Dec 21, 2017
1 parent e0bf92c commit 6a021f7
Showing 1 changed file with 37 additions and 2 deletions.
39 changes: 37 additions & 2 deletions app/models/place/PlaceImporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ trait PlaceImporter { self: PlaceStore with GeoTagStore =>

// Maximum number of times a gazetteer record or place link update will be retried in case of failure
private def MAX_RETRIES = 5

// Base delay (in milliseconds) between import retries. importRecords multiplies this value by
// the number of retries already performed, so the wait time grows linearly with each attempt.
private val BACKOFF_MS = 1000

/** Retrieves all places in the store that will be affected by adding the record **/
private[place] def getAffectedPlaces(normalizedRecord: GazetteerRecord)(implicit context: ExecutionContext): Future[Seq[(Place, Long)]] = {
Expand Down Expand Up @@ -147,7 +149,40 @@ trait PlaceImporter { self: PlaceStore with GeoTagStore =>
}
}

/** Imports a batch of gazetteer records sequentially, retrying the ones that fail.
  *
  * Records are imported strictly one after another: each importRecord call is only
  * started once the Future of the previous one has completed. Records for which
  * importRecord yields false are collected and re-submitted as a new (smaller) batch,
  * up to `retries` times. The first retry starts immediately; each subsequent retry
  * waits BACKOFF_MS longer than the one before.
  *
  * Note that a *failed* Future returned by importRecord is not retried - it fails the
  * Future returned by this method.
  *
  * @param records the gazetteer records to import
  * @param retries the number of retry rounds that are still allowed
  * @param ctx the execution context the import steps are chained on
  * @return the records that could not be imported after all retries were used up
  *         (empty if everything succeeded); their order is not guaranteed to match
  *         the input order
  */
def importRecords(records: Seq[GazetteerRecord], retries: Int = MAX_RETRIES)(implicit ctx: ExecutionContext): Future[Seq[GazetteerRecord]] =
  records.foldLeft(Future.successful(Seq.empty[GazetteerRecord])) { case (f, next) =>
    f.flatMap { failed =>
      importRecord(next).map { success =>
        if (success) failed
        else next +: failed
      }
    }
  } flatMap { failedRecords =>
    Logger.info(s"Imported ${(records.size - failedRecords.size)} records")
    if (failedRecords.nonEmpty && retries > 0) {
      Logger.warn(s"${failedRecords.size} gazetteer records failed to import - retrying")

      // Start the first retry immediately, then increase the wait time for each subsequent retry
      val backoff = (MAX_RETRIES - retries) * BACKOFF_MS
      if (backoff > 0) {
        Logger.info(s"Waiting... ${backoff}ms")
        // This callback runs on a thread of the caller's ExecutionContext. Marking the sleep
        // as blocking lets pools that support it (e.g. the global fork-join pool) compensate
        // with an extra thread, instead of silently losing a worker for the backoff period.
        scala.concurrent.blocking {
          Thread.sleep(backoff)
        }
      }

      Logger.debug("Retrying now.")
      importRecords(failedRecords, retries - 1)
    } else {
      if (failedRecords.nonEmpty) {
        Logger.error(s"${failedRecords.size} gazetteer records failed without recovery")
        failedRecords.foreach(record => Logger.error(record.toString))
      } else {
        Logger.info("No failed imports")
      }
      Future.successful(failedRecords)
    }
  }

/** TODO chain the Futures properly instead of using Await! **
def importRecords(records: Seq[GazetteerRecord], retries: Int = MAX_RETRIES)(implicit context: ExecutionContext): Future[Seq[GazetteerRecord]] =
Future {
records.map { record =>
Expand All @@ -172,6 +207,6 @@ trait PlaceImporter { self: PlaceStore with GeoTagStore =>
Future.successful(failedRecords)
}
}
}*/

}

0 comments on commit 6a021f7

Please sign in to comment.