SUMO-239802: added nsglogs parsing function #116

Merged 4 commits on Jun 25, 2024
160 changes: 142 additions & 18 deletions BlockBlobReader/src/consumer.js
@@ -5,12 +5,14 @@
var sumoHttp = require('./sumoclient');
var { ContainerClient } = require("@azure/storage-blob");
var { DefaultAzureCredential } = require("@azure/identity");
const { TableClient } = require("@azure/data-tables");
var { AbortController } = require("@azure/abort-controller");
var { ServiceBusClient } = require("@azure/service-bus");
var DEFAULT_CSV_SEPARATOR = ",";
var MAX_CHUNK_SIZE = 1024;
var JSON_BLOB_HEAD_BYTES = 12;
var JSON_BLOB_TAIL_BYTES = 2;
const azureTableClient = TableClient.fromConnectionString(process.env.AzureWebJobsStorage, "FileOffsetMap");
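// FileOffsetMap stores per-blob byte offsets; setAppendBlobOffset (added below) records
// how far an NSG append blob could actually be parsed, so an unparseable tail is not
// counted as already ingested.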

function csvToArray(strData, strDelimiter) {
strDelimiter = (strDelimiter || ",");
@@ -116,19 +118,131 @@ function csvHandler(context,msgtext, headers) {
return messageArray;
}

function nsgLogsHandler(context,msg) {
/*
returns the index of the first position at which the regex matches the string (-1 if there is no match)
*/
function regexIndexOf(string, regex, startpos) {
var indexOf = string.substring(startpos || 0).search(regex);
return (indexOf >= 0) ? (indexOf + (startpos || 0)) : indexOf;
}

/*
returns the index of the last position at which the regex matches the string (-1 if there is no match)
*/
function regexLastIndexOf(string, regex, startpos) {
// https://stackoverflow.com/questions/19445994/javascript-string-search-for-regex-starting-at-the-end-of-the-string
var stringToWorkWith = string.substring(startpos, string.length);
var match = stringToWorkWith.match(regex);
return match ? string.lastIndexOf(match.slice(-1)) : -1;
}
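
// Illustrative sketch of the two helpers above (the sample string is hypothetical, not from this PR):
// let sample = '{"time":"t1","a":1},{"time":"t2","a":2}';
// regexIndexOf(sample, /\{\s*"time":/)        -> 0   (start of the first record)
// regexLastIndexOf(sample, /\{\s*"time":/, 1) -> 20  (start of the last record)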

/*
returns an array of JSON objects after stripping any unparseable prefix and suffix from the data
*/
function getParseableJsonArray(data, context) {

let logRegex = '\{\\s*\"time\"\:'; // starting regex for nsg logs
let defaultEncoding = "utf8";
let orginalDatalength = data.length;
// If the byte sequence in the buffer data is not valid according to the provided encoding, then it is replaced by the default replacement character i.e. U+FFFD.
// return -1 if not found
let firstIdx = regexIndexOf(data, logRegex);
let lastIndex = regexLastIndexOf(data, logRegex, firstIdx + 1);

// data.substring method extracts the characters in a string between "start" and "end", not including "end" itself.
let prefix = data.substring(0, firstIdx);
// in case there is only a single "time" match in the chunk
if (lastIndex === -1 && data.length > 0) {
lastIndex = data.length;
}
let suffix = data.substring(lastIndex, data.length);
if (suffix.length > 0) {
try {
JSON.parse(suffix.trim());
lastIndex = data.length;
} catch (error) {
context.log.error(`Failed to parse the JSON last chunk. Ignoring suffix: ${suffix.trim()}, error: ${error}`);
}
}

// ideally the ignored prefix length should always be 0; a non-zero prefix only appears for files that were rewritten/updated, and it is dropped
context.log.verbose(`Ignoring log prefix length: ${Buffer.byteLength(prefix, defaultEncoding)} suffix length: ${Buffer.byteLength(data.substring(lastIndex, data.length), defaultEncoding)}`);

// data with both prefix and suffix removed
data = data.substring(firstIdx, lastIndex);
let dataLenParsed = Buffer.byteLength(prefix + data, defaultEncoding);
data = data.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas

try {
var jsonArray = JSON.parse("[" + data + "]");
context.log.verbose(`Successfully parsed Json! datalength: ${data.length} orginalDatalength: ${orginalDatalength} dataLenParsed: ${dataLenParsed}`)
return [jsonArray, dataLenParsed, true];
} catch(error) {
context.log.error(`Failed to parse the JSON after removing prefix/suffix Error: ${error} firstIdx: ${firstIdx} lastIndex: ${lastIndex} prefix: ${prefix} datastart: ${data.substring(0,10)} dataend: ${data.substring(data.length-10,data.length)} orginalDatalength: ${orginalDatalength} dataLenParsed: ${dataLenParsed}`);
return [[data], dataLenParsed, false];
}
}
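
// Usage sketch (hypothetical chunk; only the contract matters):
// let chunk = '{"time":"t1","category":"NetworkSecurityGroupFlowEvent"},{"time":"t2","cat';
// let [records, parsedLen, ok] = getParseableJsonArray(chunk, context);
// records   -> [ { time: "t1", category: "NetworkSecurityGroupFlowEvent" } ]  (the complete record only)
// parsedLen -> byte length of the successfully parsed part (prefix included), everything up to the truncated record
// ok        -> true, so the caller can safely advance the append-blob offset by parsedLen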

function getRowKey(metadata) {
var storageName = metadata.url.split("//").pop().split(".")[0];
var arr = metadata.url.split('/').slice(3);
var keyArr = [storageName];
Array.prototype.push.apply(keyArr, arr);
// key cannot be greater than 1KB or 1024 bytes;
var rowKey = keyArr.join("-");
return rowKey.substr(0,Math.min(1024, rowKey.length)).replace(/^-+|-+$/g, '');
}
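
// For example, with the sample URL shown in the module.exports comment below:
// getRowKey({ url: "https://testsa250624004409.blob.core.windows.net/insights-logs-networksecuritygroupflowevent/blob_fixtures.json" })
// -> "testsa250624004409-insights-logs-networksecuritygroupflowevent-blob_fixtures.json"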

async function setAppendBlobOffset(context, serviceBusTask, newOffset) {

let rowKey = getRowKey(serviceBusTask);
try {
// TODO: this should be an atomic update; a competing request must not be allowed to decrease the offset
context.log.verbose("Attempting to update offset row: %s from: %d to: %d", rowKey, serviceBusTask.startByte, newOffset);
let entity = {
offset: { type: "Int64", value: String(newOffset) },
// The entity may already have been deleted (archived) by the append-blob cleanup after a long queueing delay, so to avoid errors in the insert/merge we also include the remaining fields (storageName, containerName, etc.).
partitionKey: serviceBusTask.containerName,
rowKey: rowKey,
blobName: serviceBusTask.blobName,
containerName: serviceBusTask.containerName,
storageName: serviceBusTask.storageName
}
var updateResult = await azureTableClient.updateEntity(entity, "Merge");
context.log.verbose("Updated offset result: %s row: %s from: %d to: %d", JSON.stringify(updateResult), rowKey, serviceBusTask.startByte, newOffset);
} catch (error) {
context.log.error(`Error - Failed to update OffsetMap table, error: ${JSON.stringify(error)}, rowKey: ${rowKey}, newOffset: ${newOffset}`)
}
}

async function nsgLogsHandler(context, msg, serviceBusTask) {

var jsonArray = [];
msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
jsonArray = JSON.parse("[" + msg + "]");

try {
jsonArray = JSON.parse("[" + msg + "]");
} catch(err) {
let response = getParseableJsonArray(msg, context);
jsonArray = response[0];
let isSuccess = response[2];
let newOffset = response[1] + serviceBusTask.startByte;
if (isSuccess) {
await setAppendBlobOffset(context, serviceBusTask, newOffset);
} else {
return jsonArray;
}

}

var eventsArr = [];
jsonArray.forEach(function (record) {
version = record.properties.Version;
let version = record.properties.Version;
record.properties.flows.forEach(function (rule) {
rule.flows.forEach(function (flow) {
flow.flowTuples.forEach(function (tuple) {
col = tuple.split(",");
event = {
let col = tuple.split(",");
let event = {
time: col[0], // this should be epoch time
sys_id: record.systemId,
category: record.category,
@@ -179,7 +293,7 @@ function jsonHandler(context,msg) {
function blobHandler(context,msg) {
// it's assumed that .blob files contain JSON objects separated by \n
//https://docs.microsoft.com/en-us/azure/application-insights/app-insights-export-telemetry

var jsonArray = [];
msg = msg.replace(/\0/g, '');
msg = msg.replace(/(\r?\n|\r)/g, ",");
@@ -238,7 +352,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
context.done();
return;
}
if (file_ext === "json" & serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent") {
if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent")) {
// in the json blob the first and last blocks stay as-is and the Azure service inserts each new block at the second-to-last position
if (serviceBusTask.endByte < JSON_BLOB_HEAD_BYTES + JSON_BLOB_TAIL_BYTES) {
context.done(); // reject the first commit when there is no data; data will always be at least HEAD_BYTES + DATA_BYTES + TAIL_BYTES
@@ -247,19 +361,17 @@ function messageHandler(serviceBusTask, context, sumoClient) {
serviceBusTask.endByte -= JSON_BLOB_TAIL_BYTES;
if (serviceBusTask.startByte <= JSON_BLOB_HEAD_BYTES) {
serviceBusTask.startByte = JSON_BLOB_HEAD_BYTES;
} else {
serviceBusTask.startByte -= 1; //to remove comma before json object
}
file_ext = "nsg";
}
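// Illustration of the byte math above (assuming the standard NSG flow-log blob layout):
// an NSG blob looks like   {"records":[ <record>,<record>,... ]}
// JSON_BLOB_HEAD_BYTES = 12 covers the leading '{"records":[' and JSON_BLOB_TAIL_BYTES = 2
// covers the trailing ']}', so after trimming them (plus the comma preceding a non-first
// block) what remains is a bare comma-separated list of records that nsgLogsHandler can
// wrap in '[' ... ']' and JSON.parse.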
getBlockBlobService(context, serviceBusTask).then(function (blobService) {
return getData(serviceBusTask, blobService, context).then(function (msg) {
return getData(serviceBusTask, blobService, context).then(async function (msg) {
context.log("Sucessfully downloaded blob %s %d %d", serviceBusTask.blobName, serviceBusTask.startByte, serviceBusTask.endByte);
var messageArray;
if (file_ext === "csv") {
return getcsvHeader(serviceBusTask.containerName, serviceBusTask.blobName, blobService, context).then(function (headers) {
context.log("Received headers %d", headers.length);
messageArray = msghandler[file_ext](context,msg, headers);
messageArray = csvHandler(context,msg, headers);
// context.log("Transformed data %s", JSON.stringify(messageArray));
messageArray.forEach(function (msg) {
sumoClient.addData(msg);
@@ -270,7 +382,11 @@ function messageHandler(serviceBusTask, context, sumoClient) {
context.done(err);
});
} else {
messageArray = msghandler[file_ext](context,msg);
if (file_ext == "nsg") {
messageArray = await nsgLogsHandler(context, msg, serviceBusTask);
} else {
messageArray = msghandler[file_ext](context,msg);
}
messageArray.forEach(function (msg) {
sumoClient.addData(msg);
});
@@ -282,7 +398,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
context.log.error("Error in messageHandler: blob file doesn't exist " + serviceBusTask.blobName + " " + serviceBusTask.startByte + " " +serviceBusTask.endByte);
context.done()
} else {
context.log.error("Error in messageHandler: Failed to send blob " + serviceBusTask.blobName + " " + serviceBusTask.startByte + " " +serviceBusTask.endByte);
context.log.error("Error in messageHandler: Failed to send blob " + serviceBusTask.blobName + " " + serviceBusTask.startByte + " " +serviceBusTask.endByte + " err: " + err);
context.done(err);
}

@@ -347,20 +463,18 @@ function servicebushandler(context, serviceBusTask) {
};
setSourceCategory(serviceBusTask, options);
function failureHandler(msgArray, ctx) {
ctx.log("ServiceBus Task: ", serviceBusTask)
ctx.log.error("Failed to send to Sumo");
ctx.log.error(`Failed to send to Sumo`);
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
}
}
function successHandler(ctx) {
if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
ctx.log("ServiceBus Task: ", serviceBusTask)
if (sumoClient.messagesFailed > 0) {
ctx.log.error('Failed to send few messages to Sumo')
ctx.log.error(`Failed to send few messages to Sumo`)
ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
} else {
ctx.log('Successfully sent to Sumo, Exiting now.');
ctx.log(`Successfully sent to Sumo, Exiting now.`);
ctx.done();
}
}
@@ -457,6 +571,16 @@ async function timetriggerhandler(context, timetrigger) {
}

module.exports = function (context, triggerData) {
// triggerData = {
// "blobName": "blob_fixtures.json",
// "containerName": "insights-logs-networksecuritygroupflowevent",
// "endByte": 2617,
// "resourceGroupName": "testsumosa250624004409",
// "startByte": 0,
// "storageName": "testsa250624004409",
// "subscriptionId": "c088dc46-d692-42ad-a4b6-9a542d28ad2a",
// "url": "https://testsa250624004409.blob.core.windows.net/insights-logs-networksecuritygroupflowevent/blob_fixtures.json"
// };
if (triggerData.isPastDue === undefined) {
servicebushandler(context, triggerData);
} else {
11 changes: 10 additions & 1 deletion BlockBlobReader/src/producer.js
@@ -124,6 +124,15 @@ function getNewTask(currentoffset,sortedcontentlengths,metadata){
lastoffset = endByte;
}
}
if (tasks.length === 0 && sortedcontentlengths.length > 0 && endByte < currentoffset) {
// NSG flow-log files sometimes get rewritten, so restart reading the file from the beginning
task = Object.assign({
startByte: 0,
endByte: endByte
}, metadata);
tasks.push(task);
lastoffset = endByte;
}
return [tasks,lastoffset];
}
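
// Sketch of the rewrite case handled above (hypothetical numbers): with a stored
// currentoffset of 5000, a rewritten blob whose new content yields endByte = 2617
// produces no forward task in the loop above; since endByte < currentoffset, a single
// task { startByte: 0, endByte: 2617, ...metadata } is queued and lastoffset is reset to 2617.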

@@ -142,7 +151,7 @@ async function createTasksForBlob(partitionKey, rowKey, sortedcontentlengths, co
}
var currentoffset = retrievedResponse.statusCode === 404 ? -1 : Number(retrievedResponse.entity.offset);
var currentEtag = retrievedResponse.statusCode === 404 ? null : retrievedResponse.entity.etag;
var [tasks,lastoffset] = getNewTask(currentoffset,sortedcontentlengths,metadata);
var [tasks,lastoffset] = getNewTask(currentoffset, sortedcontentlengths, metadata);

if (tasks.length > 0) { // modify offset only when it's been changed
var entity = getEntity(metadata, lastoffset, currentEtag);