diff --git a/.changes/next-release/configure-tempcreds.json b/.changes/next-release/configure-tempcreds.json new file mode 100644 index 000000000000..41f92b036cd6 --- /dev/null +++ b/.changes/next-release/configure-tempcreds.json @@ -0,0 +1,11 @@ +{ + "type": "enhancement", + "category": "``configure``", + "description": "Added support for temporary credentials with ``aws configure``. The CLI will prompt for an ``aws_session_token`` if the provided access key ID is temporary." +}, +{ + "type": "enhancement", + "category": "``configure``", + "description": "Added the new ``aws configure mfa-login` command, which creates a profile with temporary credentials corresponding to an IAM user with an MFA code." +}, +] \ No newline at end of file diff --git a/awscli/customizations/configure/configure.py b/awscli/customizations/configure/configure.py index 93eb5e53a81f..9160a09b5abe 100644 --- a/awscli/customizations/configure/configure.py +++ b/awscli/customizations/configure/configure.py @@ -12,6 +12,7 @@ # language governing permissions and limitations under the License. import logging import os +import sys from botocore.exceptions import ProfileNotFound @@ -25,6 +26,7 @@ from awscli.customizations.configure.importer import ConfigureImportCommand from awscli.customizations.configure.list import ConfigureListCommand from awscli.customizations.configure.listprofiles import ListProfilesCommand +from awscli.customizations.configure.mfalogin import ConfigureMFALoginCommand from awscli.customizations.configure.set import ConfigureSetCommand from awscli.customizations.configure.sso import ( ConfigureSSOCommand, @@ -43,9 +45,13 @@ def register_configure_cmd(cli): class InteractivePrompter: def get_value(self, current_value, config_name, prompt_text=''): - if config_name in ('aws_access_key_id', 'aws_secret_access_key'): + if config_name in ( + 'aws_access_key_id', + 'aws_secret_access_key', + 'aws_session_token', + ): current_value = mask_value(current_value) - response = compat_input("%s [%s]: " % (prompt_text, current_value)) + response = compat_input(f"{prompt_text} [{current_value}]: ") if not response: # If the user hits enter, we return a value of None # instead of an empty string. That way we can determine @@ -84,6 +90,7 @@ class ConfigureCommand(BasicCommand): {'name': 'list-profiles', 'command_class': ListProfilesCommand}, {'name': 'sso', 'command_class': ConfigureSSOCommand}, {'name': 'sso-session', 'command_class': ConfigureSSOSessionCommand}, + {'name': 'mfa-login', 'command_class': ConfigureMFALoginCommand}, { 'name': 'export-credentials', 'command_class': ConfigureExportCredentialsCommand, @@ -95,12 +102,13 @@ class ConfigureCommand(BasicCommand): # (logical_name, config_name, prompt_text) ('aws_access_key_id', "AWS Access Key ID"), ('aws_secret_access_key', "AWS Secret Access Key"), + ('aws_session_token', "AWS Session Token"), ('region', "Default region name"), ('output', "Default output format"), ] def __init__(self, session, prompter=None, config_writer=None): - super(ConfigureCommand, self).__init__(session) + super().__init__(session) if prompter is None: prompter = InteractivePrompter() self._prompter = prompter @@ -108,8 +116,24 @@ def __init__(self, session, prompter=None, config_writer=None): config_writer = ConfigFileWriter() self._config_writer = config_writer + def _needs_session_token(self, new_values): + """Check if session token is needed based on access key ID.""" + access_key = new_values.get('aws_access_key_id') + return access_key and access_key.startswith('ASIA') + + def _should_prompt_for_session_token(self, new_values, config): + """Determine if we should prompt for session token.""" + # Don't prompt if explicitly switching to long-term credentials + new_access_key = new_values.get('aws_access_key_id') + if new_access_key and not self._needs_session_token(new_values): + return False + + # Prompt if needed for temporary credentials or if already exists + return self._needs_session_token(new_values) or config.get('aws_session_token') + def _run_main(self, parsed_args, parsed_globals): # Called when invoked with no args "aws configure" + new_values = {} # This is the config from the config file scoped to a specific # profile. @@ -117,13 +141,25 @@ def _run_main(self, parsed_args, parsed_globals): config = self._session.get_scoped_config() except ProfileNotFound: config = {} + for config_name, prompt_text in self.VALUES_TO_PROMPT: + if config_name == 'aws_session_token' and not self._should_prompt_for_session_token(new_values, config): + continue + current_value = config.get(config_name) new_value = self._prompter.get_value( current_value, config_name, prompt_text ) if new_value is not None and new_value != current_value: new_values[config_name] = new_value + + # Remove session token for non-temporary credentials + if ( + 'aws_access_key_id' in new_values + and not self._needs_session_token(new_values) + and config.get('aws_session_token') + ): + new_values['aws_session_token'] = None config_filename = os.path.expanduser( self._session.get_config_variable('config_file') ) @@ -150,6 +186,10 @@ def _write_out_creds_file_values(self, new_values, profile_name): credential_file_values['aws_secret_access_key'] = new_values.pop( 'aws_secret_access_key' ) + if 'aws_session_token' in new_values: + credential_file_values['aws_session_token'] = new_values.pop( + 'aws_session_token' + ) if credential_file_values: if profile_name is not None: credential_file_values['__section__'] = profile_name diff --git a/awscli/customizations/configure/mfalogin.py b/awscli/customizations/configure/mfalogin.py new file mode 100644 index 000000000000..885e624c45a7 --- /dev/null +++ b/awscli/customizations/configure/mfalogin.py @@ -0,0 +1,280 @@ +# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import logging +import os +import random +import string +import sys + +# Import botocore at module level to avoid repeated imports +import botocore.session +from botocore.exceptions import ClientError, ProfileNotFound + +from awscli.compat import compat_input +from awscli.customizations.commands import BasicCommand +from awscli.customizations.configure import profile_to_section +from awscli.customizations.configure.writer import ConfigFileWriter + +LOG = logging.getLogger(__name__) + + +class InteractiveMFAPrompter: + """Handles interactive prompting for MFA login.""" + + def get_value(self, current_value, prompt_text=''): + """Prompt for a value, showing the current value as a default.""" + response = compat_input(f"{prompt_text} [{current_value}]: ") + if not response: + # If the user hits enter, return the current value + return current_value + return response + + def get_credential_value(self, current_value, config_name, prompt_text=''): + """Prompt for credential values with masking for sensitive data.""" + response = compat_input(f"{prompt_text}: ") + if not response: + return None + return response + + +class ConfigureMFALoginCommand(BasicCommand): + """Configures MFA login for AWS CLI by creating temporary credentials.""" + + NAME = 'mfa-login' + DESCRIPTION = BasicCommand.FROM_FILE( + 'configure', 'mfa-login', '_description.rst' + ) + SYNOPSIS = ( + 'aws configure mfa-login [--profile profile-name] ' + '[--update-profile profile-to-update] [--duration-seconds seconds] ' + '[--serial-number mfa-serial-number]' + ) + EXAMPLES = BasicCommand.FROM_FILE('configure', 'mfa-login', '_examples.rst') + ARG_TABLE = [ + { + 'name': 'update-profile', + 'help_text': ( + 'The profile to update with temporary credentials. ' + 'If not provided, a default name will be generated.' + ), + 'action': 'store', + 'required': False, + 'cli_type_name': 'string', + }, + { + 'name': 'duration-seconds', + 'help_text': ( + 'The duration, in seconds, that the credentials should remain valid. ' + 'Minimum is 900 seconds (15 minutes), maximum is 129600 seconds (36 hours).' + ), + 'action': 'store', + 'required': False, + 'cli_type_name': 'integer', + 'default': 43200, + }, + { + 'name': 'serial-number', + 'help_text': ( + 'The ARN or serial number of the MFA device associated with the IAM user. ' + 'If not provided, will use the mfa_serial from the profile configuration.' + ), + 'action': 'store', + 'required': False, + 'cli_type_name': 'string', + }, + ] + + # Values to prompt for during interactive setup + VALUES_TO_PROMPT = [ + ('aws_access_key_id', 'AWS Access Key ID'), + ('aws_secret_access_key', 'AWS Secret Access Key'), + ('mfa_serial', 'MFA serial number or ARN'), + ('mfa_token', 'MFA token code'), + ] + + def __init__(self, session, prompter=None, config_writer=None): + super().__init__(session) + if prompter is None: + prompter = InteractiveMFAPrompter() + self._prompter = prompter + if config_writer is None: + config_writer = ConfigFileWriter() + self._config_writer = config_writer + + def _generate_profile_name_from_mfa(self, mfa_serial): + """Generate a deterministic profile name from MFA serial/ARN.""" + if isinstance(mfa_serial, str) and mfa_serial.startswith('arn:aws:iam::'): + # Parse ARN: arn:aws:iam::123456789012:mfa/device-name + parts = mfa_serial.split(':') + account_id = parts[4] + device_name = parts[5].split('/')[-1] # Get device name after 'mfa/' + return f"{account_id}-{device_name}" + else: + # Assume it's just a serial number + return f"session-{mfa_serial}" + + def _get_target_profile(self, parsed_args, mfa_serial=None): + """Get or generate the target profile name.""" + target_profile = parsed_args.update_profile + if not target_profile: + if mfa_serial: + target_profile = self._generate_profile_name_from_mfa(mfa_serial) + else: + target_profile = "session-temp" + target_profile = self._prompter.get_value( + target_profile, 'Profile to update' + ) + return target_profile + + def _get_mfa_token(self): + """Prompt for MFA token code.""" + token_code = self._prompter.get_credential_value( + 'None', 'mfa_token', 'MFA token code' + ) + if not token_code: + sys.stderr.write("MFA token code is required\n") + return None + return token_code + + def _call_sts_get_session_token(self, sts_client, duration_seconds, mfa_serial, token_code): + """Call STS to get temporary credentials.""" + try: + response = sts_client.get_session_token( + DurationSeconds=duration_seconds, + SerialNumber=mfa_serial, + TokenCode=token_code, + ) + return response + except ClientError as e: + sys.stderr.write(f"An error occurred: {e}\n") + return None + + def _resolve_mfa_serial(self, parsed_args, source_config): + """Resolve MFA serial from args, config, or prompt.""" + mfa_serial = parsed_args.serial_number or source_config.get('mfa_serial') + if not mfa_serial: + mfa_serial = self._prompter.get_credential_value( + 'None', 'mfa_serial', 'MFA serial number or ARN' + ) + if not mfa_serial: + sys.stderr.write("MFA serial number or MFA device ARN is required\n") + return None + return mfa_serial + + def _write_temporary_credentials(self, temp_credentials, target_profile): + """Write temporary credentials to the credentials file.""" + credentials_file = os.path.expanduser(self._session.get_config_variable('credentials_file')) + + credential_values = { + '__section__': target_profile, + 'aws_access_key_id': temp_credentials['AccessKeyId'], + 'aws_secret_access_key': temp_credentials['SecretAccessKey'], + 'aws_session_token': temp_credentials['SessionToken'], + } + + try: + expiration_time = temp_credentials['Expiration'].strftime( + '%Y-%m-%d %H:%M:%S UTC' + ) + except AttributeError: + expiration_time = str(temp_credentials['Expiration']) + + self._config_writer.update_config(credential_values, credentials_file) + + sys.stdout.write( + f"Temporary credentials written to profile '{target_profile}'\n" + ) + sys.stdout.write(f"Credentials will expire at {expiration_time}\n") + sys.stdout.write( + f"To use these credentials, specify --profile {target_profile} when running AWS CLI commands\n" + ) + return 0 + + def _run_main(self, parsed_args, parsed_globals): + duration_seconds = parsed_args.duration_seconds + + # Use the CLI session directly + credentials = self._session.get_credentials() + if credentials is None: + return self._handle_interactive_prompting(parsed_args, duration_seconds) + + source_config = self._session.get_scoped_config() + + # Resolve MFA serial number + mfa_serial = self._resolve_mfa_serial(parsed_args, source_config) + if not mfa_serial: + return 1 + + # Get MFA token code + token_code = self._get_mfa_token() + if not token_code: + return 1 + + # Get the target profile name + target_profile = self._get_target_profile(parsed_args, mfa_serial) + + # Call STS to get temporary credentials + sts_client = self._session.create_client('sts') + response = self._call_sts_get_session_token( + sts_client, duration_seconds, mfa_serial, token_code + ) + if not response: + return 1 + + # Write credentials and return + return self._write_temporary_credentials( + response['Credentials'], target_profile + ) + + def _handle_interactive_prompting(self, parsed_args, duration_seconds): + """Handle the case where no default profile exists, and there is no profile explicitly named as a configuration source""" + sys.stdout.write( + "Please provide your AWS credentials:\n" + ) + + values = {} + for config_name, prompt_text in self.VALUES_TO_PROMPT: + if config_name == 'mfa_serial' and parsed_args.serial_number: + values[config_name] = parsed_args.serial_number + continue + + value = self._prompter.get_credential_value( + 'None', config_name, prompt_text + ) + if not value or value == 'None': + sys.stderr.write(f"{prompt_text} is required\n") + return 1 + values[config_name] = value + + # Get the target profile name + target_profile = self._get_target_profile(parsed_args, values['mfa_serial']) + + # Create STS client with the provided credentials + session = botocore.session.Session() + sts_client = session.create_client( + 'sts', + aws_access_key_id=values['aws_access_key_id'], + aws_secret_access_key=values['aws_secret_access_key'], + ) + + # Call STS to get temporary credentials + response = self._call_sts_get_session_token( + sts_client, duration_seconds, values['mfa_serial'], values['mfa_token'] + ) + if not response: + return 1 + + # Write credentials and return + return self._write_temporary_credentials( + response['Credentials'], target_profile + ) \ No newline at end of file diff --git a/awscli/customizations/configure/writer.py b/awscli/customizations/configure/writer.py index d4daa9dea35a..8a173ab68673 100644 --- a/awscli/customizations/configure/writer.py +++ b/awscli/customizations/configure/writer.py @@ -93,7 +93,7 @@ def _write_new_section(self, section_name, new_values, config_filename): with open(config_filename, 'a') as f: if needs_newline: f.write('\n') - f.write('[%s]\n' % section_name) + f.write(f'[{section_name}]\n') contents = [] self._insert_new_values( line_number=0, contents=contents, new_values=new_values @@ -148,8 +148,12 @@ def _update_section_contents(self, contents, section_name, new_values): # out now. if not isinstance(new_values[key_name], dict): option_value = new_values[key_name] - new_line = '%s = %s\n' % (key_name, option_value) - contents[j] = new_line + if option_value is None: + # Remove the line by replacing with empty string + contents[j] = '' + else: + new_line = f'{key_name} = {option_value}\n' + contents[j] = new_line del new_values[key_name] else: j = self._update_subattributes( @@ -182,10 +186,8 @@ def _update_subattributes(self, index, contents, values, starting_indent): key_name = match.group(1).strip() if key_name in values: option_value = values[key_name] - new_line = '%s%s = %s\n' % ( - ' ' * current_indent, - key_name, - option_value, + new_line = ( + f"{' ' * current_indent}{key_name} = {option_value}\n" ) contents[i] = new_line del values[key_name] @@ -208,23 +210,21 @@ def _insert_new_values(self, line_number, contents, new_values, indent=''): for key, value in list(new_values.items()): if isinstance(value, dict): subindent = indent + ' ' - new_contents.append('%s%s =\n' % (indent, key)) + new_contents.append(f'{indent}{key} =\n') for subkey, subval in list(value.items()): - new_contents.append( - '%s%s = %s\n' % (subindent, subkey, subval) - ) - else: - new_contents.append('%s%s = %s\n' % (indent, key, value)) + new_contents.append(f'{subindent}{subkey} = {subval}\n') + elif value is not None: + new_contents.append(f'{indent}{key} = {value}\n') del new_values[key] - contents.insert(line_number + 1, ''.join(new_contents)) + if new_contents: + contents.insert(line_number + 1, ''.join(new_contents)) def _matches_section(self, match, section_name): parts = section_name.split(' ') - unquoted_match = match.group(0) == '[%s]' % section_name + unquoted_match = match.group(0) == f'[{section_name}]' if len(parts) > 1: - quoted_match = match.group(0) == '[%s "%s"]' % ( - parts[0], - ' '.join(parts[1:]), + quoted_match = ( + match.group(0) == f'[{parts[0]} "{" ".join(parts[1:])}"]' ) return unquoted_match or quoted_match return unquoted_match diff --git a/awscli/examples/configure/mfa-login.rst b/awscli/examples/configure/mfa-login.rst new file mode 100644 index 000000000000..b5290179248f --- /dev/null +++ b/awscli/examples/configure/mfa-login.rst @@ -0,0 +1,49 @@ +**To create a new profile with temporary MFA credentials** + +The following ``mfa-login`` command creates a new profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login + +Output:: + + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/MFADeviceName + MFA token code: 123456 + Profile to update [session-MFADeviceName]: + Temporary credentials written to profile 'session-MFADeviceName' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-MFADeviceName when running AWS CLI commands + +**To create credentials when no default profile exists** + +If you don't have a default profile configured, the ``mfa-login`` command will prompt you for your AWS credentials first. :: + + aws configure mfa-login + +Output:: + + No default profile found. Please provide your AWS credentials: + AWS Access Key ID: AKIAIOSFODNN7EXAMPLE + AWS Secret Access Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/MFADeviceName + MFA token code: 123456 + Profile to update [session-MFADeviceName]: + Temporary credentials written to profile 'session-MFADeviceName' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-MFADeviceName when running AWS CLI commands + +**To update an existing profile with temporary MFA credentials** + +The following ``mfa-login`` command updates an existing profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login --profile myprofile --update-profile mfaprofile + +Output:: + + MFA token code: 123456 + Temporary credentials written to profile 'mfaprofile' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile mfaprofile when running AWS CLI commands + +**Note:** This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported with this command. + +For more information, see `Using Multi-Factor Authentication (MFA) in AWS `__ in the *AWS IAM User Guide*. \ No newline at end of file diff --git a/awscli/examples/configure/mfa-login/_description.rst b/awscli/examples/configure/mfa-login/_description.rst new file mode 100644 index 000000000000..e028526c9c10 --- /dev/null +++ b/awscli/examples/configure/mfa-login/_description.rst @@ -0,0 +1,3 @@ +This command gets temporary AWS security credentials for use with the AWS CLI and SDK, and places them in an AWS profile. It will use a long-lived IAM user access key, and the MFA code from either a virtual TOTP MFA device, or a hardware OTP authenticator to call STS get-session-token to get the temporary credentials. If no default profile exists, the command will prompt you to provide your AWS credentials first. + +Note: This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported. \ No newline at end of file diff --git a/awscli/examples/configure/mfa-login/_examples.rst b/awscli/examples/configure/mfa-login/_examples.rst new file mode 100644 index 000000000000..6643b6005316 --- /dev/null +++ b/awscli/examples/configure/mfa-login/_examples.rst @@ -0,0 +1,47 @@ +**To create a new profile with temporary MFA credentials** + +The following ``mfa-login`` command creates a new profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login + +Output:: + + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA token code: 123456 + Profile to update [session-12345]: + Temporary credentials written to profile 'session-12345' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-12345 when running AWS CLI commands + +**To update an existing profile with temporary MFA credentials** + +The following ``mfa-login`` command updates an existing profile with temporary credentials obtained using MFA authentication. :: + + aws configure mfa-login --profile myprofile --update-profile mytemp + +Output:: + + MFA token code: 123456 + Temporary credentials written to profile 'mytemp' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile mytemp when running AWS CLI commands + +**To create credentials when no default profile exists** + +If you don't have a default profile configured, the ``mfa-login`` command will prompt you for your AWS credentials first. :: + + aws configure mfa-login + +Output:: + + No default profile found. Please provide your AWS credentials: + AWS Access Key ID: AKIAIOSFODNN7EXAMPLE + AWS Secret Access Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY + MFA serial number or ARN: arn:aws:iam::123456789012:mfa/user + MFA token code: 123456 + Profile to update [session-12345]: + Temporary credentials written to profile 'session-12345' + Credentials will expire at 2023-05-19 18:06:10 UTC + To use these credentials, specify --profile session-12345 when running AWS CLI commands + +**Note:** This command currently supports only hardware or software based one-time password (OTP) authenticators. Passkeys and U2F devices are not currently supported. \ No newline at end of file diff --git a/tests/unit/customizations/configure/test_configure.py b/tests/unit/customizations/configure/test_configure.py index 386de2be0528..d68c33c81385 100644 --- a/tests/unit/customizations/configure/test_configure.py +++ b/tests/unit/customizations/configure/test_configure.py @@ -154,6 +154,84 @@ def test_session_says_profile_does_not_exist(self): 'myconfigfile', ) + def test_temporary_credentials_prompts_for_session_token(self): + # When user enters temporary credentials (starting with ASIA), + # should prompt for session token after secret key + responses = { + "AWS Access Key ID": "ASIATEMP123456789", + "AWS Secret Access Key": "secret123", + "AWS Session Token": "session_token_123", + "Default region name": "us-west-2", + "Default output format": "json", + } + prompter = KeyValuePrompter(responses) + self.configure = configure.ConfigureCommand( + self.session, prompter=prompter, config_writer=self.writer + ) + self.configure(args=[], parsed_globals=self.global_args) + + # Should write all three credential values to credentials file + self.assert_credentials_file_updated_with( + { + 'aws_access_key_id': 'ASIATEMP123456789', + 'aws_secret_access_key': 'secret123', + 'aws_session_token': 'session_token_123', + } + ) + + # Non-credentials config is written to the config file + self.writer.update_config.assert_called_with( + {'region': 'us-west-2', 'output': 'json'}, 'myconfigfile' + ) + + def test_regular_credentials_no_session_token_prompt(self): + # When user enters regular credentials (not starting with ASIA), + # should NOT prompt for session token + responses = { + "AWS Access Key ID": "AKIAREGULAR123456", + "AWS Secret Access Key": "secret123", + "Default region name": "us-west-2", + "Default output format": "json", + } + prompter = KeyValuePrompter(responses) + self.configure = configure.ConfigureCommand( + self.session, prompter=prompter, config_writer=self.writer + ) + self.configure(args=[], parsed_globals=self.global_args) + + # Should only write access key and secret key (no session token) + self.assert_credentials_file_updated_with( + { + 'aws_access_key_id': 'AKIAREGULAR123456', + 'aws_secret_access_key': 'secret123', + } + ) + + def test_iam_user_credentials_remove_session_token(self): + # When configuring IAM user credentials (AKIA), existing session token should be removed + session = FakeSession({'config_file': 'myconfigfile'}) + session.config = {'aws_session_token': 'existing_token'} + responses = { + "AWS Access Key ID": "AKIAUSER123456789", + "AWS Secret Access Key": "secret123", + "Default region name": "us-west-2", + "Default output format": "json", + } + prompter = KeyValuePrompter(responses) + self.configure = configure.ConfigureCommand( + session, prompter=prompter, config_writer=self.writer + ) + self.configure(args=[], parsed_globals=self.global_args) + + # Should write credentials and remove session token (set to None) + self.assert_credentials_file_updated_with( + { + 'aws_access_key_id': 'AKIAUSER123456789', + 'aws_secret_access_key': 'secret123', + 'aws_session_token': None, + } + ) + class TestInteractivePrompter(unittest.TestCase): def setUp(self): @@ -219,6 +297,18 @@ def test_non_secret_keys_are_not_masked(self): self.assertIn('mycurrentvalue', prompt_text) self.assertRegex(prompt_text, r'\[mycurrentvalue\]') + def test_session_token_is_masked(self): + prompter = configure.InteractivePrompter() + prompter.get_value( + current_value='mysessiontoken123', + config_name='aws_session_token', + prompt_text='Session Token', + ) + # Session token should be masked like other credentials + prompt_text = self.stdout.getvalue() + self.assertNotIn('mysessiontoken123', prompt_text) + self.assertRegex(prompt_text, r'\[\*\*\*\*.*\]') + def test_user_hits_enter_returns_none(self): # If a user hits enter, then raw_input returns the empty string. self.mock_raw_input.return_value = '' diff --git a/tests/unit/customizations/configure/test_mfalogin.py b/tests/unit/customizations/configure/test_mfalogin.py new file mode 100644 index 000000000000..1c6ab7ba9b18 --- /dev/null +++ b/tests/unit/customizations/configure/test_mfalogin.py @@ -0,0 +1,578 @@ +# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import datetime +import os +from unittest import mock + +import botocore.session +from botocore.exceptions import ClientError, ProfileNotFound + +from awscli.customizations.configure.mfalogin import ( + ConfigureMFALoginCommand, + InteractiveMFAPrompter, +) +from awscli.testutils import unittest + + +class TestInteractiveMFAPrompter(unittest.TestCase): + def test_get_value_with_response(self): + prompter = InteractiveMFAPrompter() + # Mock the entire compat_input function, not just the return value + with mock.patch( + 'awscli.customizations.configure.mfalogin.compat_input' + ) as mock_input: + mock_input.return_value = 'response' + self.assertEqual( + prompter.get_value('current', 'prompt'), 'response' + ) + + def test_get_value_with_no_response(self): + prompter = InteractiveMFAPrompter() + # Mock the entire compat_input function, not just the return value + with mock.patch( + 'awscli.customizations.configure.mfalogin.compat_input' + ) as mock_input: + mock_input.return_value = '' + self.assertEqual( + prompter.get_value('current', 'prompt'), 'current' + ) + + +class TestConfigureMFALoginCommand(unittest.TestCase): + def setUp(self): + self.session = mock.Mock() + self.session.get_scoped_config.return_value = {} + self.session.get_credentials.return_value = mock.Mock() + # Add available_profiles to the session mock + self.session.available_profiles = ['default', 'test'] + self.prompter = mock.Mock() + self.config_writer = mock.Mock() + self.command = ConfigureMFALoginCommand( + self.session, + prompter=self.prompter, + config_writer=self.config_writer, + ) + self.parsed_args = mock.Mock() + self.parsed_args.profile = None + self.parsed_args.update_profile = None + self.parsed_args.duration_seconds = None + self.parsed_args.serial_number = None + self.parsed_globals = mock.Mock() + # Set profile in parsed_globals + self.parsed_globals.profile = 'default' + + def test_no_credentials_found(self): + # Setup mock responses for interactive prompting + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + + # Set session to return None credentials to trigger interactive prompting + self.session.get_credentials.return_value = None + + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_instance = mock.Mock() + mock_session_instance.create_client.return_value = sts_client + mock_session_class.return_value = mock_session_instance + + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 0) # Should succeed via interactive prompting + + def test_profile_not_found(self): + # Set profile to a non-existent profile + self.parsed_globals.profile = 'nonexistent' + + # Mock the session to have no credentials for the nonexistent profile + self.session.get_credentials.return_value = None + self.session.get_scoped_config.return_value = {} + + # Setup mock responses for interactive prompting since no credentials found + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_instance = mock.Mock() + mock_session_instance.create_client.return_value = sts_client + mock_session_class.return_value = mock_session_instance + + with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + # Should succeed via interactive prompting + self.assertEqual(rc, 0) + + def test_no_mfa_serial_provided(self): + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_credential_value.return_value = None + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA serial number or MFA device ARN is required\n" + ) + + def test_no_token_code_provided(self): + # Mock botocore.session.Session + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_credential_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + None, + ] + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA token code is required\n" + ) + + def test_sts_client_error(self): + self.session.get_scoped_config.return_value = {} + + sts_client = mock.Mock() + sts_client.get_session_token.side_effect = ClientError( + { + 'Error': { + 'Code': 'InvalidClientTokenId', + 'Message': 'Test error', + } + }, + 'GetSessionToken', + ) + self.session.create_client.return_value = sts_client + + self.prompter.get_credential_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + '123456', + ] + self.prompter.get_value.return_value = 'session-test' + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + mock.ANY + ) # Just check it was called + + def test_successful_mfa_login(self): + # Setup + self.parsed_args.duration_seconds = 43200 + self.prompter.get_credential_value.side_effect = [ + 'arn:aws:iam::123456789012:mfa/user', + '123456', + ] + self.prompter.get_value.return_value = 'session-test' + + expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) + sts_response = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE', + 'Expiration': expiration, + } + } + + sts_client = mock.Mock() + sts_client.get_session_token.return_value = sts_response + self.session.create_client.return_value = sts_client + + with mock.patch('sys.stdin.isatty', return_value=True): + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + # Verify + self.assertEqual(rc, 0) + + # Check STS was called correctly + sts_client.get_session_token.assert_called_with( + DurationSeconds=43200, + SerialNumber='arn:aws:iam::123456789012:mfa/user', + TokenCode='123456', + ) + + # Check config writer was called correctly + expected_values = { + '__section__': 'session-test', + 'aws_access_key_id': 'ASIAIOSFODNN7EXAMPLE', + 'aws_secret_access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'aws_session_token': 'AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE', + } + + self.config_writer.update_config.assert_called_with( + expected_values, '/tmp/credentials' + ) + + def test_serial_number_from_parameter(self): + # Setup - use serial number from parameter + self.parsed_args.serial_number = ( + 'arn:aws:iam::123456789012:mfa/user-param' + ) + self.parsed_args.duration_seconds = 43200 + + self.session.get_scoped_config.return_value = { + 'mfa_serial': 'arn:aws:iam::123456789012:mfa/user-config' + } + + sts_client = mock.Mock() + expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) + sts_response = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': expiration, + } + } + sts_client.get_session_token.return_value = sts_response + self.session.create_client.return_value = sts_client + + with mock.patch('sys.stdin.isatty', return_value=True): + self.prompter.get_credential_value.return_value = '123456' + self.prompter.get_value.return_value = 'session-test' + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + # Verify + self.assertEqual(rc, 0) + + # Check that the parameter value was used instead of the config value + sts_client.get_session_token.assert_called_with( + DurationSeconds=43200, + SerialNumber='arn:aws:iam::123456789012:mfa/user-param', + TokenCode='123456', + ) + + def test_missing_default_profile_interactive(self): + """Test prompting for credentials when no default profile exists in interactive mode.""" + self.parsed_globals.profile = None # Use default profile + + # Set session to return None credentials to trigger interactive prompting + self.session.get_credentials.return_value = None + + # Mock sys.stdin.isatty to return True (interactive) + with mock.patch('sys.stdin.isatty', return_value=True): + # Mock the _handle_interactive_prompting method + with mock.patch.object( + self.command, + '_handle_interactive_prompting', + return_value=0, + ) as mock_handle: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 0) + mock_handle.assert_called_once_with( + self.parsed_args, None + ) + + def test_missing_default_profile_non_interactive(self): + """Test error when no default profile exists in non-interactive mode.""" + self.parsed_globals.profile = None # Use default profile + + # Setup mock responses for interactive prompting + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + + # Set session to return None credentials to trigger interactive prompting + self.session.get_credentials.return_value = None + + # Mock STS for the interactive prompting path + sts_client = mock.Mock() + sts_client.get_session_token.return_value = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': datetime.datetime(2023, 5, 19, 18, 6, 10), + } + } + + with mock.patch('botocore.session.Session') as mock_session_class: + mock_session_instance = mock.Mock() + mock_session_instance.create_client.return_value = sts_client + mock_session_class.return_value = mock_session_instance + + with mock.patch('sys.stdin.isatty', return_value=False): + with mock.patch('os.path.expanduser', return_value='/tmp/credentials'): + with mock.patch('sys.stdout'): + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 0) # Should succeed via interactive prompting + + def test_handle_missing_default_profile_success(self): + """Test successful credential prompting and MFA login when no default profile exists.""" + # Setup mock responses for prompting - now all via get_credential_value + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + + # Mock STS response + expiration = datetime.datetime(2023, 5, 19, 18, 6, 10) + sts_response = { + 'Credentials': { + 'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE', + 'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY', + 'SessionToken': 'SESSION_TOKEN', + 'Expiration': expiration, + } + } + + # Mock botocore session and STS client + mock_session = mock.Mock() + sts_client = mock.Mock() + sts_client.get_session_token.return_value = sts_response + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch( + 'os.path.expanduser', return_value='/tmp/credentials' + ): + with mock.patch('sys.stdout'): + rc = self.command._handle_interactive_prompting( + self.parsed_args, 43200 + ) + + # Verify success + self.assertEqual(rc, 0) + + # Verify STS call + sts_client.get_session_token.assert_called_with( + DurationSeconds=43200, + SerialNumber='arn:aws:iam::123456789012:mfa/user', + TokenCode='123456', + ) + + # Verify only the session profile was written + self.assertEqual(self.config_writer.update_config.call_count, 1) + + def test_handle_missing_default_profile_missing_access_key(self): + """Test error when access key is not provided.""" + self.prompter.get_credential_value.return_value = ( + None # No access key provided + ) + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_interactive_prompting( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "AWS Access Key ID is required\n" + ) + + def test_handle_missing_default_profile_missing_secret_key(self): + """Test error when secret key is not provided.""" + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key provided + None, # secret key not provided + ] + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_interactive_prompting( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "AWS Secret Access Key is required\n" + ) + + def test_credential_value_prompting_clean_display(self): + """Test that credential prompting doesn't show default values.""" + prompter = InteractiveMFAPrompter() + with mock.patch( + 'awscli.customizations.configure.mfalogin.compat_input' + ) as mock_input: + mock_input.return_value = 'test-value' + result = prompter.get_credential_value( + 'None', 'aws_access_key_id', 'AWS Access Key ID' + ) + + # Verify the prompt doesn't show [None] or any default value + mock_input.assert_called_with('AWS Access Key ID: ') + self.assertEqual(result, 'test-value') + + def test_handle_missing_default_profile_sts_error(self): + """Test STS error handling in missing default profile scenario.""" + self.prompter.get_credential_value.side_effect = [ + 'AKIAIOSFODNN7EXAMPLE', # access key + 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', # secret key + 'arn:aws:iam::123456789012:mfa/user', # MFA serial + '123456', # MFA token + ] + self.prompter.get_value.return_value = 'session-test' # profile name + + # Mock STS client to raise an error + mock_session = mock.Mock() + sts_client = mock.Mock() + sts_client.get_session_token.side_effect = ClientError( + { + 'Error': { + 'Code': 'InvalidClientTokenId', + 'Message': 'Invalid credentials', + } + }, + 'GetSessionToken', + ) + mock_session.create_client.return_value = sts_client + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_interactive_prompting( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + # Verify error message was written + mock_stderr.write.assert_called() + self.assertIn( + 'An error occurred', str(mock_stderr.write.call_args) + ) + + def test_non_interactive_missing_mfa_serial(self): + """Test non-interactive mode when MFA serial is missing.""" + mock_session = mock.Mock() + mock_session.get_credentials.return_value = mock.Mock() + mock_session.get_scoped_config.return_value = {} # No mfa_serial in config + mock_session.available_profiles = ['default'] + + with mock.patch('botocore.session.Session', return_value=mock_session): + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + self.prompter.get_credential_value.return_value = None + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA serial number or MFA device ARN is required\n" + ) + + def test_non_interactive_missing_token_code(self): + """Test non-interactive mode when token code would be prompted.""" + self.session.get_scoped_config.return_value = { + 'mfa_serial': 'arn:aws:iam::123456789012:mfa/user' + } + + with mock.patch( + 'sys.stdin.isatty', return_value=False + ): # Non-interactive + self.prompter.get_credential_value.return_value = None + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._run_main( + self.parsed_args, self.parsed_globals + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "MFA token code is required\n" + ) + + def test_empty_credential_input_handling(self): + """Test handling of empty credential inputs.""" + self.prompter.get_credential_value.return_value = '' # Empty string + + with mock.patch('sys.stderr') as mock_stderr: + rc = self.command._handle_interactive_prompting( + self.parsed_args, 43200 + ) + + self.assertEqual(rc, 1) + mock_stderr.write.assert_called_with( + "AWS Access Key ID is required\n" + )