From fe625b4b5f481b3066443a07298afa2ae0055846 Mon Sep 17 00:00:00 2001 From: Himanshu Pal Date: Wed, 17 Jul 2024 11:26:55 +0530 Subject: [PATCH 1/4] added table dependency --- BlockBlobReader/target/dlqprocessor_build/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/BlockBlobReader/target/dlqprocessor_build/package.json b/BlockBlobReader/target/dlqprocessor_build/package.json index fc70c260..162c143b 100644 --- a/BlockBlobReader/target/dlqprocessor_build/package.json +++ b/BlockBlobReader/target/dlqprocessor_build/package.json @@ -5,6 +5,7 @@ "@azure/identity": "^4.2.1", "@azure/service-bus": "^7.8.1", "@azure/storage-blob": "^12.13.0", + "@azure/data-tables": "^13.2.2", "encoding": "^0.1.13" } } From 16aa8a161d71698215293e50d913e8f288c599d3 Mon Sep 17 00:00:00 2001 From: Himanshu Pal Date: Fri, 19 Jul 2024 00:19:24 +0530 Subject: [PATCH 2/4] fixed timeout error --- BlockBlobReader/src/consumer.js | 19 ++++++++----------- .../consumer_build/BlobTaskConsumer/index.js | 19 ++++++++----------- .../DLQTaskConsumer/index.js | 19 ++++++++----------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/BlockBlobReader/src/consumer.js b/BlockBlobReader/src/consumer.js index 91e4bda0..b8c6df4d 100644 --- a/BlockBlobReader/src/consumer.js +++ b/BlockBlobReader/src/consumer.js @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) { }catch(err){ context.log.error("Failed to create service bus client and receiver"); context.done(err); + return; } try { var messages = await queueReceiver.receiveMessages(1, { @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) { ctx.log('Successfully sent to Sumo, Exiting now.'); try{ await queueReceiver.completeMessage(messages[0]); - }catch(err){ - await queueReceiver.close(); - await sbClient.close(); - if (!err) { - ctx.log("sent and deleted"); - ctx.done(); - } else { - ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue"); - ctx.done(err); - } + ctx.log("Successfully deleted message from DLQ."); + } catch(err){ + ctx.log.error(`Failed to delete message from DLQ. error: ${error}`); } + await queueReceiver.close(); + await sbClient.close(); + ctx.done(); } } } sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler); messageHandler(serviceBusTask, context, sumoClient); - }catch(error){ + } catch(error){ await queueReceiver.close(); await sbClient.close(); if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) { diff --git a/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js b/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js index 91e4bda0..b8c6df4d 100644 --- a/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js +++ b/BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) { }catch(err){ context.log.error("Failed to create service bus client and receiver"); context.done(err); + return; } try { var messages = await queueReceiver.receiveMessages(1, { @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) { ctx.log('Successfully sent to Sumo, Exiting now.'); try{ await queueReceiver.completeMessage(messages[0]); - }catch(err){ - await queueReceiver.close(); - await sbClient.close(); - if (!err) { - ctx.log("sent and deleted"); - ctx.done(); - } else { - ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue"); - ctx.done(err); - } + ctx.log("Successfully deleted message from DLQ."); + } catch(err){ + ctx.log.error(`Failed to delete message from DLQ. error: ${error}`); } + await queueReceiver.close(); + await sbClient.close(); + ctx.done(); } } } sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler); messageHandler(serviceBusTask, context, sumoClient); - }catch(error){ + } catch(error){ await queueReceiver.close(); await sbClient.close(); if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) { diff --git a/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js b/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js index 91e4bda0..b8c6df4d 100644 --- a/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js +++ b/BlockBlobReader/target/dlqprocessor_build/DLQTaskConsumer/index.js @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) { }catch(err){ context.log.error("Failed to create service bus client and receiver"); context.done(err); + return; } try { var messages = await queueReceiver.receiveMessages(1, { @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) { ctx.log('Successfully sent to Sumo, Exiting now.'); try{ await queueReceiver.completeMessage(messages[0]); - }catch(err){ - await queueReceiver.close(); - await sbClient.close(); - if (!err) { - ctx.log("sent and deleted"); - ctx.done(); - } else { - ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue"); - ctx.done(err); - } + ctx.log("Successfully deleted message from DLQ."); + } catch(err){ + ctx.log.error(`Failed to delete message from DLQ. error: ${error}`); } + await queueReceiver.close(); + await sbClient.close(); + ctx.done(); } } } sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler); messageHandler(serviceBusTask, context, sumoClient); - }catch(error){ + } catch(error){ await queueReceiver.close(); await sbClient.close(); if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) { From d61d38631439fcb11a1cd830508e1b6ca95a2fc6 Mon Sep 17 00:00:00 2001 From: Himanshu Pal Date: Fri, 19 Jul 2024 00:28:51 +0530 Subject: [PATCH 3/4] added test for dlq flow --- BlockBlobReader/tests/requirements.txt | 2 + BlockBlobReader/tests/test_blobreader.py | 100 +++++++++++++++++++++-- 2 files changed, 95 insertions(+), 7 deletions(-) diff --git a/BlockBlobReader/tests/requirements.txt b/BlockBlobReader/tests/requirements.txt index 39cfa09d..dba8e6e6 100644 --- a/BlockBlobReader/tests/requirements.txt +++ b/BlockBlobReader/tests/requirements.txt @@ -8,3 +8,5 @@ azure-mgmt-storage==21.1.0 azure-storage-blob==12.19.0 azure-storage==0.36.0 azure-cosmosdb-table==1.0.6 +azure-servicebus==7.12.2 +azure-mgmt-web==7.3.0 diff --git a/BlockBlobReader/tests/test_blobreader.py b/BlockBlobReader/tests/test_blobreader.py index f6bab3f6..482fc602 100644 --- a/BlockBlobReader/tests/test_blobreader.py +++ b/BlockBlobReader/tests/test_blobreader.py @@ -1,5 +1,5 @@ import os -from datetime import datetime +from datetime import datetime, timedelta import time import unittest import json @@ -8,6 +8,8 @@ from azure.storage.blob import BlockBlobService from azure.storage.blob.models import BlobBlock from azure.mgmt.storage import StorageManagementClient +from azure.servicebus import ServiceBusClient, ServiceBusMessage +from azure.mgmt.web import WebSiteManagementClient from azure.cosmosdb.table.tableservice import TableService from random import choices from string import ascii_uppercase, digits @@ -88,6 +90,15 @@ def upload_file_of_unknown_extension(self): self.test_filename_unsupported_extension, data_block, []) + def get_full_testlog_file_name(self): + # Verify with a very long append blob filename (1024 characters) + file_ext = f".{self.log_type}" + if len(self.test_filename) > 128: + expected_filename = self.test_filename[:60] + "..." + self.test_filename[-(60-len(file_ext)):] + file_ext + else: + expected_filename = self.test_filename + return expected_filename + def test_03_func_logs(self): self.logger.info("inserting mock %s data in BlobStorage" % self.log_type) if self.log_type in ("csv", "log", "blob"): @@ -159,17 +170,92 @@ def test_03_func_logs(self): self.assertTrue(record_unsupported_extension_count == 0, f"block blob file's record count: {record_unsupported_extension_count}, logs with unsupported blob extension should not be ingested") - # Verify with a very long append blob filename (1024 characters) - file_ext = f".{self.log_type}" - if len(self.test_filename) > 128: - expected_filename = self.test_filename[:60] + "..." + self.test_filename[-(60-len(file_ext)):] + file_ext - else: - expected_filename = self.test_filename + expected_filename = self.get_full_testlog_file_name() # Verify addition of _sourceCategory, _sourceHost, _sourceName and also additional metadata self.assertTrue(source_name == expected_filename, f"_sourceName: {source_name} expected_filename: {expected_filename} metadata is incorrect") self.assertTrue(source_host == f"{self.test_storageaccount_name}/{self.test_container_name}", f"_sourceHost {source_host} expected_sourcehost: {self.test_storageaccount_name}/{self.test_container_name} metadata is incorrect") + # testing DLQ flow + self.subtest_DLQ_func_logs() + + def upload_message_in_service_bus(self): + file_ext = f".{self.log_type}" + test_filename = self.test_filename + file_ext + file_size = os.path.getsize(f"blob_fixtures{file_ext}") + triggerData = { + "blobName": test_filename, + "containerName": self.test_container_name, + "endByte": file_size-1, + "resourceGroupName": self.test_storage_res_group, + "startByte": 0, + "storageName": self.test_storageaccount_name, + "subscriptionId": self.subscription_id, + "url": f"https:{self.test_storageaccount_name}.blob.core.windows.net/{self.test_container_name}/{test_filename}" + } + self.logger.info("Ingesting into servicebus DLQ") + SERVICE_BUS_NAMESPACE = self.get_resource_name("SUMOBRTaskQNS", "Microsoft.ServiceBus/namespaces") + SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE = f"{SERVICE_BUS_NAMESPACE}.servicebus.windows.net" + SERVICE_BUS_QUEUE_NAME = "blobrangetaskqueue" + servicebus_client = ServiceBusClient(SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, self.azure_credential) + + with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=SERVICE_BUS_QUEUE_NAME) + with sender: + sender.send_messages(ServiceBusMessage(json.dumps(triggerData), time_to_live=timedelta(seconds=1))) + + # with receiver: + # num_dlq_messages = 0 + # received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=60) + # for msg in received_msgs: + # receiver.dead_letter_message( + # msg, + # reason="SomeProcessingError", + # error_description="Testing DLQTaskConsumer function", + # ) + # num_dlq_messages += 1 + # print("Number of DLQ messages ingested: ", num_dlq_messages) + + def change_sumo_endpoint(self): + self.logger.info("Changing sumo logic endpoint") + website_client = WebSiteManagementClient(credential=self.azure_credential, subscription_id=self.subscription_id) + FunctionAppWebsiteName = self.get_resource_name("SUMOBRTaskConsumer", "Microsoft.Web/sites") + response = website_client.web_apps.list_application_settings(resource_group_name=self.resource_group_name,name=FunctionAppWebsiteName) + current_properties = response.properties + current_properties["SumoLogEndpoint"] = current_properties["SumoLogEndpoint"] + "donotwork" + response = website_client.web_apps.update_application_settings(resource_group_name=self.resource_group_name,app_settings={"properties": current_properties}, name=FunctionAppWebsiteName) + + def subtest_DLQ_func_logs(self): + + self.change_sumo_endpoint() + self.upload_message_in_service_bus() + time.sleep(300) # after 5 minutes DLQ function gets triggered + app_insights = self.get_resource('Microsoft.Insights/components') + + azurefunction = "BlobTaskConsumer" + captured_output = self.fetchlogs(app_insights.name, azurefunction) + + retry_error_message = "Retry error:" + self.assertTrue(self.filter_logs(captured_output, 'message', retry_error_message), + f"No retry message found in {azurefunction} azure function logs") + + azurefunction = "DLQTaskConsumer" + captured_output = self.fetchlogs(app_insights.name, azurefunction) + + successful_sent_message = "Successfully sent to Sumo, Exiting now." + self.assertTrue(self.filter_logs(captured_output, 'message', successful_sent_message), + f"No success message found in {azurefunction} azure function logs") + + successful_dlq_delete_message = "Successfully deleted message from DLQ." + self.assertTrue(self.filter_logs(captured_output, 'message', successful_dlq_delete_message), + f"No success delete dlq message found in {azurefunction} azure function logs") + + self.assertFalse(self.filter_logs(captured_output, 'severityLevel', '3'), + f"Error messages found in {azurefunction} azure function logs") + + self.assertFalse(self.filter_logs(captured_output, 'severityLevel', '2'), + f"Warning messages found in {azurefunction} azure function logs") + def get_random_name(self, length=32): return str(uuid.uuid4()) From a70d74d95a81f3091519d3110e1cba295a6a4871 Mon Sep 17 00:00:00 2001 From: Himanshu Pal Date: Fri, 19 Jul 2024 08:17:13 +0530 Subject: [PATCH 4/4] removed commented code --- BlockBlobReader/tests/test_blobreader.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/BlockBlobReader/tests/test_blobreader.py b/BlockBlobReader/tests/test_blobreader.py index 482fc602..d2825989 100644 --- a/BlockBlobReader/tests/test_blobreader.py +++ b/BlockBlobReader/tests/test_blobreader.py @@ -193,7 +193,7 @@ def upload_message_in_service_bus(self): "subscriptionId": self.subscription_id, "url": f"https:{self.test_storageaccount_name}.blob.core.windows.net/{self.test_container_name}/{test_filename}" } - self.logger.info("Ingesting into servicebus DLQ") + self.logger.info("Ingesting message into servicebus DLQ", triggerData) SERVICE_BUS_NAMESPACE = self.get_resource_name("SUMOBRTaskQNS", "Microsoft.ServiceBus/namespaces") SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE = f"{SERVICE_BUS_NAMESPACE}.servicebus.windows.net" SERVICE_BUS_QUEUE_NAME = "blobrangetaskqueue" @@ -202,19 +202,7 @@ def upload_message_in_service_bus(self): with servicebus_client: sender = servicebus_client.get_queue_sender(queue_name=SERVICE_BUS_QUEUE_NAME) with sender: - sender.send_messages(ServiceBusMessage(json.dumps(triggerData), time_to_live=timedelta(seconds=1))) - - # with receiver: - # num_dlq_messages = 0 - # received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=60) - # for msg in received_msgs: - # receiver.dead_letter_message( - # msg, - # reason="SomeProcessingError", - # error_description="Testing DLQTaskConsumer function", - # ) - # num_dlq_messages += 1 - # print("Number of DLQ messages ingested: ", num_dlq_messages) + sender.send_messages(ServiceBusMessage(json.dumps(triggerData))) def change_sumo_endpoint(self): self.logger.info("Changing sumo logic endpoint") @@ -222,12 +210,13 @@ def change_sumo_endpoint(self): FunctionAppWebsiteName = self.get_resource_name("SUMOBRTaskConsumer", "Microsoft.Web/sites") response = website_client.web_apps.list_application_settings(resource_group_name=self.resource_group_name,name=FunctionAppWebsiteName) current_properties = response.properties - current_properties["SumoLogEndpoint"] = current_properties["SumoLogEndpoint"] + "donotwork" + current_properties["SumoLogEndpoint"] = current_properties["SumoLogEndpoint"].replace("==","") response = website_client.web_apps.update_application_settings(resource_group_name=self.resource_group_name,app_settings={"properties": current_properties}, name=FunctionAppWebsiteName) def subtest_DLQ_func_logs(self): self.change_sumo_endpoint() + time.sleep(300) # wait for settings change to reflect self.upload_message_in_service_bus() time.sleep(300) # after 5 minutes DLQ function gets triggered app_insights = self.get_resource('Microsoft.Insights/components')