Skip to content

Commit

Permalink
fixes #208 (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-claw authored Dec 27, 2024
1 parent 095e69d commit c74d711
Show file tree
Hide file tree
Showing 23 changed files with 206 additions and 204 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<koin.version>4.0.0</koin.version>
<h2.version>2.2.224</h2.version>
<htmlcleaner.version>2.29</htmlcleaner.version>
<failsafe.version>2.4.4</failsafe.version>
<failsafe.version>3.3.2</failsafe.version>
<caffeine.version>3.1.8</caffeine.version>
<jna-platform.version>5.14.0</jna-platform.version>
<kotlinx-serialization-json.version>1.7.3</kotlinx-serialization-json.version>
Expand Down Expand Up @@ -142,8 +142,8 @@
<version>${htmlcleaner.version}</version>
</dependency>
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<groupId>net.jodah</groupId>
<version>${failsafe.version}</version>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion vripper-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
</dependency>
<dependency>
<artifactId>failsafe</artifactId>
<groupId>net.jodah</groupId>
<groupId>dev.failsafe</groupId>
</dependency>
<dependency>
<artifactId>caffeine</artifactId>
Expand Down
8 changes: 4 additions & 4 deletions vripper-core/src/main/kotlin/me/vripper/Module.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package me.vripper

import me.vripper.data.repositories.ImageRepository
import me.vripper.data.repositories.MetadataRepository
import me.vripper.data.repositories.PostDownloadStateRepository
import me.vripper.data.repositories.PostRepository
import me.vripper.data.repositories.ThreadRepository
import me.vripper.data.repositories.impl.ImageRepositoryImpl
import me.vripper.data.repositories.impl.MetadataRepositoryImpl
import me.vripper.data.repositories.impl.PostDownloadStateRepositoryImpl
import me.vripper.data.repositories.impl.PostRepositoryImpl
import me.vripper.data.repositories.impl.ThreadRepositoryImpl
import me.vripper.download.DownloadService
import me.vripper.event.EventBus
Expand All @@ -29,8 +29,8 @@ val coreModule = module {
single<ImageRepository> {
ImageRepositoryImpl()
}
single<PostDownloadStateRepository> {
PostDownloadStateRepositoryImpl()
single<PostRepository> {
PostRepositoryImpl()
}
single<MetadataRepository> {
MetadataRepositoryImpl()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package me.vripper.data.repositories

import me.vripper.entities.PostEntity
import java.util.*

internal interface PostDownloadStateRepository {
internal interface PostRepository {
fun save(postEntities: List<PostEntity>): List<PostEntity>
fun findByPostId(postId: Long): Optional<PostEntity>
fun findById(id: Long): Optional<PostEntity>
fun findByPostId(postId: Long): PostEntity?
fun findById(id: Long): PostEntity?
fun findCompleted(): List<Long>
fun findAll(): List<PostEntity>
fun existByPostId(postId: Long): Boolean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package me.vripper.data.repositories.impl

import me.vripper.data.repositories.PostDownloadStateRepository
import me.vripper.data.repositories.PostRepository
import me.vripper.data.tables.PostTable
import me.vripper.entities.PostEntity
import me.vripper.entities.Status
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.transactions.TransactionManager
import java.sql.Connection
import java.util.*

internal class PostDownloadStateRepositoryImpl :
PostDownloadStateRepository {
internal class PostRepositoryImpl :
PostRepository {

private val delimiter = ";"

Expand All @@ -37,15 +36,11 @@ internal class PostDownloadStateRepositoryImpl :
}.map(::transform)
}

override fun findByPostId(postId: Long): Optional<PostEntity> {
override fun findByPostId(postId: Long): PostEntity? {
val result = PostTable.selectAll().where {
PostTable.postId eq postId
}.map(::transform)
return if (result.isEmpty()) {
Optional.empty()
} else {
Optional.of(result.first())
}
return result.firstOrNull()
}

override fun findCompleted(): List<Long> {
Expand All @@ -54,16 +49,11 @@ internal class PostDownloadStateRepositoryImpl :
}.map { it[PostTable.postId] }
}

override fun findById(id: Long): Optional<PostEntity> {
override fun findById(id: Long): PostEntity? {
val result = PostTable.selectAll().where {
PostTable.id eq id
}.map { transform(it) }

return if (result.isEmpty()) {
Optional.empty()
} else {
Optional.of(result.first())
}
return result.firstOrNull()
}

override fun findAll(): List<PostEntity> {
Expand Down
90 changes: 42 additions & 48 deletions vripper-core/src/main/kotlin/me/vripper/download/DownloadService.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package me.vripper.download

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.launch
import me.vripper.entities.ImageEntity
import me.vripper.entities.PostEntity
import me.vripper.entities.Status
Expand All @@ -16,10 +17,8 @@ import me.vripper.services.DataTransaction
import me.vripper.services.RetryPolicyService
import me.vripper.services.SettingsService
import me.vripper.services.VGAuthService
import me.vripper.utilities.GlobalScopeCoroutine
import me.vripper.utilities.LoggerDelegate
import net.jodah.failsafe.Failsafe
import net.jodah.failsafe.RetryPolicy
import me.vripper.utilities.executorService
import org.jetbrains.exposed.sql.transactions.transaction
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
Expand All @@ -46,14 +45,14 @@ internal class DownloadService(
lock.withLock {
candidates.addAll(getCandidates(candidateCount()))
candidates.forEach {
if (canRun(it.context.imageEntity.host)) {
if (canRun(it.imageEntity.host)) {
accepted.add(it)
running[it.context.imageEntity.host]!!.add(it)
log.debug("${it.context.imageEntity.url} accepted to run")
running[it.imageEntity.host]!!.add(it)
log.debug("${it.imageEntity.url} accepted to run")
}
}
accepted.forEach {
pending[it.context.imageEntity.host]?.remove(it)
pending[it.imageEntity.host]?.remove(it)
scheduleForDownload(it)
}
accepted.clear()
Expand Down Expand Up @@ -144,18 +143,18 @@ internal class DownloadService(
}

private fun isPending(postId: Long): Boolean {
return pending.values.flatten().any { it.context.imageEntity.postId == postId }
return pending.values.flatten().any { it.imageEntity.postId == postId }
}

private fun isRunning(postId: Long): Boolean {
return running.values.flatten().any { it.context.imageEntity.postId == postId }
return running.values.flatten().any { it.imageEntity.postId == postId }
}

private fun stopAll() {
lock.withLock {
pending.values.clear()
running.values.flatten().forEach { obj: ImageDownloadRunnable -> obj.stop() }
while (running.values.flatten().count { !it.context.completed } > 0) {
while (running.values.flatten().count { !it.completed } > 0) {
Thread.sleep(100)
}
dataTransaction.findAllNonCompletedPostIds().forEach {
Expand All @@ -169,13 +168,13 @@ internal class DownloadService(
lock.withLock {
for (postId in postIds) {
pending.values.forEach { pending ->
pending.removeIf { it.context.imageEntity.postId == postId }
pending.removeIf { it.imageEntity.postId == postId }
}
running.values.flatten()
.filter { p: ImageDownloadRunnable -> p.context.imageEntity.postId == postId }
.filter { p: ImageDownloadRunnable -> p.imageEntity.postId == postId }
.forEach { obj: ImageDownloadRunnable -> obj.stop() }
while (running.values.flatten()
.count { !it.context.completed && it.context.imageEntity.postId == postId } > 0
.count { !it.completed && it.imageEntity.postId == postId } > 0
) {
Thread.sleep(100)
}
Expand Down Expand Up @@ -213,7 +212,7 @@ internal class DownloadService(

val list: List<ImageDownloadRunnable> =
pending[host]!!.sortedWith(Comparator.comparingInt<ImageDownloadRunnable> { it.postRank }
.thenComparingInt { it.context.imageEntity.index })
.thenComparingInt { it.imageEntity.index })

for (imageDownloadRunnable in list) {
val count = hostIntegerMap[host] ?: 0
Expand All @@ -229,47 +228,42 @@ internal class DownloadService(
}

private fun scheduleForDownload(imageDownloadRunnable: ImageDownloadRunnable) {
log.debug("Scheduling a job for ${imageDownloadRunnable.context.imageEntity.url}")
GlobalScopeCoroutine.launch {
eventBus.publishEvent(QueueStateEvent(QueueState(runningCount(), pendingCount())))
try {
Failsafe.with<Any, RetryPolicy<Any>>(retryPolicyService.buildRetryPolicyForDownload("Failed to download ${imageDownloadRunnable.context.imageEntity.url}: "))
.onFailure {
log.error(
"Failed to download ${imageDownloadRunnable.context.imageEntity.url} after ${it.attemptCount} tries",
it.failure
)
val image = imageDownloadRunnable.context.imageEntity
image.status = Status.ERROR
dataTransaction.updateImage(image)
}
.onComplete {
afterJobFinish(imageDownloadRunnable)
eventBus.publishEvent(
QueueStateEvent(
QueueState(
runningCount(), pendingCount()
)
)
)
eventBus.publishEvent(ErrorCountEvent(ErrorCount(dataTransaction.countImagesInError())))
log.debug(
"Finished downloading ${imageDownloadRunnable.context.imageEntity.url}"
)
}.run(imageDownloadRunnable::run)
} catch (e: Exception) {
log.error("Download Failure", e)
log.debug("Scheduling a job for ${imageDownloadRunnable.imageEntity.url}")
eventBus.publishEvent(QueueStateEvent(QueueState(runningCount(), pendingCount())))
Failsafe.with<Any, RetryPolicy<Any>>(retryPolicyService.buildRetryPolicy("Failed to download ${imageDownloadRunnable.imageEntity.url}: "))
.with(executorService)
.onFailure {
log.error(
"Failed to download ${imageDownloadRunnable.imageEntity.url} after ${it.attemptCount} tries",
it.exception
)
val image = imageDownloadRunnable.imageEntity
image.status = Status.ERROR
dataTransaction.updateImage(image)
}
}
.onComplete {
afterJobFinish(imageDownloadRunnable)
eventBus.publishEvent(
QueueStateEvent(
QueueState(
runningCount(), pendingCount()
)
)
)
eventBus.publishEvent(ErrorCountEvent(ErrorCount(dataTransaction.countImagesInError())))
log.debug(
"Finished downloading ${imageDownloadRunnable.imageEntity.url}"
)
}.runAsync(imageDownloadRunnable)
}

private fun afterJobFinish(imageDownloadRunnable: ImageDownloadRunnable) {
lock.withLock {
val image = imageDownloadRunnable.context.imageEntity
val image = imageDownloadRunnable.imageEntity
running[image.host]!!.remove(imageDownloadRunnable)
if (!isPending(image.postId) && !isRunning(
image.postId
) && !imageDownloadRunnable.context.stopped
) && !imageDownloadRunnable.stopped
) {
dataTransaction.finishPost(image.postId, true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ internal class ImageDownloadContext(val imageEntity: ImageEntity, val settings:
HttpClientContext.create().apply { cookieStore = BasicCookieStore() }
val requests = mutableListOf<HttpUriRequestBase>()
val postId = imageEntity.postIdRef
var stopped = false
var completed = false

fun cancelCoroutines() {
runBlocking {
Expand Down
Loading

0 comments on commit c74d711

Please sign in to comment.