Skip to content

Commit

Permalink
CIAC-8968 - Enhancement Hashicorp Vault integration (#34795)
Browse files Browse the repository at this point in the history
* generate_secretID command

* test

* pre commit fixes

* role id command and tests

* some changes

* login fix and docker image

* some details

* pre-commit fixes

* yml fix

* release-notes

* update headers

* fix

* space

* release notes

* release-notes

* role id not found exception

* dockerimage
  • Loading branch information
inbalapt1 authored Nov 3, 2024
1 parent ea7b011 commit dbf306e
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 37 deletions.
133 changes: 100 additions & 33 deletions Packs/HashiCorp-Vault/Integrations/HashiCorpVault/HashiCorpVault.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
def get_headers():
headers = {
'Content-Type': 'application/json',
'X-Vault-Request': 'true'
}

if TOKEN: # pragma: no cover
Expand All @@ -44,10 +45,10 @@ def get_headers():

def login(): # pragma: no cover
if USE_APPROLE_AUTH_METHOD:
path = 'auth/approle/login'
path = 'auth/approle/login' # type: ignore
body = {
'role_id': USERNAME,
'secret_id': PASSWORD,
'secret_id': PASSWORD
}
else:
path = 'auth/userpass/login/' + USERNAME # type: ignore
Expand All @@ -56,16 +57,18 @@ def login(): # pragma: no cover
}

url = urljoin(SERVER_URL, path)
res = requests.request('POST', url, headers=get_headers(), data=json.dumps(body), verify=VERIFY_SSL)
payload = json.dumps(body)
headers = get_headers()
res = requests.request("POST", url, headers=headers, data=payload, verify=VERIFY_SSL, allow_redirects=True)
if (res.status_code < 200 or res.status_code >= 300) and res.status_code not in DEFAULT_STATUS_CODES:
try:
error_body = res.json()
if 'errors' in error_body and isinstance(error_body['errors'], list):
error_body = ';'.join(error_body['errors']) if len(error_body['errors']) > 0 else 'None'
except Exception as ex:
demisto.error("Error in login (parsing error msg): {}".format(ex))
demisto.error(f"Error in login (parsing error msg): {ex}")
error_body = res.content
return_error('Login failed. Status code: {}, details: {}'.format(str(res.status_code), error_body))
return_error(f'Login failed. Status code: {str(res.status_code)}, details: {error_body}')

auth_res = res.json()
if not auth_res or 'auth' not in auth_res or 'client_token' not in auth_res['auth']:
Expand All @@ -88,9 +91,9 @@ def send_request(path, method='get', body=None, params=None, headers=None):
if 'errors' in error_body and isinstance(error_body['errors'], list):
error_body = ';'.join(error_body['errors']) if len(error_body['errors']) > 0 else 'None'
except Exception as ex:
demisto.error("Error in send_request (parsing error msg): {}".format(ex))
demisto.error(f"Error in send_request (parsing error msg): {ex}")
error_body = res.content
return_error('Request failed. Status code: {}, details: {}'.format(str(res.status_code), error_body))
return_error(f'Request failed. Status code: {str(res.status_code)}, details: {error_body}')
if res.content:
return res.json()
return ''
Expand All @@ -99,6 +102,64 @@ def send_request(path, method='get', body=None, params=None, headers=None):
''' FUNCTIONS '''


def generate_role_secret_command():
"""
Generate a secret ID for a specified AppRole in the authentication system.
Args:
args (dict): A dictionary containing the following keys:
- 'role_name' (required): The name of the AppRole for which the secret ID is generated.
- 'meta_data': Metadata associated with the secret ID.
- 'cidr_list': Comma-separated list of CIDR blocks from which requests using the secret ID are allowed.
- 'token_bound_cidrs': Comma-separated list of CIDR blocks to restrict tokens issued with this secret ID.
- 'num_uses': Number of times the secret ID can be used before it expires.
- 'ttl_seconds': Time duration in seconds for which the secret ID remains valid.
Returns:
CommandResults: The command results object containing the response from the Vault server as readable output.
"""
args = demisto.args()
role_name = args.get('role_name')
meta_data = args.get('meta_data')
cidr_list = argToList(args.get('cidr_list', ''))
token_bound_cidrs = argToList(args.get('token_bound_cidrs', ''))
num_uses = arg_to_number(args.get('num_uses', ''))
ttl_seconds = arg_to_number(args.get('ttl_seconds', ''))

path = f'/auth/approle/role/{role_name}/secret-id'
body = {
"metadata": meta_data,
"cidr_list": cidr_list,
"token_bound_cidrs": token_bound_cidrs,
"ttl": ttl_seconds,
"num_uses": num_uses
}
body = remove_empty_elements(body)

response = send_request(path=path, method='post', body=body)
return_results(CommandResults(readable_output=response))


def get_role_id_command():
"""
Retrieve the Role ID associated with a specified AppRole from the authentication system.
Args:
args (dict): A dictionary containing the following keys:
- 'role_name' (required): The name of the AppRole for which the Role ID is retrieved.
Returns:
CommandResults: The command results object containing the retrieved Role ID and role name as outputs.
"""
args = demisto.args()
role_name = args.get('role_name')
path = f'/auth/approle/role/{role_name}/role-id'
response = send_request(path=path, method='get', body={'role_name': role_name})
if response:
role_id = response.get('data', {}).get('role_id', '')

if not role_id:
raise DemistoException(f"Role ID not found for AppRole '{role_name}'. Please check the role name and try again.")

return_results(CommandResults(outputs_prefix='HashiCorp.AppRole', outputs={"Id": role_id, "Name": role_name}))


def list_secrets_engines_command(): # pragma: no cover
res = list_secrets_engines()

Expand Down Expand Up @@ -645,7 +706,7 @@ def get_kv1_secrets(engine_path, concat_username_to_cred_name=False): # pragma:
secret_data = get_kv1_secret(engine_path, secret)
for k, v in secret_data.get('data', {}).items():
if concat_username_to_cred_name:
name = '{0}_{1}'.format(secret, k)
name = f'{secret}_{k}'
else:
name = secret
secrets.append({
Expand All @@ -670,15 +731,15 @@ def get_kv2_secrets(engine_path, concat_username_to_cred_name=False, folder=None
return []

for secret in res['data'].get('keys', []):
if str(secret).endswith('/') and not secret.replace('/', '') == folder:
demisto.debug('Could not get secrets from path: {}'.format(secret))
if str(secret).endswith('/') and secret.replace('/', '') != folder:
demisto.debug(f'Could not get secrets from path: {secret}')
continue

secret_data = get_kv2_secret(engine_path, secret, folder)
secret_info = secret_data.get('data', {}).get('data', {})
for k in secret_data.get('data', {}).get('data', {}):
if concat_username_to_cred_name:
name = '{0}_{1}'.format(secret, k)
name = f'{secret}_{k}'
else:
name = secret
secrets.append({
Expand Down Expand Up @@ -717,7 +778,7 @@ def get_ch_secrets(engine_path, concat_username_to_cred_name=False): # pragma:
secret_data = get_ch_secret(engine_path, secret)
for k, v in secret_data.get('data', {}).items():
if concat_username_to_cred_name:
name = '{0}_{1}'.format(secret, k)
name = f'{secret}_{k}'
else:
name = secret
secrets.append({
Expand All @@ -732,7 +793,7 @@ def get_ch_secrets(engine_path, concat_username_to_cred_name=False): # pragma:
def get_aws_secrets(engine_path, concat_username_to_cred_name, aws_roles_list, aws_method):
secrets = []
roles_list_url = engine_path + '/roles'
demisto.debug('roles_list_url: {}'.format(roles_list_url))
demisto.debug(f'roles_list_url: {roles_list_url}')
params = {'list': 'true'}
res = send_request(roles_list_url, 'get', params=params)
if not res or 'data' not in res:
Expand All @@ -741,7 +802,7 @@ def get_aws_secrets(engine_path, concat_username_to_cred_name, aws_roles_list, a
if aws_roles_list and role not in aws_roles_list:
continue
role_url = urljoin(engine_path, urljoin('/roles/', role))
demisto.debug('role_url: {}'.format(role_url))
demisto.debug(f'role_url: {role_url}')
role_data = send_request(role_url, 'get')
if not role_data or 'data' not in role_data:
return []
Expand All @@ -758,7 +819,7 @@ def get_aws_secrets(engine_path, concat_username_to_cred_name, aws_roles_list, a
method = 'GET'
credential_type = 'creds'
generate_credentials_url = urljoin(engine_path + '/', urljoin(credential_type, '/' + role))
demisto.debug('generate_credentials_url: {}'.format(generate_credentials_url))
demisto.debug(f'generate_credentials_url: {generate_credentials_url}')
body = {}
if 'role_arns' in role_data['data']:
body['role_arns'] = role_data['data'].get('role_arns', [])
Expand All @@ -770,7 +831,7 @@ def get_aws_secrets(engine_path, concat_username_to_cred_name, aws_roles_list, a
if aws_credentials['data'].get('security_token'):
secret_key = secret_key + '@@@' + aws_credentials["data"].get("security_token")
if concat_username_to_cred_name:
role = '{0}_{1}'.format(role, access_key)
role = f'{role}_{access_key}'
secrets.append({
'user': access_key,
'password': secret_key,
Expand Down Expand Up @@ -808,40 +869,46 @@ def get_ch_secret(engine_path, secret):
ENGINE_CONFIGS = integration_context['configs']

try:
if demisto.command() == 'test-module':
command = demisto.command()
if command == 'test-module':
demisto.results('ok')
elif demisto.command() == 'fetch-credentials':
elif command == 'fetch-credentials':
fetch_credentials()
elif demisto.command() == 'hashicorp-list-secrets-engines':
elif command == 'hashicorp-list-secrets-engines':
list_secrets_engines_command()
elif demisto.command() == 'hashicorp-list-secrets':
elif command == 'hashicorp-list-secrets':
list_secrets_command()
elif demisto.command() == 'hashicorp-list-policies':
elif command == 'hashicorp-list-policies':
list_policies_command()
elif demisto.command() == 'hashicorp-get-policy':
elif command == 'hashicorp-get-policy':
get_policy_command()
elif demisto.command() == 'hashicorp-get-secret-metadata':
elif command == 'hashicorp-get-secret-metadata':
get_secret_metadata_command()
elif demisto.command() == 'hashicorp-delete-secret':
elif command == 'hashicorp-delete-secret':
delete_secret_command()
elif demisto.command() == 'hashicorp-undelete-secret':
elif command == 'hashicorp-undelete-secret':
undelete_secret_command()
elif demisto.command() == 'hashicorp-destroy-secret':
elif command == 'hashicorp-destroy-secret':
destroy_secret_command()
elif demisto.command() == 'hashicorp-disable-engine':
elif command == 'hashicorp-disable-engine':
disable_engine_command()
elif demisto.command() == 'hashicorp-enable-engine':
elif command == 'hashicorp-enable-engine':
enable_engine_command()
elif demisto.command() == 'hashicorp-seal-vault':
elif command == 'hashicorp-seal-vault':
seal_vault_command()
elif demisto.command() == 'hashicorp-unseal-vault':
elif command == 'hashicorp-unseal-vault':
unseal_vault_command()
elif demisto.command() == 'hashicorp-create-token':
elif command == 'hashicorp-create-token':
create_token_command()
elif demisto.command() == 'hashicorp-configure-engine':
elif command == 'hashicorp-configure-engine':
configure_engine_command()
elif demisto.command() == 'hashicorp-reset-configuration':
elif command == 'hashicorp-reset-configuration':
reset_config_command()
elif command == 'hashicorp-generate-role-secret':
generate_role_secret_command()
elif command == 'hashicorp-get-role-id':
get_role_id_command()

except Exception as e:
demisto.debug(f'An error occurred: {e}')
return_error(f'An error occurred: {e}')
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,40 @@ script:
description: Authentication lease duration in seconds, 0 if indefinitely.
type: number
description: Creates a new authentication token.
dockerimage: demisto/vendors-sdk:1.0.0.87491
- name: hashicorp-generate-role-secret
arguments:
- name: role_name
required: true
description: The name of the AppRole.
- name: meta_data
description: Metadata to be tied to the SecretID.
- name: cidr_list
description: Comma separated string or list of CIDR blocks enforcing secret IDs to be used from specific set of IP addresses.
isArray: true
- name: token_bound_cidrs
description: Comma-separated string or list of CIDR blocks.
isArray: true
- name: num_uses
description: Number of times this SecretID can be used, after which the SecretID expires. A value of zero will allow unlimited uses.
type: number
- name: ttl_seconds
description: Duration in seconds after which this SecretID expires. A value of zero will allow the SecretID to not expire.
type: number
description: Generates and issues a new SecretID on an existing AppRole.
- name: hashicorp-get-role-id
arguments:
- name: role_name
required: true
description: The name of the AppRole.
outputs:
- contextPath: HashiCorp.AppRole.Id
description: AppRole ID.
type: string
- contextPath: HashiCorp.AppRole.Name
description: AppRole Name.
type: string
description: Retrieves the AppRole ID for a specified role.
dockerimage: demisto/vendors-sdk:1.0.0.114678
tests:
- hashicorp_test
fromversion: 5.0.0
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_get_aws_secrets(mocker):


def test_get_headers():
assert get_headers() == {'Content-Type': 'application/json'}
assert get_headers() == {'Content-Type': 'application/json', 'X-Vault-Request': 'true'}


def test_list_secrets_engines(mocker):
Expand Down Expand Up @@ -113,3 +113,86 @@ def test_seal_vault(mocker):
def test_disable_engine(mocker):
mocker.patch('HashiCorpVault.send_request', return_value={})
assert disable_engine('test') == {}


def test_generate_role_secret_command(mocker):
"""
Given:
A set of command arguments including role_name, meta_data, cidr_list, token_bound_cidrs, num_uses, and ttl_seconds.
When:
Executing the generate_role_secret_command function to generate a secret ID for a given role.
Then:
Verify that the send_request function is called with the correct path and body, and that the return_results function
is called with the expected result containing the secret_id.
"""
mock_demisto = mocker.patch('HashiCorpVault.demisto.args')
mock_return_results = mocker.patch('HashiCorpVault.return_results')
response = {'secret_id': '123'}
mock_send_request = mocker.patch('HashiCorpVault.send_request', return_value=response)
mock_demisto.return_value = {
'role_name': 'test_role',
'meta_data': 'test_metadata',
'cidr_list': '',
'token_bound_cidrs': '',
'num_uses': '5',
'ttl_seconds': '3600'
}
mock_send_request.return_value = {'secret_id': '123'}

generate_role_secret_command()

mock_send_request.assert_called_once_with(
path='/auth/approle/role/test_role/secret-id',
method='post',
body={
"metadata": 'test_metadata',
"ttl": 3600,
"num_uses": 5
}
)
mock_return_results.assert_called_once()
result_call_args = mock_return_results.call_args[0][0]
assert 'secret_id' in result_call_args.readable_output
assert result_call_args.readable_output['secret_id'] == '123'


def test_get_role_id_command(mocker):
"""
Given:
A set of command arguments including role_name.
When:
Executing the get_role_id_command function to retrieve the role ID for a given role.
Then:
Verify that the send_request function is called with the correct path, method, and body, and that the return_results
function is called with the expected result containing the role_id.
"""
mock_demisto_args = mocker.patch('HashiCorpVault.demisto.args')
mock_return_results = mocker.patch('HashiCorpVault.return_results')
mock_send_request = mocker.patch('HashiCorpVault.send_request')
mock_demisto_args.return_value = {
'role_name': 'test_role'
}
mock_send_request.return_value = {
'data': {
'role_id': '12345'
}
}

expected_path = '/auth/approle/role/test_role/role-id'
expected_method = 'get'
expected_body = {
'role_name': 'test_role'
}
expected_outputs = {'Id': '12345', 'Name': 'test_role'}

get_role_id_command()

mock_send_request.assert_called_once_with(path=expected_path, method=expected_method, body=expected_body)
mock_return_results.assert_called_once()
result_call_args = mock_return_results.call_args[0][0]
assert result_call_args.outputs == expected_outputs
assert result_call_args.outputs_prefix == 'HashiCorp.AppRole'
Loading

0 comments on commit dbf306e

Please sign in to comment.