Commit 972c65e

Merge pull request #116 from SumoLogic/SUMO-239802_nsglogs_fix
SUMO-239802: added nsglogs parsing function
2 parents 300186e + 1f5aaf8 commit 972c65e

9 files changed: +454 -117 lines changed
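For context on what the new parsing code consumes: NSG flow-log records nest flow tuples as comma-separated strings under properties.flows[].flows[].flowTuples, and nsgLogsHandler flattens each tuple into one event whose first column is the epoch timestamp. A minimal sketch of that traversal (field values are illustrative, following the documented NSG flow-log schema; they are not taken from this commit):

```js
// Illustrative NSG flow-log record; values are made up, the shape follows the Azure schema.
const record = {
    time: "2024-06-24T00:00:00.000Z",
    systemId: "00000000-0000-0000-0000-000000000000",
    category: "NetworkSecurityGroupFlowEvent",
    properties: {
        Version: 2,
        flows: [{
            rule: "DefaultRule_AllowInternetOutBound",
            flows: [{
                mac: "000D3AF87856",
                flowTuples: ["1719187200,10.0.0.4,13.67.143.118,44931,443,T,O,A,B,,,,"]
            }]
        }]
    }
};

// Same traversal the handler performs: one event per flow tuple, first column is epoch time.
record.properties.flows.forEach(rule =>
    rule.flows.forEach(flow =>
        flow.flowTuples.forEach(tuple => {
            const col = tuple.split(",");
            console.log({ time: col[0], sys_id: record.systemId, category: record.category });
        })));
```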


BlockBlobReader/src/consumer.js

Lines changed: 142 additions & 18 deletions
@@ -5,12 +5,14 @@
 var sumoHttp = require('./sumoclient');
 var { ContainerClient } = require("@azure/storage-blob");
 var { DefaultAzureCredential } = require("@azure/identity");
+const { TableClient } = require("@azure/data-tables");
 var { AbortController } = require("@azure/abort-controller");
 var { ServiceBusClient } = require("@azure/service-bus");
 var DEFAULT_CSV_SEPARATOR = ",";
 var MAX_CHUNK_SIZE = 1024;
 var JSON_BLOB_HEAD_BYTES = 12;
 var JSON_BLOB_TAIL_BYTES = 2;
+const azureTableClient = TableClient.fromConnectionString(process.env.AzureWebJobsStorage, "FileOffsetMap");

 function csvToArray(strData, strDelimiter) {
     strDelimiter = (strDelimiter || ",");
@@ -116,19 +118,131 @@ function csvHandler(context,msgtext, headers) {
     return messageArray;
 }

-function nsgLogsHandler(context,msg) {
+/*
+    return index of first time when pattern matches the string
+*/
+function regexIndexOf(string, regex, startpos) {
+    var indexOf = string.substring(startpos || 0).search(regex);
+    return (indexOf >= 0) ? (indexOf + (startpos || 0)) : indexOf;
+}
+
+/*
+    return index of last time when pattern matches the string
+*/
+function regexLastIndexOf(string, regex, startpos) {
+    // https://stackoverflow.com/questions/19445994/javascript-string-search-for-regex-starting-at-the-end-of-the-string
+    var stringToWorkWith = string.substring(startpos, string.length);
+    var match = stringToWorkWith.match(regex);
+    return match ? string.lastIndexOf(match.slice(-1)) : -1;
+}
+
+/*
+    returns array of json by removing unparseable prefix and suffix in data
+*/
+function getParseableJsonArray(data, context) {
+
+    let logRegex = '\{\\s*\"time\"\:'; // starting regex for nsg logs
+    let defaultEncoding = "utf8";
+    let orginalDatalength = data.length;
+    // If the byte sequence in the buffer data is not valid according to the provided encoding, then it is replaced by the default replacement character i.e. U+FFFD.
+    // return -1 if not found
+    let firstIdx = regexIndexOf(data, logRegex);
+    let lastIndex = regexLastIndexOf(data, logRegex, firstIdx + 1);
+
+    // data.substring method extracts the characters in a string between "start" and "end", not including "end" itself.
+    let prefix = data.substring(0, firstIdx);
+    // in case only one time string
+    if (lastIndex === -1 && data.length > 0) {
+        lastIndex = data.length;
+    }
+    let suffix = data.substring(lastIndex, data.length);
+    if (suffix.length > 0) {
+        try {
+            JSON.parse(suffix.trim());
+            lastIndex = data.length;
+        } catch (error) {
+            context.log.error(`Failed to parse the JSON last chunk. Ignoring suffix: ${suffix.trim()}, error: ${error}`);
+        }
+    }
+
+    // ideally ignoredprefixLen should always be 0. it will be dropped for files which are updated
+    context.log.verbose(`Ignoring log prefix length: ${Buffer.byteLength(prefix, defaultEncoding)} suffix length: ${Buffer.byteLength(data.substring(lastIndex, data.length), defaultEncoding)}`);
+
+    // data with both prefix and suffix removed
+    data = data.substring(firstIdx, lastIndex);
+    let dataLenParsed = Buffer.byteLength(prefix + data, defaultEncoding);
+    data = data.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
+
+    try {
+        var jsonArray = JSON.parse("[" + data + "]");
+        context.log.verbose(`Successfully parsed Json! datalength: ${data.length} orginalDatalength: ${orginalDatalength} dataLenParsed: ${dataLenParsed}`)
+        return [jsonArray, dataLenParsed, true];
+    } catch(error) {
+        context.log.error(`Failed to parse the JSON after removing prefix/suffix Error: ${error} firstIdx: ${firstIdx} lastIndex: ${lastIndex} prefix: ${prefix} datastart: ${data.substring(0,10)} dataend: ${data.substring(data.length-10,data.length)} orginalDatalength: ${orginalDatalength} dataLenParsed: ${dataLenParsed}`);
+        return [[data], dataLenParsed, false];
+    }
+}
+
+function getRowKey(metadata) {
+    var storageName = metadata.url.split("//").pop().split(".")[0];
+    var arr = metadata.url.split('/').slice(3);
+    var keyArr = [storageName];
+    Array.prototype.push.apply(keyArr, arr);
+    // key cannot be greater than 1KB or 1024 bytes;
+    var rowKey = keyArr.join("-");
+    return rowKey.substr(0,Math.min(1024, rowKey.length)).replace(/^-+|-+$/g, '');
+}
+
+async function setAppendBlobOffset(context, serviceBusTask, newOffset) {
+
+    try {
+        let rowKey = getRowKey(serviceBusTask);
+        // Todo: this should be atomic update if other request decreases offset it shouldn't allow
+        context.log.verbose("Attempting to update offset row: %s from: %d to: %d", rowKey, serviceBusTask.startByte, newOffset);
+        let entity = {
+            offset: { type: "Int64", value: String(newOffset) },
+            // In a scenario where the entity could have been deleted (archived) by appendblob because of large queueing time so to avoid error in insertOrMerge Entity we include rest of the fields like storageName,containerName etc.
+            partitionKey: serviceBusTask.containerName,
+            rowKey: rowKey,
+            blobName: serviceBusTask.blobName,
+            containerName: serviceBusTask.containerName,
+            storageName: serviceBusTask.storageName
+        }
+        var updateResult = await azureTableClient.updateEntity(entity, "Merge");
+        context.log.verbose("Updated offset result: %s row: %s from: %d to: %d", JSON.stringify(updateResult), rowKey, serviceBusTask.startByte, newOffset);
+    } catch (error) {
+        context.log.error(`Error - Failed to update OffsetMap table, error: ${JSON.stringify(error)}, rowKey: ${rowKey}, newOffset: ${newOffset}`)
+    }
+}
+
+async function nsgLogsHandler(context, msg, serviceBusTask) {

     var jsonArray = [];
     msg = msg.trim().replace(/(^,)|(,$)/g, ""); //removing trailing spaces,newlines and leftover commas
-    jsonArray = JSON.parse("[" + msg + "]");
+
+    try {
+        jsonArray = JSON.parse("[" + msg + "]");
+    } catch(err) {
+        let response = getParseableJsonArray(msg, context, serviceBusTask);
+        jsonArray = response[0];
+        let is_success = response[2];
+        let newOffset = response[1] + serviceBusTask.startByte;
+        if (is_success) {
+            await setAppendBlobOffset(context, serviceBusTask, newOffset);
+        } else {
+            return jsonArray;
+        }
+
+    }
     var eventsArr = [];
     jsonArray.forEach(function (record) {
-        version = record.properties.Version;
+        let version = record.properties.Version;
         record.properties.flows.forEach(function (rule) {
             rule.flows.forEach(function (flow) {
                 flow.flowTuples.forEach(function (tuple) {
-                    col = tuple.split(",");
-                    event = {
+                    let col = tuple.split(",");
+                    let event = {
                         time: col[0], // this should be epoch time
                         sys_id: record.systemId,
                         category: record.category,
@@ -179,7 +293,7 @@ function jsonHandler(context,msg) {
 function blobHandler(context,msg) {
     // it's assumed that .blob files contains json separated by \n
     //https://docs.microsoft.com/en-us/azure/application-insights/app-insights-export-telemetry
-
+
     var jsonArray = [];
     msg = msg.replace(/\0/g, '');
     msg = msg.replace(/(\r?\n|\r)/g, ",");
@@ -238,7 +352,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
         context.done();
         return;
     }
-    if (file_ext === "json" & serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent") {
+    if ((file_ext === "json") && (serviceBusTask.containerName === "insights-logs-networksecuritygroupflowevent")) {
         // because in json first block and last block remain as it is and azure service adds new block in 2nd last pos
         if (serviceBusTask.endByte < JSON_BLOB_HEAD_BYTES + JSON_BLOB_TAIL_BYTES) {
             context.done(); //rejecting first commit when no data is there data will always be atleast HEAD_BYTES+DATA_BYTES+TAIL_BYTES
@@ -247,19 +361,17 @@ function messageHandler(serviceBusTask, context, sumoClient) {
         serviceBusTask.endByte -= JSON_BLOB_TAIL_BYTES;
         if (serviceBusTask.startByte <= JSON_BLOB_HEAD_BYTES) {
             serviceBusTask.startByte = JSON_BLOB_HEAD_BYTES;
-        } else {
-            serviceBusTask.startByte -= 1; //to remove comma before json object
         }
         file_ext = "nsg";
     }
     getBlockBlobService(context, serviceBusTask).then(function (blobService) {
-        return getData(serviceBusTask, blobService, context).then(function (msg) {
+        return getData(serviceBusTask, blobService, context).then(async function (msg) {
             context.log("Sucessfully downloaded blob %s %d %d", serviceBusTask.blobName, serviceBusTask.startByte, serviceBusTask.endByte);
             var messageArray;
             if (file_ext === "csv") {
                 return getcsvHeader(serviceBusTask.containerName, serviceBusTask.blobName, blobService, context).then(function (headers) {
                     context.log("Received headers %d", headers.length);
-                    messageArray = msghandler[file_ext](context,msg, headers);
+                    messageArray = csvHandler(context,msg, headers);
                     // context.log("Transformed data %s", JSON.stringify(messageArray));
                     messageArray.forEach(function (msg) {
                         sumoClient.addData(msg);
@@ -270,7 +382,11 @@ function messageHandler(serviceBusTask, context, sumoClient) {
                     context.done(err);
                 });
             } else {
-                messageArray = msghandler[file_ext](context,msg);
+                if (file_ext == "nsg") {
+                    messageArray = await nsgLogsHandler(context, msg, serviceBusTask);
+                } else {
+                    messageArray = msghandler[file_ext](context,msg);
+                }
                 messageArray.forEach(function (msg) {
                     sumoClient.addData(msg);
                 });
@@ -282,7 +398,7 @@ function messageHandler(serviceBusTask, context, sumoClient) {
                 context.log.error("Error in messageHandler: blob file doesn't exist " + serviceBusTask.blobName + " " + serviceBusTask.startByte + " " +serviceBusTask.endByte);
                 context.done()
            } else {
-                context.log.error("Error in messageHandler: Failed to send blob " + serviceBusTask.blobName + " " + serviceBusTask.startByte + " " +serviceBusTask.endByte);
+                context.log.error("Error in messageHandler: Failed to send blob " + serviceBusTask.blobName + " " + serviceBusTask.startByte + " " +serviceBusTask.endByte + " err: " + err);
                 context.done(err);
            }

@@ -347,20 +463,18 @@ function servicebushandler(context, serviceBusTask) {
    };
    setSourceCategory(serviceBusTask, options);
    function failureHandler(msgArray, ctx) {
-        ctx.log("ServiceBus Task: ", serviceBusTask)
-        ctx.log.error("Failed to send to Sumo");
+        ctx.log.error(`Failed to send to Sumo`);
        if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
            ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
        }
    }
    function successHandler(ctx) {
        if (sumoClient.messagesAttempted === sumoClient.messagesReceived) {
-            ctx.log("ServiceBus Task: ", serviceBusTask)
            if (sumoClient.messagesFailed > 0) {
-                ctx.log.error('Failed to send few messages to Sumo')
+                ctx.log.error(`Failed to send few messages to Sumo`)
                ctx.done("TaskConsumer failedmessages: " + sumoClient.messagesFailed);
            } else {
-                ctx.log('Successfully sent to Sumo, Exiting now.');
+                ctx.log(`Successfully sent to Sumo, Exiting now.`);
                ctx.done();
            }
        }
@@ -457,6 +571,16 @@ async function timetriggerhandler(context, timetrigger) {
 }

 module.exports = function (context, triggerData) {
+    // triggerData = {
+    //     "blobName": "blob_fixtures.json",
+    //     "containerName": "insights-logs-networksecuritygroupflowevent",
+    //     "endByte": 2617,
+    //     "resourceGroupName": "testsumosa250624004409",
+    //     "startByte": 0,
+    //     "storageName": "testsa250624004409",
+    //     "subscriptionId": "c088dc46-d692-42ad-a4b6-9a542d28ad2a",
+    //     "url": "https://testsa250624004409.blob.core.windows.net/insights-logs-networksecuritygroupflowevent/blob_fixtures.json"
+    // };
     if (triggerData.isPastDue === undefined) {
         servicebushandler(context, triggerData);
     } else {
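The recovery path above can be hard to follow from the diff alone. A simplified, standalone illustration of the idea behind getParseableJsonArray (this is not the commit's code; the committed version also reports the parsed byte length and advances the offset stored in FileOffsetMap): when a chunk read from an append blob begins mid-record, drop everything before the first '{"time":' marker and parse the remainder as a JSON array.

```js
// Standalone sketch of the prefix-trimming idea; the helper and sample data are hypothetical.
const logRegex = /\{\s*"time"\s*:/;              // start-of-record marker for NSG flow logs

function recoverRecords(chunk) {
    const firstIdx = chunk.search(logRegex);     // -1 when no record start is present
    if (firstIdx === -1) return [];
    const trimmed = chunk
        .substring(firstIdx)
        .trim()
        .replace(/(^,)|(,$)/g, "");              // strip leftover commas, as the handler does
    return JSON.parse("[" + trimmed + "]");
}

// Chunk that starts in the middle of the previous record:
const chunk = '"]}]}},{"time":"2024-06-24T00:00:00Z","systemId":"sys-1",' +
    '"category":"NetworkSecurityGroupFlowEvent","properties":{"Version":2,"flows":[]}}';
console.log(recoverRecords(chunk).length);       // 1
```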

BlockBlobReader/src/producer.js

Lines changed: 10 additions & 1 deletion
@@ -124,6 +124,15 @@ function getNewTask(currentoffset,sortedcontentlengths,metadata){
             lastoffset = endByte;
         }
     }
+    if (tasks.length === 0 && sortedcontentlengths.length > 0 && endByte < currentoffset) {
+        // in NSG Flow logs sometimes file gets rewritten, hence starting the file from the beginning
+        task = Object.assign({
+            startByte: 0,
+            endByte: endByte
+        }, metadata);
+        tasks.push(task);
+        lastoffset = endByte;
+    }
     return [tasks,lastoffset];
 }

@@ -142,7 +151,7 @@ async function createTasksForBlob(partitionKey, rowKey, sortedcontentlengths, co
     }
     var currentoffset = retrievedResponse.statusCode === 404 ? -1 : Number(retrievedResponse.entity.offset);
     var currentEtag = retrievedResponse.statusCode === 404 ? null : retrievedResponse.entity.etag;
-    var [tasks,lastoffset] = getNewTask(currentoffset,sortedcontentlengths,metadata);
+    var [tasks,lastoffset] = getNewTask(currentoffset, sortedcontentlengths, metadata);

     if (tasks.length > 0) { // modify offset only when it's been changed
         var entity = getEntity(metadata, lastoffset, currentEtag);
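The new branch in getNewTask covers the case where the offset stored in the FileOffsetMap table is already past the blob's current size, which happens when an NSG flow-log file is rewritten from scratch. A rough sketch of the condition it checks (values are made up; shouldRestartFromZero is a hypothetical helper, not part of the commit):

```js
// Hypothetical helper mirroring the new check: a shrunken blob means the file was rewritten.
function shouldRestartFromZero(storedOffset, newBlobLength) {
    return newBlobLength < storedOffset;
}

// e.g. FileOffsetMap remembers offset 4096 but the rewritten blob is only 2617 bytes long:
console.log(shouldRestartFromZero(4096, 2617));  // true -> emit { startByte: 0, endByte: 2617 }
```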
