From d86e544148e7536a92e60ec2a0bee48f6d70d5aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sindre=20=C3=98strem?= Date: Tue, 5 Nov 2024 16:20:05 +0100 Subject: [PATCH] should fail on nonexistant --- .../DownloadMultipleS3FilesByPrefix.kt | 11 ++++++-- .../DownloadMultipleS3FilesByPrefixTest.kt | 26 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/nifi-tekst-bundle-processors/src/main/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefix.kt b/nifi-tekst-bundle-processors/src/main/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefix.kt index cedde87..97c950d 100644 --- a/nifi-tekst-bundle-processors/src/main/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefix.kt +++ b/nifi-tekst-bundle-processors/src/main/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefix.kt @@ -160,7 +160,14 @@ class DownloadMultipleS3FilesByPrefix : AbstractProcessor() { val localFolder = context.getProperty("local_folder").evaluateAttributeExpressions(flowFile).value client = getS3Client(accessKey, secretKey, region, endpoint) - downloadAllItems(bucket, prefix, localFolder) + + try { + downloadAllItems(bucket, prefix, localFolder) + } catch (e: Exception) { + logger.error("Failed to download files", e) + session.transfer(flowFile, REL_FAILURE) + return + } session.transfer(flowFile, REL_SUCCESS) } @@ -209,7 +216,7 @@ class DownloadMultipleS3FilesByPrefix : AbstractProcessor() { ) { println("localFolder: $localFolder") val items: MutableIterable>? = listItemsByPrefix(bucket, addTrailingSlashIfNotPresent(prefix)) - if (items == null) { + if (items == null || !items.iterator().hasNext()) { logger.error("No items found in bucket $bucket with prefix $prefix") throw RuntimeException("No items found in bucket $bucket with prefix $prefix") } diff --git a/nifi-tekst-bundle-processors/src/test/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefixTest.kt b/nifi-tekst-bundle-processors/src/test/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefixTest.kt index 2cb457b..5114adb 100644 --- a/nifi-tekst-bundle-processors/src/test/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefixTest.kt +++ b/nifi-tekst-bundle-processors/src/test/kotlin/no/nb/nifi/tekst/processors/DownloadMultipleS3FilesByPrefixTest.kt @@ -114,4 +114,30 @@ class DownloadMultipleS3FilesByPrefixTest { File(downloadedFilesFolder).delete() } + @Test + fun shouldFailWhenObjectDoesNotExist() { + val runner = TestRunners.newTestRunner(DownloadMultipleS3FilesByPrefix::class.java) + runner.setProperty(DownloadMultipleS3FilesByPrefix.BUCKET, BUCKET) + runner.setProperty(DownloadMultipleS3FilesByPrefix.ACCESS_KEY, ACCESS_KEY) + runner.setProperty(DownloadMultipleS3FilesByPrefix.SECRET_KEY, SECRET_KEY) + runner.setProperty(DownloadMultipleS3FilesByPrefix.REGION, REGION) + runner.setProperty(DownloadMultipleS3FilesByPrefix.PREFIX, "NEWSPAPER/abc") + runner.setProperty(DownloadMultipleS3FilesByPrefix.ENDPOINT, minioServerUrl) + + val projectFolder = Paths.get("").toAbsolutePath().toString() + val downloadedFilesFolder = "src/test/resources/downloaded-files" + + runner.setProperty(DownloadMultipleS3FilesByPrefix.LOCAL_FOLDER, "$projectFolder/$downloadedFilesFolder") + + runner.enqueue("Hello world") + + runner.run() + + // check that the files are downloaded + runner.assertAllFlowFilesTransferred("failure") + + // clean up + File(downloadedFilesFolder).delete() + } + }