Skip to content

Commit

Permalink
bucket notifications - validate notifications conf on change (gh issu…
Browse files Browse the repository at this point in the history
…e 8649)

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Jan 9, 2025
1 parent 038ce84 commit e01196d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ async function delete_bucket(data, force) {
*/
async function bucket_management(action, user_input) {
const data = action === ACTIONS.LIST ? undefined : await fetch_bucket_data(action, user_input);
await manage_nsfs_validations.validate_bucket_args(config_fs, data, action);
await manage_nsfs_validations.validate_bucket_args(config_fs, data, action, user_input);

let response = {};
if (action === ACTIONS.ADD) {
Expand Down
13 changes: 12 additions & 1 deletion src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { TYPES, ACTIONS, VALID_OPTIONS, OPTION_TYPE, FROM_FILE, BOOLEAN_STRING_VA
GLACIER_ACTIONS, LIST_UNSETABLE_OPTIONS, ANONYMOUS, DIAGNOSE_ACTIONS, UPGRADE_ACTIONS } = require('../manage_nsfs/manage_nsfs_constants');
const iam_utils = require('../endpoint/iam/iam_utils');
const { check_root_account_owns_user } = require('../nc/nc_utils');
const notifications_util = require('../util/notifications_util');

/////////////////////////////
//// GENERAL VALIDATIONS ////
Expand Down Expand Up @@ -348,8 +349,9 @@ async function check_new_access_key_exists(config_fs, action, data) {
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {object} data
* @param {string} action
* @param {object} user_input
*/
async function validate_bucket_args(config_fs, data, action) {
async function validate_bucket_args(config_fs, data, action, user_input) {
if (action === ACTIONS.ADD || action === ACTIONS.UPDATE) {
if (action === ACTIONS.ADD) native_fs_utils.validate_bucket_creation({ name: data.name });
if ((action === ACTIONS.UPDATE) && (data.new_name !== undefined)) native_fs_utils.validate_bucket_creation({ name: data.new_name });
Expand Down Expand Up @@ -403,6 +405,15 @@ async function validate_bucket_args(config_fs, data, action) {
}
}
}

//if there's a change to the bucket's notifications, we need to test them
//if one of the specified notifications fail, we need to fail the user's request
if (user_input.notifications) {
const test_notif_err = await notifications_util.test_notifications(user_input, config_fs.connections_dir_path);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
}
}

/////////////////////////////
Expand Down
32 changes: 24 additions & 8 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class HttpNotificator {
return;
}
dbg.error("Notify err =", err);
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(JSON.stringify(notif), err).then(resolve);
});
req.on('timeout', () => {
dbg.error("Notify timeout");
Expand Down Expand Up @@ -249,7 +249,7 @@ class KafkaNotificator {
Date.now(),
(err, offset) => {
if (err) {
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(JSON.stringify(notif), err).then(resolve);
} else {
resolve();
}
Expand Down Expand Up @@ -303,16 +303,32 @@ function get_connection(connect) {
async function test_notifications(bucket, connect_files_dir) {
const notificator = new Notificator({connect_files_dir});
for (const notif of bucket.notifications) {
const connect = await notificator.parse_connect_file(notif.connect);
dbg.log1("testing notif", notif);
let connect;
let connection;
let failure = false;
let notif_failure;
try {
const connection = get_connection(connect);
connect = await notificator.parse_connect_file(notif.topic[0]);
connection = get_connection(connect);
await connection.connect();
await connection.promise_notify({notif: "test notification"}, async err => err);
connection.destroy();
await connection.promise_notify({notif: "test notification"}, async (notif_cb, err_cb) => {
failure = true;
notif_failure = err_cb;
});
if (failure) {
if (notif_failure) {
throw notif_failure;
}
//no error was thrown during notify, throw generic error
throw new Error();
}
} catch (err) {
dbg.error("Connection failed for", connect);
dbg.error("Connection failed for", notif, ", connect =", connect, ", err = ", err);
return err;
} finally {
if (connection) {
connection.destroy();
}
}
}
}
Expand Down

0 comments on commit e01196d

Please sign in to comment.