-
Notifications
You must be signed in to change notification settings - Fork 6
[Maintenance] Refactored scopes and auth #66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
@@ -73,18 +78,22 @@ def to_dict(self): | |||
class Cred: | |||
|
|||
creds: Optional[CRED_TYPES] | |||
session: Optional[Credentials] | |||
|
|||
def session(self, scopes: Optional[list[str]] = None) -> Optional[Credentials]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to get a session, just call this method, with optional scopes
self.custom_config = yaml.safe_load(custom_path.read_text()) if custom_path.exists() else {} | ||
|
||
@property | ||
def merged(self) -> dict: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when getting items, use this property
when setting, set directly to custom_config
- these are all saved on exit
@@ -41,6 +41,7 @@ def __call__(self, | |||
raise argparse.ArgumentError(self, f'invalid filter argument "{value}", expected "key=value"') | |||
setattr(namespace, self.dest, {key: val}) | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This command is failing, but I think unrelated?
@@ -25,7 +26,8 @@ class Emulation(BaseEmulation): | |||
def __init__(self, **kwargs) -> None: | |||
super().__init__(**kwargs) | |||
self.folder_id = self.args.folder_id | |||
self.service = build('drive', 'v3', credentials=self.obj.cred_store.store[self.args.session_key].session) | |||
creds = self.obj.cred_store.get('default', validate_type='oauth') | |||
self.service = build('drive', 'v3', credentials=creds.session()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
creds.session()!
def main(): | ||
parser = argparse.ArgumentParser(description='SWAT CLI') | ||
parser.add_argument('--debug', action='store_true', help='Debug mode') | ||
parser.add_argument('--custom-config', type=Path, default=DEFAULT_CUSTOM_CONFIG_PATH, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default custom but optional param
@@ -27,6 +34,7 @@ def main(): | |||
finally: | |||
if shell.save_on_exit: | |||
shell.obj.cred_store.save() | |||
shell.obj.config.save_custom() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
saved on exit
329d857
to
559fe5c
Compare
gets error
Looks like |
class OAuthCreds(BaseCreds):
"""Data class for OAuth2.0 application credentials."""
auth_provider_x509_cert_url: str
auth_uri: str
client_id: str
client_secret: str
project_id: str
redirect_uris: list[str]
token_uri: str
refresh_token: str = None |
|
||
def session(self, scopes: Optional[list[str]] = None) -> Optional[Credentials]: | ||
if isinstance(self.creds, OAuthCreds): | ||
session = Credentials.from_authorized_user_info(self.creds.to_dict(), scopes=scopes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
session = Credentials.from_authorized_user_info(self.creds.to_dict(), scopes=scopes) | |
session = Credentials.from_authorized_user_info(self.creds.to_dict()['installed'], scopes=scopes) |
if isinstance(self.creds, OAuthCreds): | ||
session = Credentials.from_authorized_user_info(self.creds.to_dict(), scopes=scopes) | ||
else: | ||
session = ServiceCredentials.from_service_account_info(self.creds.to_dict(), scopes=scopes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
session = ServiceCredentials.from_service_account_info(self.creds.to_dict(), scopes=scopes) | |
session = ServiceCredentials.from_service_account_info(self.creds.to_dict()['installed'], scopes=scopes) |
Some additional thoughts here:
|
The commands for scopes and auth have been removed.
Cred
, simply by usingCred.session()
with scopes as an optional parameter