From 2ddbf39c30a87446a9b47932989b1b3ea824d2ac Mon Sep 17 00:00:00 2001 From: brokensound77 <16747370+brokensound77@users.noreply.github.com> Date: Fri, 11 Aug 2023 20:15:22 -0600 Subject: [PATCH 1/2] Refactored scopes and auth --- .gitignore | 1 + swat/base.py | 79 ++++++++++-------- swat/commands/audit.py | 34 +++++--- swat/commands/auth.py | 80 ------------------- swat/commands/creds.py | 3 +- swat/commands/scopes.py | 52 ------------ swat/emulations/base_emulation.py | 2 +- .../collection/drive_access_private_keys.py | 6 +- swat/main.py | 8 ++ swat/shell.py | 14 ++-- swat/utils.py | 23 +++++- 11 files changed, 109 insertions(+), 193 deletions(-) delete mode 100644 swat/commands/auth.py delete mode 100644 swat/commands/scopes.py diff --git a/.gitignore b/.gitignore index d8ef4f3..e42a051 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,4 @@ logs/* # etc swat/etc/*chrome* +swat/etc/custom_config.yaml diff --git a/swat/base.py b/swat/base.py index 50e1680..4a7e4af 100644 --- a/swat/base.py +++ b/swat/base.py @@ -1,4 +1,5 @@ +import copy import dataclasses import logging import pickle @@ -6,15 +7,19 @@ from pathlib import Path from typing import Optional, Literal, Union -import json from google.auth.transport.requests import Request +from google.oauth2.service_account import Credentials as ServiceCredentials from google.oauth2.credentials import Credentials -from .utils import ROOT_DIR, PathlibEncoder +import json +import yaml + +from .utils import ROOT_DIR, deep_merge DEFAULT_CRED_STORE_FILE = ROOT_DIR / 'swat' / 'etc' / '.cred_store.pkl' DEFAULT_EMULATION_ARTIFACTS_DIR = ROOT_DIR / 'swat' / 'etc' / 'artifacts' +DEFAULT_CUSTOM_CONFIG_PATH = ROOT_DIR / 'swat' / 'etc' / 'custom_config.yaml' @dataclass @@ -73,18 +78,22 @@ def to_dict(self): class Cred: creds: Optional[CRED_TYPES] - session: Optional[Credentials] + + def session(self, scopes: Optional[list[str]] = None) -> Optional[Credentials]: + if isinstance(self.creds, OAuthCreds): + session = Credentials.from_authorized_user_info(str(self.creds.to_dict()), scopes=scopes) + else: + session = ServiceCredentials.from_service_account_info(self.creds.to_dict(), scopes=scopes) + + if session.expired and session.refresh_token: + session.refresh(Request()) + return session @property def client_id(self) -> Optional[str]: if self.creds and hasattr(self.creds, 'client_id'): return self.creds.client_id - def refreshed_session(self) -> Optional[Credentials]: - if self.session and self.session.expired and self.session.refresh_token: - self.session.refresh(Request()) - return self.session - def to_dict(self): return {k: v for k, v in dataclasses.asdict(self).items() if not k.startswith('_')} @@ -100,14 +109,6 @@ def __post_init__(self): if not isinstance(self.path, Path): self.path = Path(self.path) - @property - def has_sessions(self) -> bool: - """Return a boolean indicating if the creds have sessions.""" - for key, cred in self.store.items(): - if cred.session: - return True - return False - @classmethod def from_file(cls, file: Path = DEFAULT_CRED_STORE_FILE) -> Optional['CredStore']: if file.exists(): @@ -118,21 +119,21 @@ def save(self): logging.info(f'Saved cred store to {self.path}') self.path.write_bytes(pickle.dumps(self)) - def add(self, key: str, creds: Optional[CRED_TYPES] = None, session: Optional[Credentials] = None, - override: bool = False, type: Optional[Literal['oauth', 'service']] = None): + def add(self, key: str, creds: Optional[CRED_TYPES] = None, override: bool = False, + cred_type: Optional[Literal["oauth", "service"]] = None): """Add a credential to the store.""" if key in self.store and not override: raise ValueError(f'Value exists for: {key}') - cred = Cred(creds=creds, session=session) + cred = Cred(creds=creds) self.store[key] = cred - logging.info(f'Added {type} cred with key: {key}') + logging.info(f'Added {cred_type} cred with key: {key}') def remove(self, key: str) -> bool: """Remove cred by key and type.""" return self.store.pop(key, None) is not None - def get(self, key: str, validate_type: Optional[Literal['oauth', 'service']] = None, + def get(self, key: str, validate_type: Optional[Literal["oauth", "service"]] = None, missing_error: bool = True) -> Optional[Cred]: value = self.store.get(key) creds = value.creds @@ -156,23 +157,35 @@ def get_by_client_id(self, client_id: str, validate_type: Optional[Literal['oaut def list_credentials(self) -> list[str]: """Get the list of creds from the store.""" - return [f'{k}{f":{v.creds}" if v.creds else ""}' for k, v in self.store.items()] + return [f'{k}:{v.creds.__class__.__name__}:{v.creds.project_id}' for k, v in self.store.items()] + + +class Config: + """Config class for handling config and custom_config.""" + + def __init__(self, path: Path, custom_path: Optional[Path] = DEFAULT_CUSTOM_CONFIG_PATH): + self.path = path + self.custom_path = custom_path + + assert path.exists(), f'Config file not found: {path}' + self.config = yaml.safe_load(path.read_text()) + + self.custom_config = yaml.safe_load(custom_path.read_text()) if custom_path.exists() else {} + + @property + def merged(self) -> dict: + """Safely retrieve a fresh merge of primary and custom configs.""" + # I regret nothing + config = copy.deepcopy(self.config) + return deep_merge(config, self.custom_config) - def list_sessions(self) -> list[str]: - """Get the list of sessions from the store.""" - sessions = [] - for k, v in self.store.items(): - if v.session: - if 'service' in v.session.__module__: - sessions.append(f'{k}:{v.session.__module__}:{v.session.service_account_email}') - else: - sessions.append(f'{k}:{v.session.__module__}:{v.session.client_id}') - return sessions + def save_custom(self): + self.custom_path.write_text(yaml.dump(self.custom_config)) @dataclass class SWAT: """Base object for SWAT.""" - config: dict + config: Config cred_store: CredStore = field(default_factory=lambda: CredStore.from_file() or CredStore()) diff --git a/swat/commands/audit.py b/swat/commands/audit.py index 6ff48cc..9a4eafa 100644 --- a/swat/commands/audit.py +++ b/swat/commands/audit.py @@ -41,6 +41,7 @@ def __call__(self, raise argparse.ArgumentError(self, f'invalid filter argument "{value}", expected "key=value"') setattr(namespace, self.dest, {key: val}) + @dataclass class Filters: """Dataclass representing a set of filters.""" @@ -68,10 +69,13 @@ class Command(BaseCommand): parser = get_custom_argparse_formatter(prog='audit', description='Google Workspace Audit') parser.add_argument('application', help='Application name') parser.add_argument('duration', help='Duration in format Xs, Xm, Xh or Xd.') - parser.add_argument('--columns', nargs='+', help='Columns to keep in the output. If not set, will take columns from config.') + parser.add_argument('--columns', nargs='+', + help='Columns to keep in the output. If not set, will take columns from config.') parser.add_argument('--export', action='store_true', default=False, help='Path to export the data') - parser.add_argument('--export-format', choices=['csv', 'ndjson'], default='csv', help='Export format. Default is csv.') - parser.add_argument('--filters', nargs='*', action=KeyValueAction, dest='filters', default={}, help='Filters to apply on the data') + parser.add_argument('--export-format', choices=['csv', 'ndjson'], default='csv', + help='Export format. Default is csv.') + parser.add_argument('--filters', nargs='*', action=KeyValueAction, dest='filters', default={}, + help='Filters to apply on the data') parser.add_argument('--interactive', action='store_true', help='Interactive mode') def __init__(self, **kwargs) -> None: @@ -89,7 +93,7 @@ def __init__(self, **kwargs) -> None: # Check if the session exists in the credential store if self.obj.cred_store.store.get('default') is None: - self.logger.error(f'Please authenticate with "auth session --default --creds" before running this command.') + self.logger.error(f'Please add "default" creds with "creds add default ..." before running this command.') return try: @@ -99,7 +103,8 @@ def __init__(self, **kwargs) -> None: return try: - self.service = build('admin', 'reports_v1', credentials=self.obj.cred_store.store['default'].session) + creds = self.obj.cred_store.get('default', validate_type='oauth') + self.service = build('admin', 'reports_v1', credentials=creds.session()) except HttpError as err: self.logger.error(f'An error occurred: {err}') return @@ -114,7 +119,6 @@ def __init__(self, **kwargs) -> None: self.args.filters = [f.strip('\'"') for f in self.args.filters] self.filters = Filters(self.args.filters) - def export_data(self, df: pd.DataFrame) -> None: """ Exports the dataframe to a specified format. @@ -135,7 +139,6 @@ def export_data(self, df: pd.DataFrame) -> None: else: self.logger.warning(f'Unsupported export format: {self.args.export_format}. No data was exported.') - def flatten_json(self, y: dict) -> dict: """ Flattens a nested dictionary and returns a new dictionary with @@ -187,7 +190,6 @@ def flatten_activities(self, activities: list) -> pd.DataFrame: flattened_data.append(merged_data) return pd.DataFrame(flattened_data) - def fetch_data(self) -> pd.DataFrame: """ Fetches the activity data from the Google Workspace Audit service, using the provided start time, @@ -227,9 +229,14 @@ def filter_columns(self, df: pd.DataFrame) -> pd.DataFrame: Returns: pd.DataFrame: The filtered dataframe. """ - columns = self.args.columns or self.obj.config['google']['audit']['columns'] + columns = self.args.columns or self.obj.config.merged['google']['audit']['columns'] modified_columns = ['.*' + column + '.*' for column in columns] - df = df[[column for column in df.columns for pattern in modified_columns if re.search(pattern, column, re.IGNORECASE)]] + df = df[ + [ + column for column in df.columns for pattern in modified_columns + if re.search(pattern, column, re.IGNORECASE) + ] + ] return df def interactive_session(self, df: pd.DataFrame, df_unfiltered: pd.DataFrame) -> None: @@ -244,7 +251,8 @@ def interactive_session(self, df: pd.DataFrame, df_unfiltered: pd.DataFrame) -> None """ # Ask the user which columns to display - selected_columns_input = input('Enter the columns to display, separated by commas (see logged available columns): ') + selected_columns_input = input('Enter the columns to display, separated by commas ' + '(see logged available columns): ') selected_columns = [column.strip() for column in selected_columns_input.split(',')] # Keep only the selected columns @@ -267,7 +275,8 @@ def interactive_session(self, df: pd.DataFrame, df_unfiltered: pd.DataFrame) -> except ValueError: self.logger.warning(f'Invalid row number: {row_number}') - def show_results(self, df: pd.DataFrame) -> None: + @staticmethod + def show_results(df: pd.DataFrame) -> None: """ Prints the DataFrame to the console in a markdown table format. @@ -279,7 +288,6 @@ def show_results(self, df: pd.DataFrame) -> None: """ print(Fore.GREEN + df.to_markdown(headers='keys', tablefmt='fancy_grid') + Fore.RESET) - def execute(self) -> None: """ Main execution method of the Command class. diff --git a/swat/commands/auth.py b/swat/commands/auth.py deleted file mode 100644 index 03e0eae..0000000 --- a/swat/commands/auth.py +++ /dev/null @@ -1,80 +0,0 @@ - -from pathlib import Path -from typing import Optional - -from google.oauth2.service_account import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow - -from ..commands.base_command import BaseCommand -from ..commands.creds import OAuthCreds, ServiceAccountCreds -from ..utils import ROOT_DIR, check_file_exists -from ..misc import validate_args - -DEFAULT_TOKEN_FILE = ROOT_DIR / 'token.pkl' - - -class Command(BaseCommand): - - parser = BaseCommand.load_parser(description='Authenticate to google workspace with oauth or service accounts.') - subparsers = parser.add_subparsers(dest='subcommand', title='subcommands', required=True) - - parser_session = subparsers.add_parser('session', description='Authenticate with Google Workspace (default oauth)', - help='Authenticate with Google Workspace (default oauth)') - parser_session.add_argument('--key', help='Name of key to store the creds under') - parser_session.add_argument('--creds', type=Path, help='Path to the credentials file') - parser_session.add_argument('--service-account', action='store_true', help='Authenticate a service account') - parser_session.add_argument('--store-key', type=str, help='Add authenticated session to credential store with key') - parser_list = subparsers.add_parser('list', description='List credential sessions within the cred store', - help='List credential sessions within the cred store') - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.parser_session.set_defaults(func=self.authenticate) - self.parser_list.set_defaults(func=self.list_sessions) - - self.args = validate_args(self.parser, self.args) - - def authenticate(self) -> Optional[Credentials]: - """Authenticate with Google Workspace using OAuth2.0.""" - if self.args.key: - validate_type = 'service' if self.args.service_account else 'oauth' - cred = self.obj.cred_store.get(self.args.key, validate_type=validate_type) - self.logger.info(f'Using stored credentials with key: {self.args.key}') - - if cred.session: - session = cred.refreshed_session() - else: - if self.args.service_account: - session = Credentials.from_service_account_info(cred.creds.to_dict()) - else: - flow = InstalledAppFlow.from_client_config(cred.creds.to_dict(), self.obj.config['google']['scopes']) - session = flow.run_local_server(port=0) - cred.session = session - elif self.args.creds: - if self.args.service_account: - check_file_exists(self.args.creds, f'Missing service account credentials file') - self.logger.info(f'Using service account credentials file: {self.args.creds}') - scopes = self.obj.config['google']['scopes'] - session = Credentials.from_service_account_file(str(self.args.creds), scopes=scopes) - else: - check_file_exists(self.args.creds, f'Missing OAuth2.0 credentials file: {self.args.creds}') - flow = InstalledAppFlow.from_client_secrets_file(str(self.args.creds), self.obj.config['google']['scopes']) - session = flow.run_local_server(port=0) - else: - self.logger.info(f'Missing key or credentials file.') - return None - - self.logger.info(f'Authenticated successfully.' if session else f'Failed to authenticate.') - if self.args.store_key: - creds = self.args.creds - cred_type = 'service' if self.args.service_account else 'oauth' - creds_obj = OAuthCreds.from_file(creds) if cred_type == 'oauth' else ServiceAccountCreds.from_file(creds) - self.obj.cred_store.add(self.args.store_key, creds=creds_obj, session=session, type=cred_type) - return session - - def list_sessions(self): - cred_sessions = self.obj.cred_store.list_sessions() - self.logger.info(f'Stored auth sessions: {", ".join(cred_sessions) if cred_sessions else None}') - - def execute(self) -> None: - self.args.func() diff --git a/swat/commands/creds.py b/swat/commands/creds.py index 4c18c4e..1f57075 100644 --- a/swat/commands/creds.py +++ b/swat/commands/creds.py @@ -41,7 +41,8 @@ def add_creds(self): else: creds = OAuthCreds.from_file(self.args.creds) if self.args.creds else None - self.obj.cred_store.add(self.args.key, creds=creds, override=self.args.override) + cred_type = 'service' if self.args.service_account else 'oauth' + self.obj.cred_store.add(self.args.key, creds=creds, override=self.args.override, cred_type=cred_type) self.logger.info(f'Credentials added with key: {self.args.key}') except TypeError as e: self.logger.info(f'Invalid credentials file: {self.args.creds} - {e}') diff --git a/swat/commands/scopes.py b/swat/commands/scopes.py deleted file mode 100644 index 5a506b0..0000000 --- a/swat/commands/scopes.py +++ /dev/null @@ -1,52 +0,0 @@ - -from ..commands.base_command import BaseCommand -from ..utils import ROOT_DIR, format_scopes -from ..misc import validate_args - -DEFAULT_TOKEN_FILE = ROOT_DIR / "token.pickle" - - -class Command(BaseCommand): - parser = BaseCommand.load_parser(description='SWAT credential management.') - subparsers = parser.add_subparsers(dest='subcommand', title="subcommands", required=True) - - parser_add = subparsers.add_parser('add', description="Add a scope for the authentication", - help='Add a scope for the authentication') - parser_add.add_argument("--scope", required=True, help="Scope to add for the authentication") - - parser_remove = subparsers.add_parser('remove', description="Remove a scope for the authentication", - help='Remove a scope for the authentication') - parser_remove.add_argument("--scope", required=True, help="Scope to remove for the authentication") - - parser_list = subparsers.add_parser('list', description="List all the scopes for the authentication", - help='List all the scopes for the authentication') - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.parser_add.set_defaults(func=self.add_scope) - self.parser_remove.set_defaults(func=self.remove_scope) - self.parser_list.set_defaults(func=self.list_scopes) - - self.args = validate_args(self.parser, self.args) - - def add_scope(self): - self.args.scope = format_scopes(self.args.scope) - if self.args.scope in self.obj.config['google']['scopes']: - self.logger.info(f"Scope already exists: {self.args.scope}") - return - self.obj.config['google']['scopes'].append(self.args.scope) - self.logger.info(f"Added scope(s): {self.args.scope}") - - def remove_scope(self): - self.args.scope = format_scopes(self.args.scope) - if self.args.scope in self.obj.config['google']['scopes']: - self.obj.config['google']['scopes'].remove(self.args.scope) - self.logger.info(f"Removed scope: {self.args.scope}") - else: - self.logger.info(f"Scope not found: {self.args.scope}") - - def list_scopes(self): - self.logger.info(f"OAuth scopes: {', '.join(self.obj.config['google']['scopes'])}") - - def execute(self) -> None: - self.args.func() diff --git a/swat/emulations/base_emulation.py b/swat/emulations/base_emulation.py index 90bb484..d2fdf34 100644 --- a/swat/emulations/base_emulation.py +++ b/swat/emulations/base_emulation.py @@ -60,7 +60,7 @@ def __init__(self, args: list, obj: SWAT, **extra) -> None: self.obj = obj self.logger = logging.getLogger(__name__) emulation_name = '.'.join(self.__module__.split('.')[2:]) - self.elogger = configure_emulation_logger(emulation_name, obj.config) + self.elogger = configure_emulation_logger(emulation_name, obj.config.merged) self.econfig = self.load_emulation_config() self.attack_data = self.get_attack() self.artifacts_path = self.setup_artifacts_folder() diff --git a/swat/emulations/collection/drive_access_private_keys.py b/swat/emulations/collection/drive_access_private_keys.py index 83e80d6..9ff6fc6 100644 --- a/swat/emulations/collection/drive_access_private_keys.py +++ b/swat/emulations/collection/drive_access_private_keys.py @@ -12,7 +12,8 @@ class Emulation(BaseEmulation): - parser = BaseEmulation.load_parser(description='Stages sensitive encryption key files in Google Drive and accesses them via shared links.') + parser = BaseEmulation.load_parser( + description='Stages sensitive encryption key files in Google Drive and accesses them via shared links.') parser.add_argument('session_key', default='default', help='Session to use for service building API service') parser.add_argument('folder_id', help='Google Drive Folder ID') parser.add_argument('--cleanup', action='store_true', default=False, help='Clean up staged files after execution') @@ -25,7 +26,8 @@ class Emulation(BaseEmulation): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.folder_id = self.args.folder_id - self.service = build('drive', 'v3', credentials=self.obj.cred_store.store[self.args.session_key].session) + creds = self.obj.cred_store.get('default', validate_type='oauth') + self.service = build('drive', 'v3', credentials=creds.session()) # file extensions filtered to 5 for testing purposes self.file_extensions = [ "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key","p7r", diff --git a/swat/main.py b/swat/main.py index d23428a..b58f072 100644 --- a/swat/main.py +++ b/swat/main.py @@ -8,12 +8,19 @@ from .misc import colorful_exit_message from .shell import SWATShell + ROOT_DIR = Path(__file__).parent.parent.absolute() +ETC_DIR = ROOT_DIR / 'swat' / 'etc' + +DEFAULT_CUSTOM_CONFIG_PATH = ETC_DIR / 'custom_config.yaml' CONFIG: dict = utils.load_etc_file('config.yaml') + def main(): parser = argparse.ArgumentParser(description='SWAT CLI') parser.add_argument('--debug', action='store_true', help='Debug mode') + parser.add_argument('--custom-config', type=Path, default=DEFAULT_CUSTOM_CONFIG_PATH, + help='Optional custom config file') args = parser.parse_args() level = logging.DEBUG if args.debug else logging.INFO @@ -27,6 +34,7 @@ def main(): finally: if shell.save_on_exit: shell.obj.cred_store.save() + shell.obj.config.save_custom() print(colorful_exit_message()) diff --git a/swat/shell.py b/swat/shell.py index a4d5ba4..d64549e 100644 --- a/swat/shell.py +++ b/swat/shell.py @@ -7,7 +7,7 @@ from typing import Optional, Type, TypeVar from . import utils -from .base import SWAT +from .base import SWAT, Config from .commands.base_command import BaseCommand from .commands.emulate import Command as EmulateCommand from .misc import CustomHelpFormatter, colorful_swat @@ -15,7 +15,7 @@ ROOT_DIR = Path(__file__).parent.parent.absolute() COMMANDS_DIR = ROOT_DIR / 'swat' / 'commands' -CONFIG: dict = utils.load_etc_file('config.yaml') +CONFIG_FILE = ROOT_DIR / 'swat' / 'etc' / 'config.yaml' logo = """ ░██████╗░██╗░░░░░░░██╗░█████╗░████████╗ @@ -31,6 +31,7 @@ KEY = '\U0001F511' USER = '\U0001F464' + class SWATShell(cmd.Cmd): intro = colorful_swat(logo) prompt = 'SWAT> ' @@ -42,13 +43,13 @@ def __init__(self, args: argparse.Namespace) -> None: logging.info('Logging in debug mode.') self.args = args - self.obj = SWAT(CONFIG) - self.obj.config['google']['scopes'] = format_scopes(self.obj.config['google']['scopes']) + config = Config(CONFIG_FILE, args.custom_config) + self.obj = SWAT(config) self._command_name = None self._new_args = None - self.save_on_exit = self.obj.config['settings'].get('save_on_exit', False) + self.save_on_exit = self.obj.config.merged['settings'].get('save_on_exit', False) self._registered_commands = self._register_commands() @@ -172,8 +173,7 @@ def precmd(self, line: str) -> str: def postcmd(self, stop: bool, line: str) -> bool: """Handle post-command processing.""" key = f'{KEY}' if self.obj.cred_store.store else '' - session = f'{USER}' if self.obj.cred_store.has_sessions else '' - self.prompt = f'SWAT{key}{session}>' if key or session else 'SWAT>' + self.prompt = f'SWAT{key}>' if key else 'SWAT>' return stop def default(self, line: str) -> any: diff --git a/swat/utils.py b/swat/utils.py index 034d69a..5e08537 100644 --- a/swat/utils.py +++ b/swat/utils.py @@ -37,6 +37,7 @@ def load_etc_file(filename: str) -> Union[str, dict]: elif path.suffix in ('.yaml', '.yml'): return yaml.safe_load(contents) + def check_file_exists(file: Path, error_message: str) -> None: """Check if the given file exists, raise an error if it does not.""" if not file.exists(): @@ -44,6 +45,7 @@ def check_file_exists(file: Path, error_message: str) -> None: if file.is_dir(): raise IsADirectoryError(f'{error_message}: {file}') + def clear_terminal() -> None: """Clear the terminal.""" os.system('cls' if sys.platform == 'windows' else 'clear') @@ -68,6 +70,7 @@ def wrap_text(text): table = tabulate(table_data, wrapped_headers, tablefmt=table_format) return table + def download_chromedriver(destination_path: Path) -> Path: """Download the appropriate ChromeDriver for the system and return its path.""" base_url = "https://chromedriver.storage.googleapis.com/" @@ -111,6 +114,7 @@ def download_chromedriver(destination_path: Path) -> Path: return destination_path / 'chromedriver' + def get_chromedriver(chromedriver_path: Optional[Path] = None) -> webdriver.Chrome: """Return a Chrome WebDriver instance, downloading ChromeDriver if necessary.""" chromedriver_path = chromedriver_path or ETC_DIR / 'chromedriver' @@ -122,9 +126,9 @@ def get_chromedriver(chromedriver_path: Optional[Path] = None) -> webdriver.Chro DEFAULT_EMULATION_ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True) options.add_experimental_option('prefs', { - "download.default_directory": str(DEFAULT_EMULATION_ARTIFACTS_DIR), # set the download directory - "download.prompt_for_download": False, # disable download prompt -}) + "download.default_directory": str(DEFAULT_EMULATION_ARTIFACTS_DIR), # set the download directory + "download.prompt_for_download": False, # disable download prompt + }) options.add_argument('--headless') # Run Chrome in headless mode service = Service(executable_path=str(chromedriver_path)) @@ -132,6 +136,7 @@ def get_chromedriver(chromedriver_path: Optional[Path] = None) -> webdriver.Chro return driver + def format_scopes(scopes: Union[str, List[str]]) -> List[str]: """Format a list of scopes for display.""" if isinstance(scopes, str): @@ -139,4 +144,14 @@ def format_scopes(scopes: Union[str, List[str]]) -> List[str]: if 'https://www.googleapis.com/auth/' not in scopes else scopes else: return [f'https://www.googleapis.com/auth/{scope}' for - scope in scopes if 'https://www.googleapis.com/auth/' not in scope] \ No newline at end of file + scope in scopes if 'https://www.googleapis.com/auth/' not in scope] + + +def deep_merge(d1, d2): + merged = d1.copy() + for key, value in d2.items(): + if key in d1 and isinstance(d1[key], dict) and isinstance(value, dict): + merged[key] = deep_merge(d1[key], value) + else: + merged[key] = value + return merged From 559fe5ccaab64979cb03f70f05b53b5cf513cf2f Mon Sep 17 00:00:00 2001 From: brokensound77 <16747370+brokensound77@users.noreply.github.com> Date: Fri, 11 Aug 2023 20:40:32 -0600 Subject: [PATCH 2/2] update sessions with scopes --- swat/base.py | 2 +- swat/emulations/collection/drive_access_private_keys.py | 6 +++--- .../initial_access/gmail_html_with_embedded_js.py | 7 ++++--- swat/emulations/persistence/admin_add_creds_to_users.py | 2 +- swat/emulations/persistence/admin_add_roles_to_users.py | 6 ++++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/swat/base.py b/swat/base.py index 4a7e4af..1cbfebd 100644 --- a/swat/base.py +++ b/swat/base.py @@ -81,7 +81,7 @@ class Cred: def session(self, scopes: Optional[list[str]] = None) -> Optional[Credentials]: if isinstance(self.creds, OAuthCreds): - session = Credentials.from_authorized_user_info(str(self.creds.to_dict()), scopes=scopes) + session = Credentials.from_authorized_user_info(self.creds.to_dict(), scopes=scopes) else: session = ServiceCredentials.from_service_account_info(self.creds.to_dict(), scopes=scopes) diff --git a/swat/emulations/collection/drive_access_private_keys.py b/swat/emulations/collection/drive_access_private_keys.py index 9ff6fc6..761d3c5 100644 --- a/swat/emulations/collection/drive_access_private_keys.py +++ b/swat/emulations/collection/drive_access_private_keys.py @@ -14,7 +14,7 @@ class Emulation(BaseEmulation): parser = BaseEmulation.load_parser( description='Stages sensitive encryption key files in Google Drive and accesses them via shared links.') - parser.add_argument('session_key', default='default', help='Session to use for service building API service') + parser.add_argument('--creds', default='default', help='Session to use for service building API service') parser.add_argument('folder_id', help='Google Drive Folder ID') parser.add_argument('--cleanup', action='store_true', default=False, help='Clean up staged files after execution') @@ -26,8 +26,8 @@ class Emulation(BaseEmulation): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.folder_id = self.args.folder_id - creds = self.obj.cred_store.get('default', validate_type='oauth') - self.service = build('drive', 'v3', credentials=creds.session()) + creds = self.obj.cred_store.get(self.args.creds, validate_type='oauth') + self.service = build('drive', 'v3', credentials=creds.session(scopes=self.scopes)) # file extensions filtered to 5 for testing purposes self.file_extensions = [ "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key","p7r", diff --git a/swat/emulations/initial_access/gmail_html_with_embedded_js.py b/swat/emulations/initial_access/gmail_html_with_embedded_js.py index 3f18032..b8a6056 100644 --- a/swat/emulations/initial_access/gmail_html_with_embedded_js.py +++ b/swat/emulations/initial_access/gmail_html_with_embedded_js.py @@ -12,7 +12,7 @@ class Emulation(BaseEmulation): parser = BaseEmulation.load_parser(description='Sends a phishing email to a user with a HTML attachment.') - parser.add_argument('session_key', default='default', help='Session to use for service building API service') + parser.add_argument('--creds', default='default', help='Session to use for service building API service') parser.add_argument('--recipient', required=True, help='Recipient email address') parser.add_argument('--sender', required=True, help='Sender email address') parser.add_argument('--subject', default='Phishing Test Email', help='Email subject') @@ -20,12 +20,13 @@ class Emulation(BaseEmulation): techniques = ['T1566.001', 'T1204.002'] name = 'Send HTML with Embedded Javascript with Gmail' - scopes = ['gmail.send','gmail.readonly','gmail.compose'] + scopes = ['gmail.send', 'gmail.readonly','gmail.compose'] services = ['gmail'] def __init__(self, **kwargs) -> None: super().__init__(**kwargs) - self.service = build('gmail', 'v1', credentials=self.obj.cred_store.store[self.args.session_key].session) + creds = self.obj.cred_store.get(self.args.creds, validate_type='oauth') + self.service = build('gmail', 'v1', credentials=creds.session(scopes=self.scopes)) def create_html(self) -> io.BytesIO: """Create an HTML file with embedded javascript.""" diff --git a/swat/emulations/persistence/admin_add_creds_to_users.py b/swat/emulations/persistence/admin_add_creds_to_users.py index 9c85d48..3ca1d36 100644 --- a/swat/emulations/persistence/admin_add_creds_to_users.py +++ b/swat/emulations/persistence/admin_add_creds_to_users.py @@ -5,7 +5,7 @@ class Emulation(BaseEmulation): parser = BaseEmulation.load_parser(description='Adds cloud credentials to a user account.') - parser.add_argument('session_key', default='default', help='Session to use for service building API service') + parser.add_argument('--creds', default='default', help='Session to use for service building API service') parser.add_argument('--username', required=True, help='Username to create') parser.add_argument('--password', required=True, help='Password for user') diff --git a/swat/emulations/persistence/admin_add_roles_to_users.py b/swat/emulations/persistence/admin_add_roles_to_users.py index 3517b96..1f848ae 100644 --- a/swat/emulations/persistence/admin_add_roles_to_users.py +++ b/swat/emulations/persistence/admin_add_roles_to_users.py @@ -5,7 +5,7 @@ class Emulation(BaseEmulation): parser = BaseEmulation.load_parser(description='Add privileged roles to a user.') - parser.add_argument('session_key', default='default', help='Session to use for service building API service') + parser.add_argument('--creds', default='default', help='Session to use for service building API service') parser.add_argument('--username', required=True, help='Username to add the role to') parser.add_argument('--roles', required=True, help='Roles to add') @@ -16,7 +16,9 @@ class Emulation(BaseEmulation): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) + creds = self.obj.cred_store.get(self.args.creds, validate_type='oauth') + self.session = creds.session(scopes=self.scopes) def execute(self) -> None: self.elogger.info(self.exec_str(self.parser.description)) - self.elogger.info('Hello, world, from T1098!') + self.elogger.info(f'Hello, world, from T1098! with session: {self.session}')