From 3b412f907f442e0d4e421c79f753cebe79898e63 Mon Sep 17 00:00:00 2001 From: Brian Torres-Gil Date: Mon, 8 Aug 2022 21:59:56 -0700 Subject: [PATCH 1/2] feat: Software updater path can exclude base image The software updater installed the base image before installing the patch image for each major and minor release. This comes with the disadvantage that the base image might have bugs and is running for a short time before the patch upgrade begins. To avoid these potential bugs in the base image, a new option to skip installing the base image is now possible. --- panos/__init__.py | 29 ++++++-- panos/updater.py | 158 +++++++++++++++++++++++++++--------------- tests/test_updater.py | 112 ++++++++++++++++++++++++++++++ 3 files changed, 237 insertions(+), 62 deletions(-) create mode 100644 tests/test_updater.py diff --git a/panos/__init__.py b/panos/__init__.py index a63d74e1..fedab2dd 100755 --- a/panos/__init__.py +++ b/panos/__init__.py @@ -127,6 +127,12 @@ def isstring(arg): pan.DEBUG3 = pan.DEBUG2 - 1 +def stringToVersion(other): + if isstring(other): + other = PanOSVersion(other) + return other + + class PanOSVersion(LooseVersion): """LooseVersion with convenience properties to access version components""" @@ -150,6 +156,10 @@ def patch(self): def mainrelease(self): return self.version[0:3] + @property + def minorrelease(self): + return self.version[0:2] + @property def subrelease(self): try: @@ -174,6 +184,19 @@ def subrelease_num(self): subrelease_num = None return subrelease_num + @property + def baseimage(self): + # Account for lack of PAN-OS 7.0.0 + if self.major == 7 and self.minor == 0: + base_patch = "1" + else: + base_patch = "0" + version_string = str(self) + version_tokens = version_string.split("-")[0].split(".") + version_tokens[2] = base_patch + base_image_string = ".".join(version_tokens) + return PanOSVersion(base_image_string) + def __repr__(self): return "PanOSVersion ('%s')" % str(self) @@ -221,12 +244,6 @@ def __ne__(self, other): return not self.__eq__(other) -def stringToVersion(other): - if isstring(other): - other = PanOSVersion(other) - return other - - def tree_legend_dot(): """Create a graphviz dot string for a legend graph""" modules = ["firewall", "policies", "objects", "network", "device", "panorama", "ha"] diff --git a/panos/updater.py b/panos/updater.py index bbf025e4..3ad985b2 100644 --- a/panos/updater.py +++ b/panos/updater.py @@ -17,6 +17,8 @@ """Device updater handles software versions and updates for devices""" +from typing import Any, List, Literal, Optional, Union, cast + from pan.config import PanConfig import panos.errors as err @@ -151,7 +153,12 @@ def _parse_current_version(self, response_element): self._logger.debug("Found current version: %s" % current_version) return current_version - def download_install(self, version, load_config=None, sync=False): + def download_install( + self, + version: Union[str, PanOSVersion], + load_config: Optional[str] = None, + sync: bool = False, + ) -> Any: """Download and install the requested PAN-OS version. Like a combinations of the ``check()``, ``download()``, and @@ -173,13 +180,11 @@ def download_install(self, version, load_config=None, sync=False): If sync, returns result of PAN-OS install job """ - if isstring(version): - version = PanOSVersion(version) # Get list of software if needed if not self.versions: self.check() # Get versions as StrictVersion objects - available_versions = map(PanOSVersion, self.versions.keys()) + available_versions = list(map(PanOSVersion, self.versions.keys())) target_version = PanOSVersion(str(version)) current_version = PanOSVersion(self.pandevice.version) @@ -202,7 +207,12 @@ def download_install(self, version, load_config=None, sync=False): result = self.install(target_version, load_config=load_config, sync=sync) return result - def download_install_reboot(self, version, load_config=None, sync=False): + def download_install_reboot( + self, + version: Union[str, PanOSVersion], + load_config: Optional[str] = None, + sync: bool = False, + ) -> Optional[str]: """Download and install the requested PAN-OS version, then reboot. Like a combinations of the ``check()``, ``download()``, and @@ -219,9 +229,8 @@ def download_install_reboot(self, version, load_config=None, sync=False): err.PanDeviceError: problem found in pre-download checks or after reboot """ - if isstring(version): - version = PanOSVersion(version) - self.download_install(version, load_config, sync=True) + target_version = PanOSVersion(str(version)) + self.download_install(target_version, load_config, sync=True) # Reboot the device self._logger.info( "Device %s is rebooting after upgrading to version %s. This will take a while." @@ -229,7 +238,7 @@ def download_install_reboot(self, version, load_config=None, sync=False): ) self.pandevice.restart() if sync: - new_version = self.pandevice.syncreboot() + new_version: str = self.pandevice.syncreboot() if version != new_version: raise err.PanDeviceError( "Attempt to upgrade to version %s failed." @@ -241,14 +250,62 @@ def download_install_reboot(self, version, load_config=None, sync=False): else: return None - def upgrade_to_version(self, target_version, dryrun=False): + def _next_upgrade_version( + self, + target_version: Union[PanOSVersion, Literal["latest"]], + install_base: bool, + ) -> PanOSVersion: + current_version = PanOSVersion(self.pandevice.version) + available_versions = list(map(PanOSVersion, self.versions.keys())) + latest_version = max(available_versions) + next_minor_version = self._next_minor_version(current_version) + if install_base: + if target_version == "latest": + return min(latest_version, next_minor_version) + elif latest_version < target_version: + return next_minor_version + elif not self._direct_upgrade_possible(current_version, target_version): + return next_minor_version + else: + return cast(PanOSVersion, target_version) + else: + if target_version == "latest": + return latest_version + elif latest_version < target_version: + return latest_version + else: + return cast(PanOSVersion, target_version) + + def _current_version_is_target( + self, target_version: Union[PanOSVersion, str] + ) -> bool: + target_version = PanOSVersion(str(target_version)) + current_version = PanOSVersion(self.pandevice.version) + available_versions = list(map(PanOSVersion, self.versions.keys())) + latest_version = max(available_versions) + if current_version == target_version: + return True + elif target_version == "latest" and current_version == latest_version: + return True + else: + return False + + def upgrade_to_version( + self, + target_version: Union[str, PanOSVersion], + dryrun: bool = False, + install_base: bool = True, + ) -> List[str]: """Upgrade to the target version, completing all intermediate upgrades. For example, if firewall is running version 9.0.5 and target version is 10.0.2, then this method will proceed through the following steps: + - Download 9.1.0 - Upgrade to 9.1.0 and reboot - - Upgrade to 10.0.0 and reboot + - Download 10.0.0 + - Upgrade to 10.0.0 and reboot (to skip this step, set `install_base` to False) + - Download 10.0.2 - Upgrade to 10.0.2 and reboot Does not account for HA pairs. @@ -259,13 +316,17 @@ def upgrade_to_version(self, target_version, dryrun=False): from panos.firewall import Firewall - fw = Firewall("10.0.0.5", "admin", "password") + fw = Firewall("192.168.1.1", "admin", "password") fw.software.upgrade_to_version("10.0.2") Args: target_version (string): PAN-OS version (eg. "10.0.2") or "latest" dryrun (bool, optional): Log what steps would be taken, but don't make any changes to the live device. Defaults to False. + install_base (bool, optional): The upgrade path will include an + upgrade to each base image (eg. 10.0.0) before upgrade to the + patch version. If this is False, the base image will download + but not install. Raises: err.PanDeviceError: any problem during the upgrade process @@ -279,10 +340,11 @@ def upgrade_to_version(self, target_version, dryrun=False): starting_version = self.pandevice.version # Get versions as StrictVersion objects - available_versions = map(PanOSVersion, self.versions.keys()) + target_is_latest = target_version == "latest" + target_version = ( + PanOSVersion(str(target_version)) if not target_is_latest else "latest" + ) current_version = PanOSVersion(self.pandevice.version) - latest_version = max(available_versions) - next_minor_version = self._next_minor_version(current_version) # Check that this is an upgrade, not a downgrade if current_version > target_version: @@ -291,48 +353,27 @@ def upgrade_to_version(self, target_version, dryrun=False): % (self.pandevice.id, self.pandevice.version, target_version) ) - # Determine the next version to upgrade to - if target_version == "latest": - next_version = min(latest_version, next_minor_version) - elif latest_version < target_version: - next_version = next_minor_version - elif not self._direct_upgrade_possible(current_version, target_version): - next_version = next_minor_version - else: - next_version = PanOSVersion(str(target_version)) - - if next_version not in available_versions and not dryrun: - self._logger.info( - "Device %s upgrading to %s, currently on %s. Checking for newer versions." - % (self.pandevice.id, target_version, self.pandevice.version) - ) - self.check() - available_versions = map(PanOSVersion, self.versions.keys()) - latest_version = max(available_versions) - # Check if done upgrading - if current_version == target_version: + if self._current_version_is_target(target_version): self._logger.info( "Device %s is running target version: %s" - % (self.pandevice.id, target_version) - ) - return True - elif target_version == "latest" and current_version == latest_version: - self._logger.info( - "Device %s is running latest version: %s" - % (self.pandevice.id, latest_version) + % (self.pandevice.id, current_version) ) if dryrun: self._logger.info( - "NOTE: dryrun with 'latest' does not show all upgrades," - ) - self._logger.info( - "as new versions are learned through the upgrade process," + "NOTE: dryrun with 'latest' does not show all upgrades, as new versions are learned through the upgrade process, so results may be different than dryrun output when using 'latest'." ) - self._logger.info( - "so results may be different than dryrun output when using 'latest'." - ) - return True + return [str(current_version)] + + # Determine the next version to upgrade to + next_version = self._next_upgrade_version(target_version, install_base) + + # Download base image if needed + if ( + not install_base + and not self.versions[str(next_version.baseimage)]["downloaded"] + ): + self.download(next_version.baseimage, sync=True) # Ensure the content pack is upgraded to the latest self.pandevice.content.download_and_install_latest(sync=True) @@ -347,10 +388,12 @@ def upgrade_to_version(self, target_version, dryrun=False): else: self.download_install_reboot(next_version, sync=True) self.check() - result = self.upgrade_to_version(target_version, dryrun=dryrun) + result = self.upgrade_to_version( + target_version, dryrun=dryrun, install_base=install_base + ) if result and dryrun: self.pandevice.version = starting_version - return result + return [str(current_version)] + result def _next_major_version(self, version): if isstring(version): @@ -361,12 +404,15 @@ def _next_major_version(self, version): next_version = PanOSVersion("7.0.1") return next_version - def _next_minor_version(self, version): + def _next_minor_version(self, version: Union[PanOSVersion, str]) -> PanOSVersion: from panos.firewall import Firewall - if isstring(version): - next_version = PanOSVersion(version) - if version.minor == 1: + version = PanOSVersion(str(version)) + + # Account for 10.2.x (only release with minor version of '2') + if version.major == 10 and version.minor == 1: + next_version = PanOSVersion("10.2.0") + elif version.minor == 1: next_version = PanOSVersion(str(version.major + 1) + ".0.0") # There is no PAN-OS 5.1 for firewalls, so next minor release from 5.0.x is 6.0.0. elif ( diff --git a/tests/test_updater.py b/tests/test_updater.py new file mode 100644 index 00000000..f7139658 --- /dev/null +++ b/tests/test_updater.py @@ -0,0 +1,112 @@ +try: + from unittest import mock +except ImportError: + import mock + +from panos import PanOSVersion +from panos.firewall import Firewall + + +def _fw(version): + fw = Firewall("127.0.0.1", "admin", "admin", "secret") + fw._set_version_and_version_info(version) + return fw + + +def _updater_fw_setup(*args): + fw = _fw() + + return fw + + +def versionStrToTuple(version_string): + tokens = version_string.split(".")[:3] + tokens[2] = tokens[2].split("-")[0] + return tuple(int(x) for x in tokens) + + +def _create_mock_check(fw): + patches = range(5) + + def mock_check(): + version_info = versionStrToTuple(fw.version) + current_minor = ".".join(map(lambda x: str(x), version_info[0:-1])) + if version_info[1] == 0: + next_minor = ".".join([str(version_info[0]), "1"]) + else: + next_minor = ".".join([str(version_info[0] + 1), "0"]) + versions = [".".join((str(current_minor), str(patch))) for patch in patches] + versions += [".".join((str(next_minor), str(patch))) for patch in patches] + fw.software.versions = {version: {"downloaded": False} for version in versions} + + return mock_check + + +def _create_mock_download_install_reboot(fw): + def mock_download_install_reboot(next_version, sync): + fw.version = str(next_version) + return next_version + + return mock_download_install_reboot + + +def test_upgrade_to_version_with_install_base(): + fw = _fw("8.0.2") + + fw.software.check = mock.Mock(side_effect=_create_mock_check(fw)) + fw.software.download_install_reboot = mock.Mock( + side_effect=_create_mock_download_install_reboot(fw) + ) + fw.content.download_and_install_latest = mock.Mock() + fw.software.download = mock.Mock() + + result = fw.software.upgrade_to_version("10.1.3") + assert result == ["8.0.2", "8.1.0", "9.0.0", "9.1.0", "10.0.0", "10.1.0", "10.1.3"] + + +def test_upgrade_to_version_without_install_base(): + fw = _fw("8.0.2") + + fw.software.check = mock.Mock(side_effect=_create_mock_check(fw)) + fw.software.download_install_reboot = mock.Mock( + side_effect=_create_mock_download_install_reboot(fw) + ) + fw.content.download_and_install_latest = mock.Mock() + fw.software.download = mock.Mock() + + result = fw.software.upgrade_to_version("10.1.3", install_base=False) + assert result == ["8.0.2", "8.1.4", "9.0.4", "9.1.4", "10.0.4", "10.1.3"] + + +def test_next_upgrade_version_with_10_2_with_install_base(): + fw = _fw("10.1.3") + fw.software.versions = { + "10.1.0": "", + "10.1.1": "", + "10.1.2": "", + "10.1.3": "", + "10.1.4": "", + "10.2.0": "", + "10.2.1": "", + "10.2.2": "", + "10.2.3": "", + } + result = fw.software._next_upgrade_version("11.0.2", install_base=True) + assert result == PanOSVersion("10.2.0") + + +def test_next_upgrade_version_with_10_2_without_install_base(): + fw = _fw("10.1.3") + fw.software.versions = { + "10.1.0": "", + "10.1.1": "", + "10.1.2": "", + "10.1.3": "", + "10.1.4": "", + "10.2.0": "", + "10.2.1": "", + "10.2.2": "", + "10.2.3": "", + } + result = fw.software._next_upgrade_version("11.0.2", install_base=False) + assert result == PanOSVersion("10.2.3") From 388da2d3ba1afc600a7c82b3f2b40128d5d274be Mon Sep 17 00:00:00 2001 From: Brian Torres-Gil Date: Thu, 18 Jan 2024 01:43:51 +0000 Subject: [PATCH 2/2] chore: Fix issues with PANOS software updater subsystem --- panos/updater.py | 99 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 22 deletions(-) diff --git a/panos/updater.py b/panos/updater.py index 3ad985b2..8005d333 100644 --- a/panos/updater.py +++ b/panos/updater.py @@ -233,7 +233,7 @@ def download_install_reboot( self.download_install(target_version, load_config, sync=True) # Reboot the device self._logger.info( - "Device %s is rebooting after upgrading to version %s. This will take a while." + "Device %s is rebooting after upgrading to version %s. This will take a while." % (self.pandevice.id, version) ) self.pandevice.restart() @@ -256,36 +256,56 @@ def _next_upgrade_version( install_base: bool, ) -> PanOSVersion: current_version = PanOSVersion(self.pandevice.version) + if target_version != "latest" and current_version == target_version: + return None available_versions = list(map(PanOSVersion, self.versions.keys())) latest_version = max(available_versions) next_minor_version = self._next_minor_version(current_version) + if next_minor_version not in available_versions: + next_minor_version = None if install_base: if target_version == "latest": - return min(latest_version, next_minor_version) - elif latest_version < target_version: - return next_minor_version - elif not self._direct_upgrade_possible(current_version, target_version): - return next_minor_version + return ( + next_minor_version + if next_minor_version is not None + else latest_version + ) + elif self._direct_upgrade_possible( + current_version, target_version, install_base + ): + # No minor upgrade needed to target + return target_version + elif next_minor_version is None: + return latest_version else: - return cast(PanOSVersion, target_version) + return next_minor_version else: if target_version == "latest": - return latest_version - elif latest_version < target_version: - return latest_version + if next_minor_version is None: + return latest_version + else: + return self._latest_patch_version( + next_minor_version, available_versions + ) + elif self._direct_upgrade_possible( + current_version, target_version, install_base + ): + return target_version else: - return cast(PanOSVersion, target_version) + # More than one minor upgrade needed to target + return self._latest_patch_version( + next_minor_version, available_versions + ) def _current_version_is_target( - self, target_version: Union[PanOSVersion, str] + self, target_version: Union[PanOSVersion, Literal["latest"]] ) -> bool: - target_version = PanOSVersion(str(target_version)) current_version = PanOSVersion(self.pandevice.version) available_versions = list(map(PanOSVersion, self.versions.keys())) latest_version = max(available_versions) - if current_version == target_version: + if target_version == "latest" and current_version == latest_version: return True - elif target_version == "latest" and current_version == latest_version: + elif current_version == target_version: return True else: return False @@ -373,14 +393,21 @@ def upgrade_to_version( not install_base and not self.versions[str(next_version.baseimage)]["downloaded"] ): - self.download(next_version.baseimage, sync=True) + if dryrun: + self._logger.info( + "Device %s will download base image: %s" + % (self.pandevice.id, next_version.baseimage) + ) + else: + self.download(next_version.baseimage, sync=True) # Ensure the content pack is upgraded to the latest - self.pandevice.content.download_and_install_latest(sync=True) + if not dryrun: + self.pandevice.content.download_and_install_latest(sync=True) # Upgrade to the next version self._logger.info( - "Device %s will be upgraded to version: %s" + "Device %s will download and upgrade to version: %s" % (self.pandevice.id, next_version) ) if dryrun: @@ -412,7 +439,7 @@ def _next_minor_version(self, version: Union[PanOSVersion, str]) -> PanOSVersion # Account for 10.2.x (only release with minor version of '2') if version.major == 10 and version.minor == 1: next_version = PanOSVersion("10.2.0") - elif version.minor == 1: + elif version.minor > 0: next_version = PanOSVersion(str(version.major + 1) + ".0.0") # There is no PAN-OS 5.1 for firewalls, so next minor release from 5.0.x is 6.0.0. elif ( @@ -436,7 +463,22 @@ def _next_patch_version(self, version): ) return next_version - def _direct_upgrade_possible(self, current_version, target_version): + def _latest_patch_version( + self, version: Union[str, PanOSVersion], available_versions: List[PanOSVersion] + ): + if isstring(version): + version = PanOSVersion(version) + found_patch = False + latest_patch: PanOSVersion = PanOSVersion("0.0.0") + for v in available_versions: + if v.major == version.major and v.minor == version.minor: + latest_patch = max(latest_patch, v) + found_patch = True + return latest_patch if found_patch else None + + def _direct_upgrade_possible( + self, current_version, target_version, install_base=True + ): """Check if current version can directly upgrade to target version :returns True if a direct upgrade is possible, False if not @@ -461,7 +503,7 @@ def _direct_upgrade_possible(self, current_version, target_version): current_version.major == target_version.major and current_version.minor == 0 and target_version.minor == 1 - and target_version.patch == 0 + and (not install_base or target_version.patch == 0) ): return True @@ -471,10 +513,12 @@ def _direct_upgrade_possible(self, current_version, target_version): current_version.major + 1 == target_version.major and current_version.minor == 1 and target_version.minor == 0 - and target_version.patch == 0 + and (not install_base or target_version.patch == 0) ): return True + # SPECIAL CASES + # Upgrading a firewall from PAN-OS 5.0.x to 6.0.x # This is a special case because there is no PAN-OS 5.1.x from panos.firewall import Firewall @@ -487,6 +531,17 @@ def _direct_upgrade_possible(self, current_version, target_version): ): return True + # Upgrade from PAN-OS 10.1.x to 10.2.x + # This is a special case because only minor release with a 2 + if ( + current_version.major == 10 + and current_version.minor == 1 + and target_version.major == 10 + and target_version.minor == 2 + and (not install_base or target_version.patch == 0) + ): + return True + return False