Skip to content

Add MFA support for azd #42488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def close(self) -> None:
def get_token(
self,
*scopes: str,
claims: Optional[str] = None, # pylint:disable=unused-argument
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
**kwargs: Any,
) -> AccessToken:
Expand All @@ -111,7 +111,8 @@ def get_token(
:param str scopes: desired scope for the access token. This credential allows only one scope per request.
For more information about scopes, see
https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword str claims: not used by this credential; any value provided will be ignored.
:keyword str claims: additional claims required in the token, such as those returned in a resource provider's
claims challenge following an authorization failure.
:keyword str tenant_id: optional tenant to include in the token request.

:return: An access token with the desired scopes.
Expand All @@ -125,6 +126,8 @@ def get_token(
options: TokenRequestOptions = {}
if tenant_id:
options["tenant_id"] = tenant_id
if claims:
options["claims"] = claims

token_info = self._get_token_base(*scopes, options=options, **kwargs)
return AccessToken(token_info.token, token_info.expires_on)
Expand Down Expand Up @@ -159,6 +162,7 @@ def _get_token_base(
raise ValueError("Missing scope in request. \n")

tenant_id = options.get("tenant_id") if options else None
claims = options.get("claims") if options else None
if tenant_id:
validate_tenant_id(tenant_id)
for scope in scopes:
Expand All @@ -175,16 +179,23 @@ def _get_token_base(
)
if tenant:
command_args += ["--tenant-id", tenant]
if claims:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If claims are populated, shouldn't we raise immediately with the error message containing the command to run to login again?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The azd implementation differs from the CLI/PSH by not using "azd auth login --claims."

Instead, users must first run "azd auth token --claims", which stores the claims.

Then, users can execute "azd auth login" without the "--claims" flag, as azd retrieves the claims from the cached data created in the previous step.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - but shouldn't we raise immediately when we have claims to tell the user to run those commands to proceed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my observations, the command 'azd auth token --claims' may return a valid token, such as when the device is enrolled. To ensure robust code, let's handle both scenarios.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we know if it is a valid token? Once we leave the credential and try to use the token in a policy, where would we throw with the azd specific error message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we execute azd auth token --claims and receive a token, it is considered valid.

If token retrieval fails, an error is returned in JSON format from azd.

command_args += ["--claims", claims]
output = _run_command(command_args, self._process_timeout)

token = parse_token(output)
if not token:
sanitized_output = sanitize_output(output)
message = (
f"Unexpected output from Azure Developer CLI: '{sanitized_output}'. \n"
f"To mitigate this issue, please refer to the troubleshooting guidelines here at "
f"https://aka.ms/azsdk/python/identity/azdevclicredential/troubleshoot."
)
# Try to extract a meaningful error from azd consoleMessage JSON lines
extracted = extract_cli_error_message(output)
if extracted:
message = extracted
else:
sanitized_output = sanitize_output(output)
message = (
f"Unexpected output from Azure Developer CLI: '{sanitized_output}'. \n"
f"To mitigate this issue, please refer to the troubleshooting guidelines here at "
f"https://aka.ms/azsdk/python/identity/azdevclicredential/troubleshoot."
)
if within_dac.get():
raise CredentialUnavailableError(message=message)
raise ClientAuthenticationError(message=message)
Expand Down Expand Up @@ -241,6 +252,54 @@ def sanitize_output(output: str) -> str:
return re.sub(r"\"token\": \"(.*?)(\"|$)", "****", output)


def extract_cli_error_message(output: str) -> Optional[str]:
"""
Extract a single, user-friendly message from azd consoleMessage JSON output.

:param str output: The output from the Azure Developer CLI command.
:return: A user-friendly error message if found, otherwise None.
:rtype: Optional[str]

Preference order:
1) A message containing "Suggestion" (case-insensitive)
2) The second message if multiple are present
3) The first message if only one exists
Returns None if no messages can be parsed.
"""
messages: List[str] = []
for line in output.splitlines():
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError: # not JSON -> ignore
continue
if isinstance(obj, dict):
data = obj.get("data")
if isinstance(data, dict):
msg = data.get("message")
if isinstance(msg, str) and msg.strip():
messages.append(msg.strip())
continue
msg = obj.get("message")
if isinstance(msg, str) and msg.strip():
messages.append(msg.strip())

if not messages:
return None

# Prefer the suggestion line if present
for msg in messages:
if "suggestion" in msg.lower():
return sanitize_output(msg)

# Otherwise, return the second message when available, else the first
if len(messages) >= 2:
return sanitize_output(messages[1])
return sanitize_output(messages[0])


def _run_command(command_args: List[str], timeout: int) -> str:
# Ensure executable exists in PATH first. This avoids a subprocess call that would fail anyway.
azd_path = shutil.which(EXECUTABLE_NAME)
Expand All @@ -267,16 +326,18 @@ def _run_command(command_args: List[str], timeout: int) -> str:
# Fallback check in case the executable is not found while executing subprocess.
if ex.returncode == 127 or (ex.stderr is not None and ex.stderr.startswith("'azd' is not recognized")):
raise CredentialUnavailableError(message=CLI_NOT_FOUND) from ex
if ex.stderr is not None and (
"not logged in, run `azd auth login` to login" in ex.stderr and "AADSTS" not in ex.stderr
combined_text = "{}\n{}".format(ex.output or "", ex.stderr or "")
if combined_text and (
"not logged in, run `azd auth login` to login" in combined_text and "AADSTS" not in combined_text
):
raise CredentialUnavailableError(message=NOT_LOGGED_IN) from ex

# return code is from the CLI -> propagate its output
if ex.stderr:
message = sanitize_output(ex.stderr)
else:
message = "Failed to invoke Azure Developer CLI"
message = (
extract_cli_error_message(ex.output or "")
or extract_cli_error_message(ex.stderr or "")
or (sanitize_output(ex.stderr) if ex.stderr else "Failed to invoke Azure Developer CLI")
)
if within_dac.get():
raise CredentialUnavailableError(message=message) from ex
raise ClientAuthenticationError(message=message) from ex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
NOT_LOGGED_IN,
parse_token,
sanitize_output,
extract_cli_error_message,
)
from ..._internal import resolve_tenant, within_dac, validate_tenant_id, validate_scope

Expand Down Expand Up @@ -87,7 +88,7 @@ def __init__(
async def get_token(
self,
*scopes: str,
claims: Optional[str] = None, # pylint:disable=unused-argument
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
**kwargs: Any,
) -> AccessToken:
Expand All @@ -99,7 +100,8 @@ async def get_token(
:param str scopes: desired scope for the access token. This credential allows only one scope per request.
For more information about scopes, see
https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword str claims: not used by this credential; any value provided will be ignored.
:keyword str claims: additional claims required in the token, such as those returned in a resource provider's
claims challenge following an authorization failure.
:keyword str tenant_id: optional tenant to include in the token request.

:return: An access token with the desired scopes.
Expand All @@ -110,11 +112,13 @@ async def get_token(
"""
# only ProactorEventLoop supports subprocesses on Windows (and it isn't the default loop on Python < 3.8)
if sys.platform.startswith("win") and not isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop):
return _SyncAzureDeveloperCliCredential().get_token(*scopes, tenant_id=tenant_id, **kwargs)
return _SyncAzureDeveloperCliCredential().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs)

options: TokenRequestOptions = {}
if tenant_id:
options["tenant_id"] = tenant_id
if claims:
options["claims"] = claims

token_info = await self._get_token_base(*scopes, options=options, **kwargs)
return AccessToken(token_info.token, token_info.expires_on)
Expand Down Expand Up @@ -152,6 +156,7 @@ async def _get_token_base(
raise ValueError("Missing scope in request. \n")

tenant_id = options.get("tenant_id") if options else None
claims = options.get("claims") if options else None
if tenant_id:
validate_tenant_id(tenant_id)
for scope in scopes:
Expand All @@ -169,16 +174,22 @@ async def _get_token_base(

if tenant:
command_args += ["--tenant-id", tenant]
if claims:
command_args += ["--claims", claims]
output = await _run_command(command_args, self._process_timeout)

token = parse_token(output)
if not token:
sanitized_output = sanitize_output(output)
message = (
f"Unexpected output from Azure Developer CLI: '{sanitized_output}'. \n"
f"To mitigate this issue, please refer to the troubleshooting guidelines here at "
f"https://aka.ms/azsdk/python/identity/azdevclicredential/troubleshoot."
)
extracted = extract_cli_error_message(output)
if extracted:
message = extracted
else:
sanitized_output = sanitize_output(output)
message = (
f"Unexpected output from Azure Developer CLI: '{sanitized_output}'. \n"
f"To mitigate this issue, please refer to the troubleshooting guidelines here at "
f"https://aka.ms/azsdk/python/identity/azdevclicredential/troubleshoot."
)
if within_dac.get():
raise CredentialUnavailableError(message=message)
raise ClientAuthenticationError(message=message)
Expand Down Expand Up @@ -226,10 +237,15 @@ async def _run_command(command_args: List[str], timeout: int) -> str:
if proc.returncode == 127 or stderr.startswith("'azd' is not recognized"):
raise CredentialUnavailableError(CLI_NOT_FOUND)

if "not logged in, run `azd auth login` to login" in stderr and "AADSTS" not in stderr:
combined_text = f"{output}\n{stderr}"
if "not logged in, run `azd auth login` to login" in combined_text and "AADSTS" not in combined_text:
raise CredentialUnavailableError(message=NOT_LOGGED_IN)

message = sanitize_output(stderr) if stderr else "Failed to invoke Azure Developer CLI"
message = (
extract_cli_error_message(output)
or extract_cli_error_message(stderr)
or (sanitize_output(stderr) if stderr else "Failed to invoke Azure Developer CLI")
)
if within_dac.get():
raise CredentialUnavailableError(message=message)
raise ClientAuthenticationError(message=message)