-
Notifications
You must be signed in to change notification settings - Fork 1
Creates stateless version of OAuth 2 functions in generic case #34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| from pardner.stateless.base import Scope as Scope |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| from typing import Any, Optional, TypeAlias | ||
|
|
||
| from requests_oauthlib import OAuth2Session | ||
|
|
||
| Scope: TypeAlias = str | set[object] | tuple[object] | list[object] | ||
|
|
||
|
|
||
| def generic_construct_authorization_url( | ||
| authorization_url_endpoint: str, client_id: str, redirect_uri: str, scope: Scope | ||
| ) -> tuple[str, str]: | ||
| """ | ||
| Builds the authorization URL and state. Once the end-user (i.e., resource owner) | ||
| navigates to the authorization URL they can begin the authorization flow. | ||
|
|
||
| :param authorization_url_endpoint: The service's endpoint that must be hit to begin | ||
| the OAuth 2 flow. | ||
| :param client_id: Client identifier given by the OAuth provider upon registration. | ||
| :param redirect_uri: The registered callback URI. | ||
| :param scope: The scope of the access request. These may be any string but are | ||
| commonly URIs or various categories such as ``videos`` or ``documents``. | ||
|
|
||
| :returns: the authorization URL and state, respectively. | ||
| """ | ||
| oAuth2Session = OAuth2Session( | ||
| client_id=client_id, redirect_uri=redirect_uri, scope=scope | ||
| ) | ||
| return oAuth2Session.authorization_url(authorization_url_endpoint) | ||
|
|
||
|
|
||
| def generic_fetch_token( | ||
| client_id: str, | ||
| redirect_uri: str, | ||
| scope: Scope, | ||
| token_url: str, | ||
| authorization_response: Optional[str] = None, | ||
| client_secret: Optional[str] = None, | ||
| code: Optional[str] = None, | ||
| include_client_id: Optional[bool] = None, | ||
| ) -> dict[str, Any]: | ||
| """ | ||
| Once the end-user authorizes the application to access their data, the | ||
| resource server sends a request to `redirect_uri` with the authorization code as | ||
| a parameter. Using this authorization code, this method makes a request to the | ||
| resource server to obtain the access token. | ||
|
|
||
| One of either `code` or `authorization_response` must not be None. | ||
|
|
||
| :param client_id: Client identifier given by the OAuth provider upon registration. | ||
| :param redirect_uri: The registered callback URI. | ||
| :param scope: The scope of the access request. These may be any string but are | ||
| commonly URIs or various categories such as ``videos`` or ``documents``. | ||
| :param token_url: Token endpoint HTTPS URL. | ||
| :param authorization_response: the URL (with parameters) the end-user's browser | ||
| redirected to after authorization. | ||
| :param client_secret: The `client_secret` paired to the `client_id`. | ||
| :param code: Authorization code (used by WebApplicationClients). | ||
| :param include_client_id: Should the request body include the | ||
| `client_id` parameter. | ||
|
|
||
| :returns: the authorization URL and state, respectively. | ||
| """ | ||
| oAuth2Session = OAuth2Session( | ||
| client_id=client_id, redirect_uri=redirect_uri, scope=scope | ||
| ) | ||
| return oAuth2Session.fetch_token( | ||
| token_url=token_url, | ||
| code=code, | ||
| authorization_response=authorization_response, | ||
| include_client_id=include_client_id, | ||
| client_secret=client_secret, | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| from typing import Any, Optional | ||
|
|
||
| from oauthlib.oauth2.rfc6749.utils import scope_to_list | ||
|
|
||
| from pardner.stateless import Scope | ||
|
|
||
|
|
||
| def scope_to_set(scope: Any) -> set[str]: | ||
| """ | ||
| Splits `scope` into each individual scope and puts the values in a set of strings. | ||
| Leverages OAuthlib library helpers. | ||
|
|
||
| :param scope: the string or sequence/iterable of objects that will be converted to | ||
| a set of strings. | ||
|
|
||
| :returns: a set of strings where each string is an individual scope. | ||
| """ | ||
| return set(scope_to_list(scope)) if scope else set() | ||
|
|
||
|
|
||
| def has_sufficient_scope( | ||
| old_scope: Optional[Scope], new_scope: Optional[Scope] | ||
| ) -> bool: | ||
| """ | ||
| Given an old scope and a new scope, determines if the new scope has scopes that are | ||
| not in the original `old_scope`. | ||
|
|
||
| :param old_scope: zero or more scopes. | ||
| :param new_scope: zero or more scopes. | ||
|
|
||
| :returns: True if `new_scope` has no new scopes and False otherwise. | ||
| """ | ||
| old_scope_set = scope_to_set(old_scope) | ||
| new_scope_set = scope_to_set(new_scope) | ||
| return new_scope_set.issubset(old_scope_set) |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| from typing import Any | ||
| from urllib import parse | ||
|
|
||
| import pytest | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def mock_outbound_requests(mocker): | ||
| mock_oauth2session_request = mocker.patch('requests_oauthlib.OAuth2Session.request') | ||
| mock_client_parse_request_body_response = mocker.patch( | ||
| 'oauthlib.oauth2.rfc6749.clients.WebApplicationClient.parse_request_body_response' | ||
| ) | ||
| return (mock_oauth2session_request, mock_client_parse_request_body_response) | ||
|
|
||
|
|
||
| def get_url_params(url: str) -> dict[str, Any]: | ||
| url_query = parse.urlsplit(url).query | ||
| return dict(parse.parse_qsl(url_query)) |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| import pytest | ||
|
|
||
| from pardner.stateless.base import ( | ||
| generic_construct_authorization_url, | ||
| generic_fetch_token, | ||
| ) | ||
| from tests.conftest import get_url_params | ||
|
|
||
|
|
||
| def test_generic_construct_authorization_url(): | ||
| auth_url, state = generic_construct_authorization_url( | ||
| 'https://authorize.com', | ||
| 'fake_client_id', | ||
| 'https://redirect_uri', | ||
| {'fake', 'scope'}, | ||
| ) | ||
| assert auth_url.startswith('https://authorize.com') | ||
|
|
||
| auth_url_params = get_url_params(auth_url) | ||
|
|
||
| assert 'client_id' in auth_url_params | ||
| assert auth_url_params['client_id'] == 'fake_client_id' | ||
| assert 'redirect_uri' in auth_url_params | ||
| assert auth_url_params['redirect_uri'] == 'https://redirect_uri' | ||
| assert 'state' in auth_url_params | ||
| assert auth_url_params['state'] == state | ||
| assert 'scope' in auth_url_params | ||
| assert 'fake' in auth_url_params['scope'] | ||
| assert 'scope' in auth_url_params['scope'] | ||
|
|
||
|
|
||
| def test_generic_fetch_token_raises_error(): | ||
| with pytest.raises(ValueError): | ||
| generic_fetch_token( | ||
| 'fake_client_id', | ||
| 'https://redirect_uri', | ||
| {'fake', 'scope'}, | ||
| 'https://token_url.com', | ||
| authorization_response=None, | ||
| client_secret='fake client secret', | ||
| code=None, | ||
| ) | ||
|
|
||
|
|
||
| def test_generic_fetch_token_with_code(mock_outbound_requests): | ||
| mock_oauth2session_request, mock_client_parse_request_body_response = ( | ||
| mock_outbound_requests | ||
| ) | ||
| generic_fetch_token( | ||
| 'fake_client_id', | ||
| 'https://redirect_uri', | ||
| {'fake', 'scope'}, | ||
| 'https://token_url.com', | ||
| client_secret='fake client secret', | ||
| code='the_best_code', | ||
| ) | ||
| mock_oauth2session_request.assert_called_once() | ||
| mock_client_parse_request_body_response.assert_called_once() | ||
|
|
||
|
|
||
| def test_generic_fetch_token_with_authorization_response(mock_outbound_requests): | ||
| mock_oauth2session_request, mock_client_parse_request_body_response = ( | ||
| mock_outbound_requests | ||
| ) | ||
| generic_fetch_token( | ||
| 'fake_client_id', | ||
| 'https://redirect_uri', | ||
| {'fake', 'scope'}, | ||
| 'https://token_url.com', | ||
| authorization_response='https://redirect/?code=the_best_code', | ||
| client_secret='fake client secret', | ||
| ) | ||
| mock_oauth2session_request.assert_called_once() | ||
| mock_client_parse_request_body_response.assert_called_once() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| import pytest | ||
|
|
||
| from pardner.stateless.utils import has_sufficient_scope, scope_to_set | ||
|
|
||
| expected_set = {'scope1', 'scope2', 'scope3'} | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| ['scopes', 'expected'], | ||
| [ | ||
| ('scope1 scope2 scope3', expected_set), | ||
| (['scope1', 'scope2', 'scope3'], expected_set), | ||
| (['scope2', 'scope1', 'scope3'], expected_set), | ||
| (('scope1', 'scope2', 'scope3'), expected_set), | ||
| (['scope1', 'scope2', 'scope3'], expected_set), | ||
| ({'scope1', 'scope2', 'scope3'}, expected_set), | ||
| ], | ||
| ) | ||
| def test_scope_to_set(scopes, expected): | ||
| assert scope_to_set(scopes) == expected | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| ['old_scope', 'new_scope', 'expected'], | ||
| [ | ||
| ('scope1 scope2 scope3', 'scope1 scope2 scope3', True), | ||
| ('scope1 scope2 scope3', ['scope1', 'scope2'], True), | ||
| ({'scope1', 'scope2'}, 'scope1 scope2 scope3', False), | ||
| ('', 'scope1 scope2 scope3', False), | ||
| ], | ||
| ) | ||
| def test_has_sufficient_scope(old_scope, new_scope, expected): | ||
| assert has_sufficient_scope(old_scope, new_scope) == expected |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no reason that the stateless methods need to be functional instead of object-oriented just because they're stateless, but this is fine of course!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For these to be used from outside the library when the library is packaged and installed via something like setup, these methods will need to be exported in an init.py in the main directory of the library, but that can be added.