|
61 | 61 | from packaging.version import parse
|
62 | 62 | import time
|
63 | 63 | from datetime import datetime
|
| 64 | +from dataclasses import dataclass |
64 | 65 | from threading import Thread, Barrier, Event
|
65 | 66 | from firebird.driver import connect, connect_server, create_database, driver_config, \
|
66 | 67 | NetProtocol, Server, CHARSET_MAP, Connection, Cursor, \
|
@@ -2751,3 +2752,76 @@ def temp_files_fixture(tmp_path) -> List[Path]:
|
2751 | 2752 |
|
2752 | 2753 | return temp_files_fixture
|
2753 | 2754 |
|
| 2755 | +@dataclass |
| 2756 | +class ConfigManagerBackup: |
| 2757 | + action: str |
| 2758 | + config_file: Path |
| 2759 | + backup_file: Path = None |
| 2760 | + |
| 2761 | +class ConfigManager: |
| 2762 | + """Object to replace specified server configuration file. |
| 2763 | +
|
| 2764 | + Arguments: |
| 2765 | + tmp_path: Path to directory where backup will be stored. |
| 2766 | + old_config: Old config file which will be keeped in backup (e.g. databases.conf) |
| 2767 | +
|
| 2768 | + .. important:: |
| 2769 | +
|
| 2770 | + Do not create instances of this class directly! Use **only** fixtures created by `store_config`. |
| 2771 | + """ |
| 2772 | + |
| 2773 | + def __init__(self, tmp_path: Path): |
| 2774 | + self.__tmp_path = tmp_path |
| 2775 | + self.__bak_configs: Dict[str, ConfigManagerBackup] = {} |
| 2776 | + |
| 2777 | + def _backup(self, config_name: str): |
| 2778 | + old_config = _vars_['home-dir'] / config_name |
| 2779 | + if config_name in self.__bak_configs: |
| 2780 | + return old_config |
| 2781 | + if old_config.exists(): |
| 2782 | + backup = self.__tmp_path / (config_name.replace('/', '_') + '.bak') |
| 2783 | + if backup.exists(): |
| 2784 | + backup.unlink() |
| 2785 | + shutil.copy(str(old_config), str(backup)) |
| 2786 | + self.__bak_configs[config_name] = ConfigManagerBackup('replace', old_config, backup) |
| 2787 | + else: |
| 2788 | + self.__bak_configs[config_name] = ConfigManagerBackup('delete', old_config) |
| 2789 | + return old_config |
| 2790 | + |
| 2791 | + def replace(self, config_name: str, new_config: Union[Path, str]): |
| 2792 | + """ |
| 2793 | + config_name: Relative path to config in server |
| 2794 | + new_config: Path to new config or content of config |
| 2795 | + """ |
| 2796 | + old_config = self._backup(config_name) |
| 2797 | + if isinstance(new_config, Path): |
| 2798 | + shutil.copy(str(new_config), str(old_config)) |
| 2799 | + else: |
| 2800 | + with open(old_config, 'w') as old: |
| 2801 | + old.write(new_config) |
| 2802 | + |
| 2803 | + def add(self, config_name: str, new_config: Union[Path, str]): |
| 2804 | + """ |
| 2805 | + config_name: Relative path to config in server |
| 2806 | + new_config: Path to new config or content of config |
| 2807 | + """ |
| 2808 | + old_config = self._backup(config_name) |
| 2809 | + new_content = new_config.read_text() if isinstance(new_config, Path) else new_config |
| 2810 | + with open(old_config, 'a') as old: |
| 2811 | + old.write(new_content) |
| 2812 | + |
| 2813 | + def restore(self, final=False): |
| 2814 | + for backup in self.__bak_configs.values(): |
| 2815 | + if backup.action == 'replace': |
| 2816 | + shutil.copy(str(backup.backup_file), str(backup.config_file)) |
| 2817 | + if final: |
| 2818 | + backup.backup_file.unlink() |
| 2819 | + elif backup.action == 'delete': |
| 2820 | + backup.config_file.unlink() |
| 2821 | + self.__bak_configs = {} |
| 2822 | + |
| 2823 | +@pytest.fixture |
| 2824 | +def store_config(db_path) -> ConfigManager: |
| 2825 | + manager = ConfigManager(db_path) |
| 2826 | + yield manager |
| 2827 | + manager.restore(final=True) |
0 commit comments