Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Releasing 4.1.2 #118

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions BlockBlobReader/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) {
}catch(err){
context.log.error("Failed to create service bus client and receiver");
context.done(err);
return;
}
try {
var messages = await queueReceiver.receiveMessages(1, {
Expand Down Expand Up @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) {
ctx.log('Successfully sent to Sumo, Exiting now.');
try{
await queueReceiver.completeMessage(messages[0]);
}catch(err){
await queueReceiver.close();
await sbClient.close();
if (!err) {
ctx.log("sent and deleted");
ctx.done();
} else {
ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue");
ctx.done(err);
}
ctx.log("Successfully deleted message from DLQ.");
} catch(err){
ctx.log.error(`Failed to delete message from DLQ. error: ${error}`);
}
await queueReceiver.close();
await sbClient.close();
ctx.done();
}
}
}
sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler);
messageHandler(serviceBusTask, context, sumoClient);
}catch(error){
} catch(error){
await queueReceiver.close();
await sbClient.close();
if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) {
Expand Down
19 changes: 8 additions & 11 deletions BlockBlobReader/target/consumer_build/BlobTaskConsumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) {
}catch(err){
context.log.error("Failed to create service bus client and receiver");
context.done(err);
return;
}
try {
var messages = await queueReceiver.receiveMessages(1, {
Expand Down Expand Up @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) {
ctx.log('Successfully sent to Sumo, Exiting now.');
try{
await queueReceiver.completeMessage(messages[0]);
}catch(err){
await queueReceiver.close();
await sbClient.close();
if (!err) {
ctx.log("sent and deleted");
ctx.done();
} else {
ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue");
ctx.done(err);
}
ctx.log("Successfully deleted message from DLQ.");
} catch(err){
ctx.log.error(`Failed to delete message from DLQ. error: ${error}`);
}
await queueReceiver.close();
await sbClient.close();
ctx.done();
}
}
}
sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler);
messageHandler(serviceBusTask, context, sumoClient);
}catch(error){
} catch(error){
await queueReceiver.close();
await sbClient.close();
if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ async function timetriggerhandler(context, timetrigger) {
}catch(err){
context.log.error("Failed to create service bus client and receiver");
context.done(err);
return;
}
try {
var messages = await queueReceiver.receiveMessages(1, {
Expand Down Expand Up @@ -541,23 +542,19 @@ async function timetriggerhandler(context, timetrigger) {
ctx.log('Successfully sent to Sumo, Exiting now.');
try{
await queueReceiver.completeMessage(messages[0]);
}catch(err){
await queueReceiver.close();
await sbClient.close();
if (!err) {
ctx.log("sent and deleted");
ctx.done();
} else {
ctx.log.verbose("Messages Sent but failed delete from DeadLetterQueue");
ctx.done(err);
}
ctx.log("Successfully deleted message from DLQ.");
} catch(err){
ctx.log.error(`Failed to delete message from DLQ. error: ${error}`);
}
await queueReceiver.close();
await sbClient.close();
ctx.done();
}
}
}
sumoClient = new sumoHttp.SumoClient(options, context, failureHandler, successHandler);
messageHandler(serviceBusTask, context, sumoClient);
}catch(error){
} catch(error){
await queueReceiver.close();
await sbClient.close();
if (typeof error === 'string' && new RegExp("\\b" + "No messages" + "\\b", "gi").test(error)) {
Expand Down
1 change: 1 addition & 0 deletions BlockBlobReader/target/dlqprocessor_build/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"@azure/identity": "^4.2.1",
"@azure/service-bus": "^7.8.1",
"@azure/storage-blob": "^12.13.0",
"@azure/data-tables": "^13.2.2",
"encoding": "^0.1.13"
}
}
2 changes: 2 additions & 0 deletions BlockBlobReader/tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ azure-mgmt-storage==21.1.0
azure-storage-blob==12.19.0
azure-storage==0.36.0
azure-cosmosdb-table==1.0.6
azure-servicebus==7.12.2
azure-mgmt-web==7.3.0
89 changes: 82 additions & 7 deletions BlockBlobReader/tests/test_blobreader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, timedelta
import time
import unittest
import json
Expand All @@ -8,6 +8,8 @@
from azure.storage.blob import BlockBlobService
from azure.storage.blob.models import BlobBlock
from azure.mgmt.storage import StorageManagementClient
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.mgmt.web import WebSiteManagementClient
from azure.cosmosdb.table.tableservice import TableService
from random import choices
from string import ascii_uppercase, digits
Expand Down Expand Up @@ -88,6 +90,15 @@ def upload_file_of_unknown_extension(self):
self.test_filename_unsupported_extension,
data_block, [])

def get_full_testlog_file_name(self):
# Verify with a very long append blob filename (1024 characters)
file_ext = f".{self.log_type}"
if len(self.test_filename) > 128:
expected_filename = self.test_filename[:60] + "..." + self.test_filename[-(60-len(file_ext)):] + file_ext
else:
expected_filename = self.test_filename
return expected_filename

def test_03_func_logs(self):
self.logger.info("inserting mock %s data in BlobStorage" % self.log_type)
if self.log_type in ("csv", "log", "blob"):
Expand Down Expand Up @@ -159,17 +170,81 @@ def test_03_func_logs(self):
self.assertTrue(record_unsupported_extension_count == 0,
f"block blob file's record count: {record_unsupported_extension_count}, logs with unsupported blob extension should not be ingested")

# Verify with a very long append blob filename (1024 characters)
file_ext = f".{self.log_type}"
if len(self.test_filename) > 128:
expected_filename = self.test_filename[:60] + "..." + self.test_filename[-(60-len(file_ext)):] + file_ext
else:
expected_filename = self.test_filename
expected_filename = self.get_full_testlog_file_name()

# Verify addition of _sourceCategory, _sourceHost, _sourceName and also additional metadata
self.assertTrue(source_name == expected_filename, f"_sourceName: {source_name} expected_filename: {expected_filename} metadata is incorrect")
self.assertTrue(source_host == f"{self.test_storageaccount_name}/{self.test_container_name}", f"_sourceHost {source_host} expected_sourcehost: {self.test_storageaccount_name}/{self.test_container_name} metadata is incorrect")

# testing DLQ flow
self.subtest_DLQ_func_logs()

def upload_message_in_service_bus(self):
file_ext = f".{self.log_type}"
test_filename = self.test_filename + file_ext
file_size = os.path.getsize(f"blob_fixtures{file_ext}")
triggerData = {
"blobName": test_filename,
"containerName": self.test_container_name,
"endByte": file_size-1,
"resourceGroupName": self.test_storage_res_group,
"startByte": 0,
"storageName": self.test_storageaccount_name,
"subscriptionId": self.subscription_id,
"url": f"https:{self.test_storageaccount_name}.blob.core.windows.net/{self.test_container_name}/{test_filename}"
}
self.logger.info("Ingesting message into servicebus DLQ", triggerData)
SERVICE_BUS_NAMESPACE = self.get_resource_name("SUMOBRTaskQNS", "Microsoft.ServiceBus/namespaces")
SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE = f"{SERVICE_BUS_NAMESPACE}.servicebus.windows.net"
SERVICE_BUS_QUEUE_NAME = "blobrangetaskqueue"
servicebus_client = ServiceBusClient(SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, self.azure_credential)

with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=SERVICE_BUS_QUEUE_NAME)
with sender:
sender.send_messages(ServiceBusMessage(json.dumps(triggerData)))

def change_sumo_endpoint(self):
self.logger.info("Changing sumo logic endpoint")
website_client = WebSiteManagementClient(credential=self.azure_credential, subscription_id=self.subscription_id)
FunctionAppWebsiteName = self.get_resource_name("SUMOBRTaskConsumer", "Microsoft.Web/sites")
response = website_client.web_apps.list_application_settings(resource_group_name=self.resource_group_name,name=FunctionAppWebsiteName)
current_properties = response.properties
current_properties["SumoLogEndpoint"] = current_properties["SumoLogEndpoint"].replace("==","")
response = website_client.web_apps.update_application_settings(resource_group_name=self.resource_group_name,app_settings={"properties": current_properties}, name=FunctionAppWebsiteName)

def subtest_DLQ_func_logs(self):

self.change_sumo_endpoint()
time.sleep(300) # wait for settings change to reflect
self.upload_message_in_service_bus()
time.sleep(300) # after 5 minutes DLQ function gets triggered
app_insights = self.get_resource('Microsoft.Insights/components')

azurefunction = "BlobTaskConsumer"
captured_output = self.fetchlogs(app_insights.name, azurefunction)

retry_error_message = "Retry error:"
self.assertTrue(self.filter_logs(captured_output, 'message', retry_error_message),
f"No retry message found in {azurefunction} azure function logs")

azurefunction = "DLQTaskConsumer"
captured_output = self.fetchlogs(app_insights.name, azurefunction)

successful_sent_message = "Successfully sent to Sumo, Exiting now."
self.assertTrue(self.filter_logs(captured_output, 'message', successful_sent_message),
f"No success message found in {azurefunction} azure function logs")

successful_dlq_delete_message = "Successfully deleted message from DLQ."
self.assertTrue(self.filter_logs(captured_output, 'message', successful_dlq_delete_message),
f"No success delete dlq message found in {azurefunction} azure function logs")

self.assertFalse(self.filter_logs(captured_output, 'severityLevel', '3'),
f"Error messages found in {azurefunction} azure function logs")

self.assertFalse(self.filter_logs(captured_output, 'severityLevel', '2'),
f"Warning messages found in {azurefunction} azure function logs")

def get_random_name(self, length=32):
return str(uuid.uuid4())

Expand Down
Loading