diff --git a/BlockBlobReader/src/consumer.js b/BlockBlobReader/src/consumer.js index 91e4bda0..b8c6df4d 100644 --- a/BlockBlobReader/src/consumer.js +++ b/BlockBlobReader/src/consumer.js @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) { }catch(err){ context.log.error("Failed to create service bus client and receiver"); context.done(err); + return; } try { var messages = await queueReceiver.receiveMessages(1, { @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) { ctx.log('Successfully sent to Sumo, Exiting now.'); try{ await queueReceiver.completeMessage(messages[0]); - }catch(err){ - await queueReceiver.close(); - await sbClient.close(); - if (!err) { - ctx.log("sent and deleted"); - ctx.done(); - } else { - ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue"); - ctx.done(err); - } + ctx.log("Successfully deleted message from DLQ."); + } catch(err){ + ctx.log.error(`Failed to delete message from DLQ. error: ${error}`); } + await queueReceiver.close(); + await sbClient.close(); + ctx.done(); } } } sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler); messageHandler(serviceBusTask, context, sumoClient); - }catch(error){ + } catch(error){ await queueReceiver.close(); await sbClient.close(); if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) { diff --git a/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js b/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js index 91e4bda0..b8c6df4d 100644 --- a/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js +++ b/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) { }catch(err){ context.log.error("Failed to create service bus client and receiver"); context.done(err); + return; } try { var messages = await queueReceiver.receiveMessages(1, { @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) { ctx.log('Successfully sent to Sumo, Exiting now.'); try{ await queueReceiver.completeMessage(messages[0]); - }catch(err){ - await queueReceiver.close(); - await sbClient.close(); - if (!err) { - ctx.log("sent and deleted"); - ctx.done(); - } else { - ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue"); - ctx.done(err); - } + ctx.log("Successfully deleted message from DLQ."); + } catch(err){ + ctx.log.error(`Failed to delete message from DLQ. error: ${error}`); } + await queueReceiver.close(); + await sbClient.close(); + ctx.done(); } } } sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler); messageHandler(serviceBusTask, context, sumoClient); - }catch(error){ + } catch(error){ await queueReceiver.close(); await sbClient.close(); if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) { diff --git a/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js b/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js index 91e4bda0..b8c6df4d 100644 --- a/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js +++ b/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) { }catch(err){ context.log.error("Failed to create service bus client and receiver"); context.done(err); + return; } try { var messages = await queueReceiver.receiveMessages(1, { @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) { ctx.log('Successfully sent to Sumo, Exiting now.'); try{ await queueReceiver.completeMessage(messages[0]); - }catch(err){ - await queueReceiver.close(); - await sbClient.close(); - if (!err) { - ctx.log("sent and deleted"); - ctx.done(); - } else { - ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue"); - ctx.done(err); - } + ctx.log("Successfully deleted message from DLQ."); + } catch(err){ + ctx.log.error(`Failed to delete message from DLQ. error: ${error}`); } + await queueReceiver.close(); + await sbClient.close(); + ctx.done(); } } } sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler); messageHandler(serviceBusTask, context, sumoClient); - }catch(error){ + } catch(error){ await queueReceiver.close(); await sbClient.close(); if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) { diff --git a/BlockBlobReader/target/dlqprocessor_build/package.json b/BlockBlobReader/target/dlqprocessor_build/package.json index fc70c260..162c143b 100644 --- a/BlockBlobReader/target/dlqprocessor_build/package.json +++ b/BlockBlobReader/target/dlqprocessor_build/package.json @@ -5,6 +5,7 @@ "@azure/identity": "^4.2.1", "@azure/service-bus": "^7.8.1", "@azure/storage-blob": "^12.13.0", + "@azure/data-tables": "^13.2.2", "encoding": "^0.1.13" } } diff --git a/BlockBlobReader/tests/requirements.txt b/BlockBlobReader/tests/requirements.txt index 39cfa09d..dba8e6e6 100644 --- a/BlockBlobReader/tests/requirements.txt +++ b/BlockBlobReader/tests/requirements.txt @@ -8,3 +8,5 @@ azure-mgmt-storage==21.1.0 azure-storage-blob==12.19.0 azure-storage==0.36.0 azure-cosmosdb-table==1.0.6 +azure-servicebus==7.12.2 +azure-mgmt-web==7.3.0 diff --git a/BlockBlobReader/tests/test_blobreader.py b/BlockBlobReader/tests/test_blobreader.py index f6bab3f6..d2825989 100644 --- a/BlockBlobReader/tests/test_blobreader.py +++ b/BlockBlobReader/tests/test_blobreader.py @@ -1,5 +1,5 @@ import os -from datetime import datetime +from datetime import datetime, timedelta import time import unittest import json @@ -8,6 +8,8 @@ from azure.storage.blob import BlockBlobService from azure.storage.blob.models import BlobBlock from azure.mgmt.storage import StorageManagementClient +from azure.servicebus import ServiceBusClient, ServiceBusMessage +from azure.mgmt.web import WebSiteManagementClient from azure.cosmosdb.table.tableservice import TableService from random import choices from string import ascii_uppercase, digits @@ -88,6 +90,15 @@ def upload_file_of_unknown_extension(self): self.test_filename_unsupported_extension, data_block, []) + def get_full_testlog_file_name(self): + # Verify with a very long append blob filename (1024 characters) + file_ext = f".{self.log_type}" + if len(self.test_filename) > 128: + expected_filename = self.test_filename[:60] + "..." + self.test_filename[-(60-len(file_ext)):] + file_ext + else: + expected_filename = self.test_filename + return expected_filename + def test_03_func_logs(self): self.logger.info("inserting mock %s data in BlobStorage" % self.log_type) if self.log_type in ("csv", "log", "blob"): @@ -159,17 +170,81 @@ def test_03_func_logs(self): self.assertTrue(record_unsupported_extension_count == 0, f"block blob file's record count: {record_unsupported_extension_count}, logs with unsupported blob extension should not be ingested") - # Verify with a very long append blob filename (1024 characters) - file_ext = f".{self.log_type}" - if len(self.test_filename) > 128: - expected_filename = self.test_filename[:60] + "..." + self.test_filename[-(60-len(file_ext)):] + file_ext - else: - expected_filename = self.test_filename + expected_filename = self.get_full_testlog_file_name() # Verify addition of _sourceCategory, _sourceHost, _sourceName and also additional metadata self.assertTrue(source_name == expected_filename, f"_sourceName: {source_name} expected_filename: {expected_filename} metadata is incorrect") self.assertTrue(source_host == f"{self.test_storageaccount_name}/{self.test_container_name}", f"_sourceHost {source_host} expected_sourcehost: {self.test_storageaccount_name}/{self.test_container_name} metadata is incorrect") + # testing DLQ flow + self.subtest_DLQ_func_logs() + + def upload_message_in_service_bus(self): + file_ext = f".{self.log_type}" + test_filename = self.test_filename + file_ext + file_size = os.path.getsize(f"blob_fixtures{file_ext}") + triggerData = { + "blobName": test_filename, + "containerName": self.test_container_name, + "endByte": file_size-1, + "resourceGroupName": self.test_storage_res_group, + "startByte": 0, + "storageName": self.test_storageaccount_name, + "subscriptionId": self.subscription_id, + "url": f"https:{self.test_storageaccount_name}.blob.core.windows.net/{self.test_container_name}/{test_filename}" + } + self.logger.info("Ingesting message into servicebus DLQ", triggerData) + SERVICE_BUS_NAMESPACE = self.get_resource_name("SUMOBRTaskQNS", "Microsoft.ServiceBus/namespaces") + SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE = f"{SERVICE_BUS_NAMESPACE}.servicebus.windows.net" + SERVICE_BUS_QUEUE_NAME = "blobrangetaskqueue" + servicebus_client = ServiceBusClient(SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, self.azure_credential) + + with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=SERVICE_BUS_QUEUE_NAME) + with sender: + sender.send_messages(ServiceBusMessage(json.dumps(triggerData))) + + def change_sumo_endpoint(self): + self.logger.info("Changing sumo logic endpoint") + website_client = WebSiteManagementClient(credential=self.azure_credential, subscription_id=self.subscription_id) + FunctionAppWebsiteName = self.get_resource_name("SUMOBRTaskConsumer", "Microsoft.Web/sites") + response = website_client.web_apps.list_application_settings(resource_group_name=self.resource_group_name,name=FunctionAppWebsiteName) + current_properties = response.properties + current_properties["SumoLogEndpoint"] = current_properties["SumoLogEndpoint"].replace("==","") + response = website_client.web_apps.update_application_settings(resource_group_name=self.resource_group_name,app_settings={"properties": current_properties}, name=FunctionAppWebsiteName) + + def subtest_DLQ_func_logs(self): + + self.change_sumo_endpoint() + time.sleep(300) # wait for settings change to reflect + self.upload_message_in_service_bus() + time.sleep(300) # after 5 minutes DLQ function gets triggered + app_insights = self.get_resource('Microsoft.Insights/components') + + azurefunction = "BlobTaskConsumer" + captured_output = self.fetchlogs(app_insights.name, azurefunction) + + retry_error_message = "Retry error:" + self.assertTrue(self.filter_logs(captured_output, 'message', retry_error_message), + f"No retry message found in {azurefunction} azure function logs") + + azurefunction = "DLQTaskConsumer" + captured_output = self.fetchlogs(app_insights.name, azurefunction) + + successful_sent_message = "Successfully sent to Sumo, Exiting now." + self.assertTrue(self.filter_logs(captured_output, 'message', successful_sent_message), + f"No success message found in {azurefunction} azure function logs") + + successful_dlq_delete_message = "Successfully deleted message from DLQ." + self.assertTrue(self.filter_logs(captured_output, 'message', successful_dlq_delete_message), + f"No success delete dlq message found in {azurefunction} azure function logs") + + self.assertFalse(self.filter_logs(captured_output, 'severityLevel', '3'), + f"Error messages found in {azurefunction} azure function logs") + + self.assertFalse(self.filter_logs(captured_output, 'severityLevel', '2'), + f"Warning messages found in {azurefunction} azure function logs") + def get_random_name(self, length=32): return str(uuid.uuid4())