diff --git a/.github/workflows/_discover_python_ver.yml b/.github/workflows/_discover_python_ver.yml index 87c3515..50e1248 100644 --- a/.github/workflows/_discover_python_ver.yml +++ b/.github/workflows/_discover_python_ver.yml @@ -22,7 +22,7 @@ jobs: pyversion: ${{ steps.pyversion.outputs.pyversion }} steps: - name: checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: discover Python version id: pyversion uses: ./.github/actions/discover_python_version diff --git a/.github/workflows/_docker.yml b/.github/workflows/_docker.yml index 80924d1..70caecf 100644 --- a/.github/workflows/_docker.yml +++ b/.github/workflows/_docker.yml @@ -29,7 +29,7 @@ jobs: contents: read steps: - name: checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: ref: ${{ inputs.tag_name }} @@ -55,7 +55,7 @@ jobs: - name: determine docker tags and labels id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: images: ghcr.io/paloaltonetworks/panos_upgrade_assurance tags: | @@ -66,14 +66,14 @@ jobs: - name: login to GHCR if: inputs.publish - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: build ${{ inputs.publish && 'and publish' || '' }} - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . push: ${{ inputs.publish }} diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index e315847..26ae04e 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -65,7 +65,7 @@ jobs: runs-on: ubuntu-latest steps: - name: checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: pack the documentation working-directory: docs run: tar --exclude .DS_Store --exclude sidebars.js -cvf documentation.tar * @@ -84,7 +84,7 @@ jobs: pull-requests: write steps: - name: checkout pan.dev - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: PaloAltoNetworks/pan.dev token: ${{ secrets.CLSC_PAT }} diff --git a/.github/workflows/publish_documentation.yml b/.github/workflows/publish_documentation.yml index d1c2292..6f24839 100644 --- a/.github/workflows/publish_documentation.yml +++ b/.github/workflows/publish_documentation.yml @@ -52,7 +52,7 @@ jobs: runs-on: ubuntu-latest steps: - name: checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: pack the documentation working-directory: docs run: tar --exclude .DS_Store --exclude sidebars.js -cvf documentation.tar * @@ -73,7 +73,7 @@ jobs: pull-requests: write steps: - name: checkout pan.dev - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: PaloAltoNetworks/pan.dev token: ${{ secrets.CLSC_PAT }} diff --git a/.github/workflows/publish_python_package.yml b/.github/workflows/publish_python_package.yml index d0799c6..cc2d73b 100644 --- a/.github/workflows/publish_python_package.yml +++ b/.github/workflows/publish_python_package.yml @@ -19,7 +19,7 @@ jobs: needs: pyversion steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Python uses: actions/setup-python@v4 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d5def47..82ec16f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,11 +22,11 @@ jobs: ver: ${{ steps.rc.outputs.new_release_version }} steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Check if new version will be produced id: rc - uses: cycjimmy/semantic-release-action@v3 + uses: cycjimmy/semantic-release-action@v4 with: dry_run: true semantic_version: 19.0 @@ -83,7 +83,7 @@ jobs: tag: ${{ steps.release.outputs.new_release_git_tag }} steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Python uses: actions/setup-python@v4 @@ -105,11 +105,11 @@ jobs: - name: Create release and publish to GitHub id: release - uses: cycjimmy/semantic-release-action@v3 + uses: cycjimmy/semantic-release-action@v4 with: semantic_version: 19.0 extra_plugins: | conventional-changelog-conventionalcommits@^5.0.0 @semantic-release/git@^10.0.1 env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_TOKEN: ${{ secrets.RELEASE_PAT }} diff --git a/.github/workflows/sub_docs.yml b/.github/workflows/sub_docs.yml index d4604cc..63f0258 100644 --- a/.github/workflows/sub_docs.yml +++ b/.github/workflows/sub_docs.yml @@ -19,7 +19,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Python uses: actions/setup-python@v4 diff --git a/.github/workflows/sub_format.yml b/.github/workflows/sub_format.yml index 39a7bbe..4dbf61a 100644 --- a/.github/workflows/sub_format.yml +++ b/.github/workflows/sub_format.yml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Python uses: actions/setup-python@v4 diff --git a/.github/workflows/sub_unittest.yml b/.github/workflows/sub_unittest.yml index 4c57427..8f2b327 100644 --- a/.github/workflows/sub_unittest.yml +++ b/.github/workflows/sub_unittest.yml @@ -19,7 +19,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Python uses: actions/setup-python@v4 diff --git a/Makefile b/Makefile index 66b565c..14b6587 100644 --- a/Makefile +++ b/Makefile @@ -1,21 +1,28 @@ +.phony: lint lint: flake8 panos_upgrade_assurance tests +.phony: security security: bandit -c pyproject.toml -r . +.phony: format_check format_check: black --diff --check panos_upgrade_assurance tests +.phony: format format: black panos_upgrade_assurance tests +.phony: test_coverage test_coverage: pytest --cov panos_upgrade_assurance --cov-report=term-missing --cov-report=xml:coverage.xml +.phony: documentation documentation: pydoc-markdown +.phony: check_line_length check_line_length: @for FILE in $$(find . -type f -name '*.py'); do \ echo $$FILE; \ @@ -29,4 +36,8 @@ check_line_length: done < "$$FILE"; \ done +.phony: all all: lint format security test_coverage documentation + +.phony: sca +sca: format lint security \ No newline at end of file diff --git a/README.md b/README.md index 2649b8b..ac1204e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,11 @@ +![GitHub release (latest by date)](https://img.shields.io/github/v/release/PaloAltoNetworks/pan-os-upgrade-assurance?style=flat-square) +![GitHub](https://img.shields.io/github/license/PaloAltoNetworks/terraform-modules-vmseries-ci-workflows?style=flat-square) +![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/PaloAltoNetworks/pan-os-upgrade-assurance/release.yml?style=flat-square) +![GitHub issues](https://img.shields.io/github/issues/PaloAltoNetworks/pan-os-upgrade-assurance?style=flat-square) +![GitHub pull requests](https://img.shields.io/github/issues-pr/PaloAltoNetworks/pan-os-upgrade-assurance?style=flat-square) +![PyPI - Downloads](https://img.shields.io/pypi/dm/panos-upgrade-assurance?style=flat-square) + + # PAN-OS Upgrade Assurance ## Overview diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index 4f9efe1..6e41e88 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -459,6 +459,76 @@ __Returns__ * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the certificate's properties (installed or required) are not supported. +### `CheckFirewall._calculate_schedule_time_diff` + +```python +def _calculate_schedule_time_diff(now_dt: datetime, schedule_type: str, + schedule: dict) -> (int, str) +``` + +A method that calculates the time distance between two `datetime` objects. + +:::note +This method is used only by [`CheckFirewall.check_scheduled_updates()`](#checkfirewallcheck_scheduled_updates) method and it expects some information +to be already available. +::: + +__Parameters__ + + +- __now_dt__ (`datetime`): A `datetime` object representing the current moment in time. Ideally this should be the device's local + time, taken from the management plane clock. +- __schedule_type__ (`str`): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, + `real-time`. +- __schedule__ (`dict`): Value of the `recurring` key in the API response, see + [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) + documentation for details. Both formats (locally configured and pushed from a Panorama template) are supported. + +__Raises__ + + +- `MalformedResponseException`: Thrown then the `schedule_type` is not recognizable. + +__Returns__ + + +`tuple(int, str)`: A tuple containing the calculated time difference (in minutes) and human-readable description. + +### `CheckFirewall.check_scheduled_updates` + +```python +def check_scheduled_updates(test_window: int = 60) -> CheckResult +``` + +Check if any Dynamic Update job is scheduled to run within the specified time window. + +When device is configured via Panorama, this includes schedules set up in Templates. It does not however include schedules +configured in `Panorama/Device Deployment/Dynamic Updates/Schedules`. + +__Parameters__ + + +- __test_window__ (`int, optional`): (defaults to 60 minutes). A time window in minutes to look for an update job occurrence. + Has to be a value between `60` and `10080` (1 week equivalent). The time window is calculated based on the device's + local time (taken from the management plane). + +__Raises__ + + +- `MalformedResponseException`: Thrown in case API response does not meet expectations. + +__Returns__ + + +`CheckResult`: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking value of: + +* [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there is no update job + planned within the test window. +* [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about the planned jobs with next occurrence time provided if possible. +* [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the `test_window` parameter + does not meet criteria. + ### `CheckFirewall.check_non_finished_jobs` ```python @@ -479,6 +549,33 @@ __Returns__ * [`CheckStatus.SKIPPED`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there are no jobs on a device. +### `CheckFirewall.check_unsupported_transceivers` + +```python +def check_unsupported_transceivers( + supported_sfp_regex: Optional[List[str]] = None) -> CheckResult +``` + +Check for any Optical Transceivers (SFPs or otherwise) that aren't supported by Palo Alto Networks. + +__Parameters__ + + +- __supported_sfp_regex__ (`list, optional`): List of supported transceivers, as regex strings, to mark SFP's as + supported even if they aren't OEM. + +__Returns__ + + +`CheckResult`: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking value of: + +* [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) When all optics are OEM and + PAN supported +* [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about which Slots and Physical ports currently have unsupported transceivers installed +* [`CheckStatus.SKIPPED`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there are no transceiver + slots at all. + ### `CheckFirewall.get_content_db_version` ```python diff --git a/docs/panos-upgrade-assurance/api/exceptions.md b/docs/panos-upgrade-assurance/api/exceptions.md index 9e25674..ec5ff8d 100644 --- a/docs/panos-upgrade-assurance/api/exceptions.md +++ b/docs/panos-upgrade-assurance/api/exceptions.md @@ -28,10 +28,19 @@ module. Parent class for all exceptions coming from [Utils](/panos/docs/panos-upgrade-assurance/api/utils) module. +## class `WrongNumberOfArgumentsException` + +Thrown when [FirewallProxy](/panos/docs/panos-upgrade-assurance/api/firewall_proxy) constructor is given wrong number or +set of arguments. + ## class `CommandRunFailedException` Used when a command run on a device does not return the `success` status. +## class `GetXpathConfigFailedException` + +Used when XAPI does not return a `success` state when running a `get` operation. + ## class `MalformedResponseException` A generic exception class used when a response does not meet the expected standards. diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 21b642a..34ea8cb 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -7,21 +7,73 @@ custom_edit_url: null --- ## class `FirewallProxy` -Class representing a Firewall. +A proxy to the [Firewall][fw] class. -Proxy in this class means that it is between the *high level* -[`CheckFirewall`](/panos/docs/panos-upgrade-assurance/api/check_firewall#class-checkfirewall) class and a device itself. -Inherits the [Firewall][fw] class but adds methods to interpret XML API commands. The class constructor is also inherited -from the [Firewall][fw] class. +Proxy in this case means that this class is between the *high level* +[`CheckFirewall`](/panos/docs/panos-upgrade-assurance/api/check_firewall#class-checkfirewall) class and the +[`Firewall`][fw] class representing the device itself. +There is no inheritance between the [`Firewall`][fw] and [`FirewallProxy`][fwp] classes, but an object of the latter one +has access to all attributes of the former. All interaction with a device are read-only. Therefore, a less privileged user can be used. -All methods starting with `is_` check the state, they do not present any data besides simple `boolean`values. +All methods starting with `is_` check the state, they do not present any data besides simple `boolean` values. All methods starting with `get_` fetch data from a device by running a command and parsing the output. The return data type can be different depending on what kind of information is returned from a device. [fw]: https://pan-os-python.readthedocs.io/en/latest/module-firewall.html#module-panos.firewall +[fwp]: /panos/docs/panos-upgrade-assurance/api/firewall_proxy + +__Attributes__ + + +- `_fw (Firewall)`: an object of the [`Firewall`][fw] class. + +### `FirewallProxy.__init__` + +```python +def __init__(firewall: Optional[Firewall] = None, **kwargs) +``` + +Constructor of the [`FirewallProxy`][fwp] class. + +Main purpose of this constructor is to store an object of the [`Firewall`][fw] class. This can be done in two ways: + +1. by passing an existing object +1. by passing credentials and address of a device (all parameters used byt the [`Firewall`][fw] class constructor + are supported). + +:::tip +Please note that positional arguments are not supported. +::: + +__Parameters__ + + +- __firewall__ (`Firewall`): An existing object of the [`Firewall`][fw] class. +- __**kwargs__: Used to pass keyword arguments that will be used directly in the [`Firewall`][fw] class constructor. + +__Raises__ + + +- `WrongNumberOfArgumentsException`: Raised when a mixture of arguments is passed (for example a [`Firewall`][fw] + object and firewall credentials). + +### `FirewallProxy.__getattr__` + +```python +def __getattr__(attr) +``` + +An overload of the default `__getattr__()` method. + +Its main purpose is to provide backwards compatibility to the old [`FirewallProxy`][fwp] class structure. It's called +when a requested attribute does not exist in the [`FirewallProxy`][fwp] class object, and it tries to fetch it +from the [`Firewall`][fw] object stored within the [`FirewallProxy`][fwp] object. + +From the [`FirewallProxy`][fwp] object's interface perspective, this provides the same behaviour as if the +[`FirewallProxy`][fwp] would still inherit from the [`Firewall`][fw] class. ### `FirewallProxy.op_parser` @@ -35,8 +87,7 @@ Execute a command on node, parse, and return response. This is just a wrapper around the [`Firewall.op()`](https://pan-os-python.readthedocs.io/en/latest/module-firewall.html#panos.firewall.Firewall.op) method. -It additionally does basic error handling and tries to extract the actual device -response. +It additionally does basic error handling and tries to extract the actual device response. __Parameters__ @@ -57,6 +108,36 @@ __Returns__ `dict, xml.etree.ElementTree.Element`: The actual command output. A type is defined by the `return_xml` parameter. +### `FirewallProxy.get_parser` + +```python +def get_parser(xml_path: str, + return_xml: Optional[bool] = False) -> Union[dict, ET.Element] +``` + +Execute a configuration get command on a node, parse and return response. + +This is a wrapper around the +[`pan.xapi.get()` method](https://github.com/kevinsteves/pan-python/blob/master/doc/pan.xapi.rst#getxpathnone) +from the [`pan-python` package](https://pypi.org/project/pan-python/). +It does a basic error handling and tries to extract the actual response. + +__Parameters__ + + +- __xml_path__ (`str`): An XPATH pointing to the config to be retrieved. +- __return_xml__ (`bool`): (defaults to `False`) When set to `True`, the return data is an [`XML object`](https://docs.python.org/3/library/xml.etree.elementtree.html#xml.etree.ElementTree.Element) + instead of a Python dictionary. + +__Raises__ + + +- `GetXpathConfigFailedException`: This exception is raised when XPATH is not provided or does not exist. + +__Returns__ + +`dict, xml.etree.ElementTree.Element`: The actual command output. A type is defined by the `return_xml` parameter. + ### `FirewallProxy.is_pending_changes` ```python @@ -854,7 +935,7 @@ __Returns__ ### `FirewallProxy.get_mp_clock` ```python -def get_mp_clock() -> dict +def get_mp_clock() -> datetime ``` Get the clock information from management plane. @@ -864,18 +945,7 @@ The actual API command is `show clock`. __Returns__ -`dict`: The clock information represented as a dictionary. - -```python showLineNumbers title="Sample output" -{ - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' -} -``` +`datetime`: The clock information represented as a `datetime` object. ### `FirewallProxy.get_dp_clock` @@ -890,18 +960,7 @@ The actual API command is `show clock more`. __Returns__ -`dict`: The clock information represented as a dictionary. - -```python showLineNumbers title="Sample output" -{ - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' -} -``` +`datetime`: The clock information represented as a `datetime` object. ### `FirewallProxy.get_certificates` @@ -956,6 +1015,58 @@ __Returns__ } ``` +### `FirewallProxy.get_update_schedules` + +```python +def get_update_schedules() -> dict +``` + +Get schedules for all dynamic updates. + +This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama, +but it does not include the ones configured via `Panorama/Device Deployment/Dynamic Updates/Schedules`. + +The actual XMLAPI command run here is `config/get` with XPATH set to +`/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule`. + +__Returns__ + + +`dict`: All dynamic updates schedules, key is the entity type to update, like: threats, wildfire, etc. + +```python showLineNumbers title="Sample output, showing values coming from Panorama" +{'@ptpl': 'lab', +'@src': 'tpl', +'anti-virus': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'hourly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'`text`': 'download-and-install', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'`text`': '0', + '@ptpl': 'lab', + '@src': 'tpl'}}}}, +'global-protect-clientless-vpn': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'weekly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'`text`': 'download-only', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'`text`': '20:00', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'day-of-week': {'`text`': 'wednesday', + '@ptpl': 'lab', + '@src': 'tpl'}}}} +} +``` + ### `FirewallProxy.get_jobs` ```python @@ -1024,3 +1135,31 @@ __Returns__ 'warnings': None}} ``` +### `FirewallProxy.get_system_state` + +```python +def get_system_state() -> Dict[str, str] +``` + +Gets the entire output of the show system state command. + +Show system state returns low level information about PAN-OS and the attributes of the system. Note that this function +does not parse the data structures beyond the first level. + +The actual API command is `show system state`. + +__Returns__ + + +`dict`: Each item from the state, where the key is the item key and the value is the value as a string. + +```python showLineNumbers title="Sample output" +{ + local.name: "mp" + local.octeon: "{ }" + local.ppid: "0" + local.role: "mp" + local.slot: "1" +} +``` + diff --git a/docs/panos-upgrade-assurance/configuration_details.mdx b/docs/panos-upgrade-assurance/configuration_details.mdx index 2913773..2f37841 100644 --- a/docs/panos-upgrade-assurance/configuration_details.mdx +++ b/docs/panos-upgrade-assurance/configuration_details.mdx @@ -71,7 +71,7 @@ Elements of this list can be either of the `str` or `dict` type: ```mdx-code-block - + ``` ```yaml @@ -106,7 +106,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml @@ -151,7 +151,7 @@ The [`CheckFirewall.run_readiness_checks()`](/panos/docs/panos-upgrade-assurance ```mdx-code-block - + ``` ```yaml @@ -187,7 +187,7 @@ The [`CheckFirewall.run_readiness_checks()`](/panos/docs/panos-upgrade-assurance ```mdx-code-block - + ``` ```yaml @@ -223,7 +223,7 @@ The [`CheckFirewall.run_readiness_checks()`](/panos/docs/panos-upgrade-assurance ```mdx-code-block - + ``` ```yaml @@ -270,6 +270,7 @@ checks_configuration = [ } }, {'content_version': {'version': '8634-7678'}}, + {'dynamic_updates': {'test_window': 120}}, {"expired_licenses": {"skip_licenses": ["Threat Prevention"]}}, {'free_disk_space': {'image_version': '10.1.6-h6'}}, {'ha': {'skip_config_sync': True}}, @@ -289,7 +290,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml @@ -309,6 +310,8 @@ checks_configuration: hash_method: "sha1" - content_version: version: "8634-7678" + - dynamic_updates: + test_window: 120 - expired_licenses: skip_licenses: - "Threat Prevention" @@ -379,7 +382,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers title="Lookup limited to a single interface" @@ -454,7 +457,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -507,7 +510,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -522,6 +525,57 @@ checks_configuration: ``` +### `dynamic_updates` + +Check if any Dynamic Update job is scheduled to run within the specified time window. + +:::note +This includes schedules pushed from Panorama via a template, but does not include the ones configured +in `Panorama/Device Deployment/Dynamic Updates/Schedules`. +::: + +**Method:** [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) + +**Configuration parameters** + +paramter | description +--- | --- +`test_window` | (optional) time window in minutes to look for an update job occurrence + +**Sample configuration** + +```mdx-code-block + + +``` + +```python showLineNumbers +checks_configuration = [ + { + 'dynamic_updates': { + 'test_window': 120 + } + } +] +``` + +```mdx-code-block + + +``` + +```yaml showLineNumbers +checks_configuration: + - dynamic_updates: + test_window: 120 +``` + +```mdx-code-block + + +``` + + ### `free_disk_space` Checks if there is enough free space on the `/opt/panrepo` volume to download a PanOS image before an upgrade. @@ -553,7 +607,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -598,7 +652,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -646,7 +700,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -692,7 +746,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -762,7 +816,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -811,7 +865,7 @@ checks_configuration = [ ```mdx-code-block - + ``` ```yaml @@ -917,7 +971,7 @@ snapshots_config = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1022,7 +1076,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1100,7 +1154,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1160,7 +1214,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1213,7 +1267,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1265,7 +1319,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1323,7 +1377,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers @@ -1379,7 +1433,7 @@ reports = [ ```mdx-code-block - + ``` ```yaml showLineNumbers diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index 5d529fc..84e95fc 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -4,6 +4,7 @@ from panos.panorama import Panorama from argparse import ArgumentParser from getpass import getpass +from pprint import pprint if __name__ == "__main__": argparser = ArgumentParser( @@ -117,6 +118,8 @@ print(f"\n certificates: {firewall.get_certificates()}") + print(f"\n dynamic schedules: {firewall.get_update_schedules()}") + print(f"\n jobs: {firewall.get_jobs()}") diff --git a/examples/readiness_checks/run_readiness_checks.py b/examples/readiness_checks/run_readiness_checks.py index 047596d..bb69389 100755 --- a/examples/readiness_checks/run_readiness_checks.py +++ b/examples/readiness_checks/run_readiness_checks.py @@ -85,6 +85,7 @@ "candidate_config", "active_support", "jobs", + "unsupported_transceivers", # checks below have optional configuration {"ha": {"skip_config_sync": True, "ignore_non_functional": True}}, {"content_version": {"version": "8635-7675"}}, @@ -102,6 +103,7 @@ } } }, + {"dynamic_updates": {"test_window": 500}}, # checks below require additional configuration { "session_exist": { diff --git a/panos_upgrade_assurance/__init__.py b/panos_upgrade_assurance/__init__.py index e69de29..59ab523 100644 --- a/panos_upgrade_assurance/__init__.py +++ b/panos_upgrade_assurance/__init__.py @@ -0,0 +1,3 @@ +from importlib import metadata + +__version__ = metadata.version(__package__) diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index ce92999..53d06af 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -1,7 +1,9 @@ +import re from typing import Optional, Union, List, Dict -from math import ceil -from datetime import datetime +from math import ceil, floor +from datetime import datetime, timedelta import locale +import time from panos_upgrade_assurance.utils import ( CheckResult, @@ -94,7 +96,9 @@ def __init__(self, node: FirewallProxy, skip_force_locale: Optional[bool] = Fals CheckType.FREE_DISK_SPACE: self.check_free_disk_space, CheckType.MP_DP_CLOCK_SYNC: self.check_mp_dp_sync, CheckType.CERTS: self.check_ssl_cert_requirements, + CheckType.UPDATES: self.check_scheduled_updates, CheckType.JOBS: self.check_non_finished_jobs, + CheckType.UNSUPPORTED_TRANSCEIVERS: self.check_unsupported_transceivers, } if not skip_force_locale: locale.setlocale( @@ -699,7 +703,7 @@ def check_free_disk_space(self, image_version: Optional[str] = None) -> CheckRes if free_space_panrepo > minimum_free_space: result.status = CheckStatus.SUCCESS else: - result.reason = f"There is not enough free space, only {str(round(free_space_panrepo/1024,1)) + 'G' if free_space_panrepo >= 1024 else str(free_space_panrepo) + 'M'}B is available." + result.reason = f"There is not enough free space, only {str(round(free_space_panrepo / 1024, 1)) + 'G' if free_space_panrepo >= 1024 else str(free_space_panrepo) + 'M'}B is available." return result def check_mp_dp_sync(self, diff_threshold: int = 0) -> CheckResult: @@ -733,16 +737,7 @@ def check_mp_dp_sync(self, diff_threshold: int = 0) -> CheckResult: mp_clock = self._node.get_mp_clock() dp_clock = self._node.get_dp_clock() - mp_dt = datetime.strptime( - f"{mp_clock['year']}-{mp_clock['month']}-{mp_clock['day']} {mp_clock['time']}", - "%Y-%b-%d %H:%M:%S", - ) - dp_dt = datetime.strptime( - f"{dp_clock['year']}-{dp_clock['month']}-{dp_clock['day']} {dp_clock['time']}", - "%Y-%b-%d %H:%M:%S", - ) - - time_fluctuation = abs((mp_dt - dp_dt).total_seconds()) + time_fluctuation = abs((mp_clock - dp_clock).total_seconds()) if time_fluctuation > diff_threshold: result.reason = f"The data plane clock and management clock are different by {time_fluctuation} seconds." else: @@ -871,6 +866,177 @@ def check_ssl_cert_requirements(self, rsa: dict = {}, ecdsa: dict = {}) -> Check result.status = CheckStatus.SUCCESS return result + def _calculate_schedule_time_diff(self, now_dt: datetime, schedule_type: str, schedule: dict) -> (int, str): + """A method that calculates the time distance between two `datetime` objects. + + :::note + This method is used only by [`CheckFirewall.check_scheduled_updates()`](#checkfirewallcheck_scheduled_updates) method and it expects some information + to be already available. + ::: + + # Parameters + + now_dt (datetime): A `datetime` object representing the current moment in time. Ideally this should be the device's local + time, taken from the management plane clock. + schedule_type (str): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, + `real-time`. + schedule (dict): Value of the `recurring` key in the API response, see + [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) + documentation for details. Both formats (locally configured and pushed from a Panorama template) are supported. + + # Raises + + MalformedResponseException: Thrown then the `schedule_type` is not recognizable. + + # Returns + + tuple(int, str): A tuple containing the calculated time difference (in minutes) and human-readable description. + + """ + time_distance = 0 + details = "unsupported schedule type" + + if schedule_type == "daily": + occurrence = schedule["at"] if isinstance(schedule["at"], str) else schedule["at"]["#text"] + next_occurrence = datetime.strptime(f"{str(now_dt.date())} {occurrence}", "%Y-%m-%d %H:%M") + + if now_dt > next_occurrence: + next_occurrence = next_occurrence + timedelta(days=1) + diff = next_occurrence - now_dt + time_distance = floor(diff.total_seconds() / 60) + details = f"at {next_occurrence.time()}" + + elif schedule_type == "hourly": + time_distance = 60 + details = "every hour" + elif schedule_type == "weekly": + occurrence_time = schedule["at"] if isinstance(schedule["at"], str) else schedule["at"]["#text"] + occurrence_day = ( + schedule["day-of-week"] if isinstance(schedule["day-of-week"], str) else schedule["day-of-week"]["#text"] + ) + occurrence_wday = time.strptime(occurrence_day, "%A").tm_wday + now_wday = now_dt.weekday() + + diff_days = (0 if occurrence_wday >= now_wday else 7) + occurrence_wday - now_wday + next_occurrence_date = (now_dt + timedelta(days=diff_days)).date() + next_occurrence = datetime.strptime(f"{str(next_occurrence_date)} {occurrence_time}", "%Y-%m-%d %H:%M") + + if now_dt > next_occurrence: + next_occurrence = next_occurrence + timedelta(days=7) + diff = next_occurrence - now_dt + time_distance = floor(diff.total_seconds() / 60) + details = f"in {str(diff).split('.')[0]}" + + elif schedule_type.split("-")[0] == "every": + if schedule_type.split("-")[1] == "min": + time_distance = 1 + details = "every minute" + elif schedule_type.split("-")[1] == "hour": + time_distance = 60 + details = "every hour" + elif schedule_type.split("-")[1].isnumeric(): + time_distance = int(schedule_type.split("-")[1]) + details = f"every {time_distance} minutes" + else: + raise exceptions.MalformedResponseException(f"Unknown schedule type: {schedule_type}.") + elif schedule_type == "real-time": + details = "unpredictable (real-time)" + else: + raise exceptions.MalformedResponseException(f"Unknown schedule type: {schedule_type}.") + + return time_distance, details + + def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: + """Check if any Dynamic Update job is scheduled to run within the specified time window. + + When device is configured via Panorama, this includes schedules set up in Templates. It does not however include schedules + configured in `Panorama/Device Deployment/Dynamic Updates/Schedules`. + + # Parameters + + test_window (int, optional): (defaults to 60 minutes). A time window in minutes to look for an update job occurrence. + Has to be a value between `60` and `10080` (1 week equivalent). The time window is calculated based on the device's + local time (taken from the management plane). + + # Raises + + MalformedResponseException: Thrown in case API response does not meet expectations. + + # Returns + + CheckResult: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking \ + value of: + + * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there is no update job + planned within the test window. + * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about the planned jobs with next occurrence time provided if possible. + * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the `test_window` parameter + does not meet criteria. + + """ + if not isinstance(test_window, int): + raise exceptions.WrongDataTypeException( + f"The test_windows parameter should be of type , got {type(test_window)} instead." + ) + + result = CheckResult() + + mp_now = self._node.get_mp_clock() + + schedules = self._node.get_update_schedules() + if not schedules: + result.status = CheckStatus.SKIPPED + result.reason = "No scheduled job present on the device." + return result + + if test_window < 60: + result.status = CheckStatus.ERROR + result.reason = "Schedules test window is below the supported, safe minimum of 60 minutes." + return result + if test_window > 10080: + result.status = CheckStatus.ERROR + result.reason = "Schedules test window is set to over 1 week. This test will always fail." + return result + + schedules_in_window = [] + for name, schedule in schedules.items(): + # config can come from a Template, it will have some additional keys starting with '@' + # that we would like to skip + if "@" not in name: + if "recurring" not in schedule.keys(): + raise exceptions.MalformedResponseException( + f"Schedule {name} has malformed configuration, missing a schedule.." + ) + + schedule_details = schedule["recurring"] + + # let's get rid of all keys that are not related to a schedule + for k in list(schedule_details.keys()): + if k in ["sync-to-peer", "threshold"] or k.startswith("@"): + schedule_details.pop(k) + + # we now should have a single element dict + if len(schedule_details) != 1: + raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration: {schedule}") + + if "none" not in schedule_details: + time_distance, details = self._calculate_schedule_time_diff( + now_dt=mp_now, + schedule_type=next(iter(schedule_details.keys())), + schedule=next(iter(schedule_details.values())), + ) + if time_distance <= test_window: + schedules_in_window.append(f"{name} ({details})") + + if schedules_in_window: + result.reason = f"Following schedules fall into test window: {', '.join(schedules_in_window)}." + return result + + result.status = CheckStatus.SUCCESS + + return result + def check_non_finished_jobs(self) -> CheckResult: """Check for any job with status different than FIN. @@ -903,6 +1069,57 @@ def check_non_finished_jobs(self) -> CheckResult: result.reason = "No jobs found on device. This is unusual, please investigate." return result + def check_unsupported_transceivers(self, supported_sfp_regex: Optional[List[str]] = None) -> CheckResult: + """Check for any Optical Transceivers (SFPs or otherwise) that aren't supported by Palo Alto Networks. + + # Parameters + + supported_sfp_regex (list, optional): List of supported transceivers, as regex strings, to mark SFP's as + supported even if they aren't OEM. + + # Returns + + CheckResult: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking \ + value of: + + * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) When all optics are OEM and + PAN supported + * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about which Slots and Physical ports currently have unsupported transceivers installed + * [`CheckStatus.SKIPPED`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there are no transceiver + slots at all. + """ + result = CheckResult() + + system_state = self._node.get_system_state() + + compiled_regex = [] + if supported_sfp_regex: + compiled_regex = [re.compile(regex_string) for regex_string in supported_sfp_regex] + + no_sfp_interfaces = True + bad_interfaces = [] + for key, value in system_state.items(): + if re.match(r"sys\.s[0-9]+\.p[0-9]+\.phy", key): + if "'sfp':" in value and "'vendor-name': OEM" not in value: + if not any(regex.search(value) for regex in compiled_regex): + bad_interfaces.append(key) + + if "'sfp'" in value: + no_sfp_interfaces = False + + if bad_interfaces: + result.reason = f"The following interfaces have non-Palo Alto Networks supported transceivers installed: {', '.join(bad_interfaces)}" + return result + + if no_sfp_interfaces: + result.status = CheckStatus.SKIPPED + result.reason = "No SFP Interfaces were found, or no SFP Transceivers were present in the system." + return result + + result.status = CheckStatus.SUCCESS + return result + def get_content_db_version(self) -> Dict[str, str]: """Get Content DB version. diff --git a/panos_upgrade_assurance/exceptions.py b/panos_upgrade_assurance/exceptions.py index c7f02cd..893e3ac 100644 --- a/panos_upgrade_assurance/exceptions.py +++ b/panos_upgrade_assurance/exceptions.py @@ -31,12 +31,26 @@ class UtilsException(Exception): pass +class WrongNumberOfArgumentsException(FirewallProxyException): + """Thrown when [FirewallProxy](/panos/docs/panos-upgrade-assurance/api/firewall_proxy) constructor is given wrong number or + set of arguments. + """ + + pass + + class CommandRunFailedException(FirewallProxyException): """Used when a command run on a device does not return the `success` status.""" pass +class GetXpathConfigFailedException(FirewallProxyException): + """Used when XAPI does not return a `success` state when running a `get` operation.""" + + pass + + class MalformedResponseException(FirewallProxyException): """A generic exception class used when a response does not meet the expected standards.""" diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index 35a9c7c..f083148 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -1,31 +1,83 @@ import xml.etree.ElementTree as ET from panos_upgrade_assurance.utils import interpret_yes_no from xmltodict import parse as XMLParse -from typing import Optional, Union +from typing import Optional, Union, Dict from panos.firewall import Firewall from pan.xapi import PanXapiError from panos_upgrade_assurance import exceptions from math import floor +from datetime import datetime -class FirewallProxy(Firewall): - """Class representing a Firewall. +class FirewallProxy: + """A proxy to the [Firewall][fw] class. - Proxy in this class means that it is between the *high level* - [`CheckFirewall`](/panos/docs/panos-upgrade-assurance/api/check_firewall#class-checkfirewall) class and a device itself. - Inherits the [Firewall][fw] class but adds methods to interpret XML API commands. The class constructor is also inherited - from the [Firewall][fw] class. + Proxy in this case means that this class is between the *high level* + [`CheckFirewall`](/panos/docs/panos-upgrade-assurance/api/check_firewall#class-checkfirewall) class and the + [`Firewall`][fw] class representing the device itself. + There is no inheritance between the [`Firewall`][fw] and [`FirewallProxy`][fwp] classes, but an object of the latter one + has access to all attributes of the former. All interaction with a device are read-only. Therefore, a less privileged user can be used. - All methods starting with `is_` check the state, they do not present any data besides simple `boolean`values. + All methods starting with `is_` check the state, they do not present any data besides simple `boolean` values. All methods starting with `get_` fetch data from a device by running a command and parsing the output. The return data type can be different depending on what kind of information is returned from a device. [fw]: https://pan-os-python.readthedocs.io/en/latest/module-firewall.html#module-panos.firewall + [fwp]: /panos/docs/panos-upgrade-assurance/api/firewall_proxy + + # Attributes + + _fw (Firewall): an object of the [`Firewall`][fw] class. + """ + def __init__(self, firewall: Optional[Firewall] = None, **kwargs): + """Constructor of the [`FirewallProxy`][fwp] class. + + Main purpose of this constructor is to store an object of the [`Firewall`][fw] class. This can be done in two ways: + + 1. by passing an existing object + 1. by passing credentials and address of a device (all parameters used byt the [`Firewall`][fw] class constructor + are supported). + + :::tip + Please note that positional arguments are not supported. + ::: + + # Parameters + + firewall (Firewall): An existing object of the [`Firewall`][fw] class. + **kwargs: Used to pass keyword arguments that will be used directly in the [`Firewall`][fw] class constructor. + + # Raises + + WrongNumberOfArgumentsException: Raised when a mixture of arguments is passed (for example a [`Firewall`][fw] + object and firewall credentials). + + """ + if firewall and len(kwargs) > 0: + raise exceptions.WrongNumberOfArgumentsException( + "You cannot pass the Firewall object and the credentials at the same time." + ) + + self._fw = firewall if firewall else Firewall(**kwargs) + + def __getattr__(self, attr): + """An overload of the default `__getattr__()` method. + + Its main purpose is to provide backwards compatibility to the old [`FirewallProxy`][fwp] class structure. It's called + when a requested attribute does not exist in the [`FirewallProxy`][fwp] class object, and it tries to fetch it + from the [`Firewall`][fw] object stored within the [`FirewallProxy`][fwp] object. + + From the [`FirewallProxy`][fwp] object's interface perspective, this provides the same behaviour as if the + [`FirewallProxy`][fwp] would still inherit from the [`Firewall`][fw] class. + + """ + return getattr(self._fw, attr) + def op_parser( self, cmd: str, @@ -36,8 +88,7 @@ def op_parser( This is just a wrapper around the [`Firewall.op()`](https://pan-os-python.readthedocs.io/en/latest/module-firewall.html#panos.firewall.Firewall.op) method. - It additionally does basic error handling and tries to extract the actual device - response. + It additionally does basic error handling and tries to extract the actual device response. # Parameters @@ -71,6 +122,47 @@ def op_parser( return resp_result + def get_parser(self, xml_path: str, return_xml: Optional[bool] = False) -> Union[dict, ET.Element]: + """Execute a configuration get command on a node, parse and return response. + + This is a wrapper around the + [`pan.xapi.get()` method](https://github.com/kevinsteves/pan-python/blob/master/doc/pan.xapi.rst#getxpathnone) + from the [`pan-python` package](https://pypi.org/project/pan-python/). + It does a basic error handling and tries to extract the actual response. + + # Parameters + + xml_path (str): An XPATH pointing to the config to be retrieved. + return_xml (bool): (defaults to `False`) When set to `True`, the return data is an \ + [`XML object`](https://docs.python.org/3/library/xml.etree.elementtree.html#xml.etree.ElementTree.Element) + instead of a Python dictionary. + + # Raises + + GetXpathConfigFailedException: This exception is raised when XPATH is not provided or does not exist. + + # Returns + dict, xml.etree.ElementTree.Element: The actual command output. A type is defined by the `return_xml` parameter. + + """ + if xml_path is None: + raise exceptions.GetXpathConfigFailedException("No XPATH provided.") + + raw_response = self.xapi.get(xml_path) + if raw_response.get("status") != "success": + raise exceptions.GetXpathConfigFailedException( + f'Failed get data under XPATH: {xml_path}, status: {raw_response.get("status")}.' + ) + + resp_result = raw_response.find("result") + if resp_result is None: + raise exceptions.GetXpathConfigFailedException(f"No data found under XPATH: {xml_path}, or path does not exist.") + + if not return_xml: + resp_result = XMLParse(ET.tostring(resp_result, encoding="utf8", method="xml"))["result"] + + return resp_result + def is_pending_changes(self) -> bool: """Get information if there is a candidate configuration pending to be committed. @@ -963,39 +1055,32 @@ def get_available_image_data(self) -> dict: return result - def get_mp_clock(self) -> dict: + def get_mp_clock(self) -> datetime: """Get the clock information from management plane. The actual API command is `show clock`. # Returns - dict: The clock information represented as a dictionary. - - ```python showLineNumbers title="Sample output" - { - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' - } - ``` + datetime: The clock information represented as a `datetime` object. """ time_string = self.op_parser(cmd="show clock") - time_dict = time_string.split() - result = { - "time": time_dict[3], - "tz": time_dict[4], - "day": time_dict[2], - "month": time_dict[1], - "year": time_dict[5], - "day_of_week": time_dict[0], + time_parsed = time_string.split() + time_dict = { + "time": time_parsed[3], + "tz": time_parsed[4], + "day": time_parsed[2], + "month": time_parsed[1], + "year": time_parsed[5], + "day_of_week": time_parsed[0], } + dt = datetime.strptime( + f"{time_dict['year']}-{time_dict['month']}-{time_dict['day']} {time_dict['time']}", + "%Y-%b-%d %H:%M:%S", + ) - return result + return dt def get_dp_clock(self) -> dict: """Get the clock information from data plane. @@ -1004,33 +1089,26 @@ def get_dp_clock(self) -> dict: # Returns - dict: The clock information represented as a dictionary. - - ```python showLineNumbers title="Sample output" - { - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' - } - ``` + datetime: The clock information represented as a `datetime` object. """ response = self.op_parser(cmd="show clock more") time_string = dict(response)["member"] - time_dict = time_string.split() - result = { - "time": time_dict[5], - "tz": time_dict[6], - "day": time_dict[4], - "month": time_dict[3], - "year": time_dict[7], - "day_of_week": time_dict[2], + time_parsed = time_string.split() + time_dict = { + "time": time_parsed[5], + "tz": time_parsed[6], + "day": time_parsed[4], + "month": time_parsed[3], + "year": time_parsed[7], + "day_of_week": time_parsed[2], } + dt = datetime.strptime( + f"{time_dict['year']}-{time_dict['month']}-{time_dict['day']} {time_dict['time']}", + "%Y-%b-%d %H:%M:%S", + ) - return result + return dt def get_certificates(self) -> dict: """Get information about certificates installed on a device. @@ -1094,6 +1172,61 @@ def get_certificates(self) -> dict: return result + def get_update_schedules(self) -> dict: + """Get schedules for all dynamic updates. + + This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama, + but it does not include the ones configured via `Panorama/Device Deployment/Dynamic Updates/Schedules`. + + The actual XMLAPI command run here is `config/get` with XPATH set to + `/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule`. + + # Returns + + dict: All dynamic updates schedules, key is the entity type to update, like: threats, wildfire, etc. + + ```python showLineNumbers title="Sample output, showing values coming from Panorama" + {'@ptpl': 'lab', + '@src': 'tpl', + 'anti-virus': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'hourly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'#text': 'download-and-install', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'#text': '0', + '@ptpl': 'lab', + '@src': 'tpl'}}}}, + 'global-protect-clientless-vpn': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'weekly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'#text': 'download-only', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'#text': '20:00', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'day-of-week': {'#text': 'wednesday', + '@ptpl': 'lab', + '@src': 'tpl'}}}} + } + ``` + + """ + schedules = self.get_parser( + xml_path="/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule" + ) + if schedules is None or "update-schedule" not in schedules: + return {} + + return schedules["update-schedule"] + def get_jobs(self) -> dict: """Get details on all jobs. @@ -1167,3 +1300,34 @@ def get_jobs(self) -> dict: results[jid] = job return results + + def get_system_state(self) -> Dict[str, str]: + """Gets the entire output of the show system state command. + + Show system state returns low level information about PAN-OS and the attributes of the system. Note that this function + does not parse the data structures beyond the first level. + + The actual API command is `show system state`. + + # Returns + + dict: Each item from the state, where the key is the item key and the value is the value as a string. + + ```python showLineNumbers title="Sample output" + { + local.name: "mp" + local.octeon: "{ }" + local.ppid: "0" + local.role: "mp" + local.slot: "1" + } + ``` + """ + result = {} + show_system_state_str = self.op_parser(cmd="show system state", return_xml=True) + for line in show_system_state_str.text.split("\n"): + clean_line = line.strip() + key, value = clean_line.split(":")[0], ":".join(clean_line.split(":")[1:]).strip() + result[key] = value + + return result diff --git a/panos_upgrade_assurance/utils.py b/panos_upgrade_assurance/utils.py index 9c1e0df..ece75e7 100644 --- a/panos_upgrade_assurance/utils.py +++ b/panos_upgrade_assurance/utils.py @@ -28,7 +28,9 @@ class CheckType: FREE_DISK_SPACE = "free_disk_space" MP_DP_CLOCK_SYNC = "planes_clock_sync" CERTS = "certificates_requirements" + UPDATES = "dynamic_updates" JOBS = "jobs" + UNSUPPORTED_TRANSCEIVERS = "unsupported_transceivers" class SnapType: diff --git a/pyproject.toml b/pyproject.toml index 1ac340f..0e8e8b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "panos-upgrade-assurance" -version = "0.1.4" +version = "0.3.0" description = "" authors = ["Palo Alto Networks"] readme = "README.md" @@ -18,15 +18,15 @@ classifiers = [ "Bug Tracker" = "https://github.com/PaloAltoNetworks/pan-os-upgrade-assurance/issues" [tool.poetry.dependencies] -python = "^3.8.1" +python = "^3.8" pan-os-python = "^1.8" pan-python = "^0.17" -xmltodict = "^0.13" +xmltodict = "^0.12" pyopenssl = "^23.2" [tool.poetry.group.dev.dependencies] pydoc-markdown = "^4.6" -flake8 = "^6.0" +flake8 = "^5" black = "^23.3" bandit = "^1.7" flake8-pyproject = "^1.2" diff --git a/tests/test_check_firewall.py b/tests/test_check_firewall.py index 207bc15..587b747 100644 --- a/tests/test_check_firewall.py +++ b/tests/test_check_firewall.py @@ -11,7 +11,9 @@ ContentDBVersionsFormatException, WrongDiskSizeFormatException, UnknownParameterException, + MalformedResponseException, ) +from datetime import datetime @pytest.fixture @@ -429,6 +431,99 @@ def test_check_ntp_synchronization_synched_unknown(self, check_firewall_mock): reason="NTP synchronization in unknown state: unknown." ) + def test_check_unsupported_transceivers_not_oem(self, check_firewall_mock): + check_firewall_mock._node.get_system_state.return_value = { + "": "", + "local.brdagent": "{ }", + "local.family": "vm", + "local.info": "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }", + "local.model": "PA-VM", + "local.name": "mp", + "local.octeon": "{ }", + "local.ppid": "0", + "local.role": "mp", + "local.slot": "1", + "sys.s5.p1.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + "sys.s6.p2.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + } + assert check_firewall_mock.check_unsupported_transceivers() == CheckResult( + reason="The following interfaces have non-Palo Alto Networks supported transceivers installed: sys.s5.p1.phy, sys.s6.p2.phy", + ) + + def test_check_unsupported_transceivers_custom_supported_regex_match(self, check_firewall_mock): + check_firewall_mock._node.get_system_state.return_value = { + "": "", + "local.brdagent": "{ }", + "local.family": "vm", + "local.info": "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }", + "local.model": "PA-VM", + "local.name": "mp", + "local.octeon": "{ }", + "local.ppid": "0", + "local.role": "mp", + "local.slot": "1", + "sys.s5.p1.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + "sys.s6.p2.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + } + supported_regex = ["FCBN4"] + assert check_firewall_mock.check_unsupported_transceivers(supported_sfp_regex=supported_regex) == CheckResult( + status=CheckStatus.SUCCESS + ) + + def test_check_unsupported_transceivers_custom_supported_regex_miss(self, check_firewall_mock): + check_firewall_mock._node.get_system_state.return_value = { + "": "", + "local.brdagent": "{ }", + "local.family": "vm", + "local.info": "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }", + "local.model": "PA-VM", + "local.name": "mp", + "local.octeon": "{ }", + "local.ppid": "0", + "local.role": "mp", + "local.slot": "1", + "sys.s5.p1.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + "sys.s6.p2.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + } + supported_regex = ["BADREGEX"] + assert check_firewall_mock.check_unsupported_transceivers(supported_sfp_regex=supported_regex) == CheckResult( + reason="The following interfaces have non-Palo Alto Networks supported transceivers installed: sys.s5.p1.phy, sys.s6.p2.phy", + ) + + def test_check_unsupported_transceivers_all_supported(self, check_firewall_mock): + check_firewall_mock._node.get_system_state.return_value = { + "": "", + "local.brdagent": "{ }", + "local.family": "vm", + "local.info": "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }", + "local.model": "PA-VM", + "local.name": "mp", + "local.octeon": "{ }", + "local.ppid": "0", + "local.role": "mp", + "local.slot": "1", + "sys.s5.p1.phy": "{ 'link-partner': { }, 'media': SFP-Plus-Fiber, 'sfp': { 'connector': LC, 'encoding': Reserved, 'identifier': SFP, 'transceiver': 10000B-SR, 'vendor-name': OEM , 'vendor-part-number': PAN-SFP-PLUS-SR , 'vendor-part-rev': B4 , }, 'type': Ethernet, }", + } + assert check_firewall_mock.check_unsupported_transceivers() == CheckResult(status=CheckStatus.SUCCESS) + + def test_check_unsupported_transceivers_no_sfp(self, check_firewall_mock): + check_firewall_mock._node.get_system_state.return_value = { + "": "", + "local.brdagent": "{ }", + "local.family": "vm", + "local.info": "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }", + "local.model": "PA-VM", + "local.name": "mp", + "local.octeon": "{ }", + "local.ppid": "0", + "local.role": "mp", + "local.slot": "1", + "sys.s4.p20.phy": "{ 'link-partner': { }, 'media': CAT5, 'type': Ethernet, }", + } + assert check_firewall_mock.check_unsupported_transceivers() == CheckResult( + status=CheckStatus.SKIPPED, reason="No SFP Interfaces were found, or no SFP Transceivers were present in the system." + ) + def test_check_arp_entry_none(self, check_firewall_mock): assert check_firewall_mock.check_arp_entry(ip=None) == CheckResult( CheckStatus.SKIPPED, reason="Missing ARP table entry description." @@ -618,44 +713,34 @@ def test_check_mp_dp_sync_wrong_input_data(self, check_firewall_mock): assert str(exception_msg.value) == "[diff_threshold] should be of type [int] but is of type []." def test_check_mp_dp_sync_time_diff(self, check_firewall_mock): - check_firewall_mock._node.get_mp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } - check_firewall_mock._node.get_dp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:52:34", - "tz": "PDT", - "year": "2023", - } + check_firewall_mock._node.get_mp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) + check_firewall_mock._node.get_dp_clock.return_value = datetime.strptime( + "Wed May 31 11:52:34 2023", "%a %b %d %H:%M:%S %Y" + ) assert check_firewall_mock.check_mp_dp_sync(1) == CheckResult( status=CheckStatus.FAIL, reason="The data plane clock and management clock are different by 133.0 seconds." ) + def test_check_mp_dp_sync_time_diff_with_threshold(self, check_firewall_mock): + check_firewall_mock._node.get_mp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) + check_firewall_mock._node.get_dp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:34 2023", "%a %b %d %H:%M:%S %Y" + ) + + assert check_firewall_mock.check_mp_dp_sync(30) == CheckResult(status=CheckStatus.SUCCESS) + def test_check_mp_dp_sync_time_synced(self, check_firewall_mock): - check_firewall_mock._node.get_mp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } - check_firewall_mock._node.get_dp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } + check_firewall_mock._node.get_mp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) + check_firewall_mock._node.get_dp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) assert check_firewall_mock.check_mp_dp_sync(1) == CheckResult(status=CheckStatus.SUCCESS) @@ -909,6 +994,127 @@ def test_check_ssl_cert_requirements_success(self, check_firewall_mock): assert check_firewall_mock.check_ssl_cert_requirements(rsa=rsa, ecdsa=ecdsa) == CheckResult(status=CheckStatus.SUCCESS) + @pytest.mark.parametrize( + "param_now_dts, param_schedule_type, param_schedule, param_time_d, param_details", + [ + ( + "2023-08-07 00:00:00", # this is Monday + "daily", + {"action": "download-and-install", "at": "07:45"}, + 465, + "at 07:45:00", + ), + ("2023-08-07 00:00:00", "hourly", {"action": "download-and-install", "at": "0"}, 60, "every hour"), # this is Monday + ( + "2023-08-07 00:00:00", # this is Monday + "every-5-mins", + {"action": "download-and-install", "at": "1"}, + 5, + "every 5 minutes", + ), + ("2023-08-07 00:00:00", "real-time", None, 0, "unpredictable (real-time)"), # this is Monday + ], + ) + def test__calculate_schedule_time_diff( + self, param_now_dts, param_schedule_type, param_schedule, param_time_d, param_details, check_firewall_mock + ): + mock_now_dt = datetime.strptime(param_now_dts, "%Y-%m-%d %H:%M:%S") + + time_delta, delta_reason = check_firewall_mock._calculate_schedule_time_diff( + mock_now_dt, param_schedule_type, param_schedule + ) + + assert time_delta == param_time_d + assert delta_reason == param_details + + @pytest.mark.parametrize("param_schedule_type", ["every-something", "something"]) + def test__calculate_schedule_time_diff_exception(self, param_schedule_type, check_firewall_mock): + with pytest.raises(MalformedResponseException) as exception_msg: + check_firewall_mock._calculate_schedule_time_diff(datetime.now(), param_schedule_type, None) + + assert str(exception_msg.value) == f"Unknown schedule type: {param_schedule_type}." + + @pytest.mark.parametrize( + "param_now_dts, param_test_window, param_schedules_block, check_result", + [ + ( + "2023-08-07 00:00:00", # this is Monday + 120, + { + "anti-virus": { + "@ptpl": "lab", # a template provided config + "@src": "tpl", + "recurring": { + "@ptpl": "lab", + "@src": "tpl", + "daily": { + "@ptpl": "lab", + "@src": "tpl", + "action": {"#text": "download-and-install", "@ptpl": "lab", "@src": "tpl"}, + "at": {"#text": "03:30", "@ptpl": "lab", "@src": "tpl"}, + }, + "sync-to-peer": {"#text": "yes", "@ptpl": "lab", "@src": "tpl"}, + "threshold": {"#text": "15", "@ptpl": "lab", "@src": "tpl"}, + }, + } + }, + CheckResult(CheckStatus.SUCCESS, ""), + ), + ( + "2023-08-07 00:00:00", # this is Monday + 120, + {"anti-virus": {"recurring": {"real-time": None}}}, + CheckResult( + CheckStatus.FAIL, "Following schedules fall into test window: anti-virus (unpredictable (real-time))." + ), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 60, + {"anti-virus": {"recurring": {"daily": {"action": "download-and-install", "at": "07:45"}}}}, + CheckResult(CheckStatus.FAIL, "Following schedules fall into test window: anti-virus (at 07:45:00)."), + ), + ( + "2023-08-07 00:00:00", # this is Monday + 180, + { + "global-protect-datafile": { + "recurring": {"weekly": {"action": "download-and-install", "at": "02:45", "day-of-week": "monday"}} + }, + "threats": {"recurring": {"daily": {"action": "download-and-install", "at": "15:30"}}}, + }, + CheckResult(CheckStatus.FAIL, "Following schedules fall into test window: global-protect-datafile (in 2:45:00)."), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 20, + {}, + CheckResult(CheckStatus.SKIPPED, "No scheduled job present on the device."), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 20, + {"anti-virus": {"recurring": {"real-time": None}}}, + CheckResult(CheckStatus.ERROR, "Schedules test window is below the supported, safe minimum of 60 minutes."), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 10081, + {"anti-virus": {"recurring": {"real-time": None}}}, + CheckResult(CheckStatus.ERROR, "Schedules test window is set to over 1 week. This test will always fail."), + ), + ], + ) + def test_check_scheduled_updates( + self, param_now_dts, param_test_window, param_schedules_block, check_result, check_firewall_mock + ): + now_dt = datetime.strptime(param_now_dts, "%Y-%m-%d %H:%M:%S") + check_firewall_mock._node.get_mp_clock = lambda: now_dt + + check_firewall_mock._node.get_update_schedules = lambda: param_schedules_block + + assert check_firewall_mock.check_scheduled_updates(param_test_window) == check_result + def test_check_jobs_success(self, check_firewall_mock): jobs = { "4": { diff --git a/tests/test_firewall_proxy.py b/tests/test_firewall_proxy.py index 4309d80..45fac35 100644 --- a/tests/test_firewall_proxy.py +++ b/tests/test_firewall_proxy.py @@ -1,5 +1,6 @@ import pytest from unittest.mock import MagicMock +from panos.firewall import Firewall from panos_upgrade_assurance.firewall_proxy import FirewallProxy from xmltodict import parse as xml_parse import xml.etree.ElementTree as ET @@ -12,13 +13,17 @@ WrongDiskSizeFormatException, DeviceNotLicensedException, UpdateServerConnectivityException, + GetXpathConfigFailedException, ) +from datetime import datetime @pytest.fixture(scope="function") def fw_proxy_mock(): - fw_proxy_obj = FirewallProxy() - fw_proxy_obj.op = MagicMock() + fw_proxy_obj = FirewallProxy(Firewall()) + fw_proxy_obj._fw.op = MagicMock() + fw_proxy_obj._fw.generate_xapi = MagicMock() + fw_proxy_obj._fw.xapi.get = MagicMock() yield fw_proxy_obj @@ -74,6 +79,73 @@ def test_op_parser_none(self, fw_proxy_mock): fw_proxy_mock.op.assert_called_with(cmd, xml=False, cmd_xml=True, vsys=fw_proxy_mock.vsys) + def test_get_parser_correct_response_defaults(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = """ + + + value + + + """ + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + response = fw_proxy_mock.get_parser(input_xpath) + mocked_response = xml_parse(ET.tostring(xml_output.find("result"), encoding="utf8", method="xml"))["result"] + + assert response == mocked_response + + def test_get_parser_correct_response_in_xml(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = """ + + + value + + + """ + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + response = fw_proxy_mock.get_parser(input_xpath, True) + mocked_response = xml_output.find("result") + + assert response == mocked_response + + def test_get_parser_no_xpath_exception(self, fw_proxy_mock): + with pytest.raises(GetXpathConfigFailedException) as exc_info: + fw_proxy_mock.get_parser(None) + assert "No XPATH provided." in str(exc_info.value) + + def test_get_parser_incorrect_response(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = """ + + + + """ + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + with pytest.raises(GetXpathConfigFailedException) as exc_info: + fw_proxy_mock.get_parser(input_xpath) + + expected = f'Failed get data under XPATH: {input_xpath}, status: {xml_output.get("status")}.' + assert expected == str(exc_info.value) + + def test_get_parser_no_response(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = '' + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + with pytest.raises(GetXpathConfigFailedException) as exc_info: + fw_proxy_mock.get_parser(input_xpath) + + expected = f"No data found under XPATH: {input_xpath}, or path does not exist." + assert expected == str(exc_info.value) + def test_is_pending_changes_true(self, fw_proxy_mock): xml_text = "yes" raw_response = ET.fromstring(xml_text) @@ -1021,14 +1093,9 @@ def test_get_mp_clock(self, fw_proxy_mock): raw_response = ET.fromstring(xml_text) fw_proxy_mock.op.return_value = raw_response - assert fw_proxy_mock.get_mp_clock() == { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } + response = datetime.strptime("Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y") + + assert fw_proxy_mock.get_mp_clock() == response def test_get_dp_clock(self, fw_proxy_mock): xml_text = """ @@ -1041,14 +1108,9 @@ def test_get_dp_clock(self, fw_proxy_mock): raw_response = ET.fromstring(xml_text) fw_proxy_mock.op.return_value = raw_response - assert fw_proxy_mock.get_dp_clock() == { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:52:34", - "tz": "PDT", - "year": "2023", - } + response = datetime.strptime("Wed May 31 11:52:34 2023", "%a %b %d %H:%M:%S %Y") + + assert fw_proxy_mock.get_dp_clock() == response def test_get_jobs(self, fw_proxy_mock): xml_text = """ @@ -1242,3 +1304,180 @@ def test_get_certificates_no_certificate(self, fw_proxy_mock): fw_proxy_mock.op.return_value = raw_response assert fw_proxy_mock.get_certificates() == {} + + def test_get_update_schedules(self, fw_proxy_mock): + xml_text = """ + + + + + + 15 + + 00:30 + download-and-install + + yes + + + + + + 4 + download-only + yes + + + + + + + + + + + + + + + + + 01:45 + download-and-install + + + + + + + wednesday + 01:02 + download-only + + + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.xapi.get.return_value = raw_response + # fw_proxy_mock.op.return_value = raw_response + # fw_proxy_mock.get_parser.return_value = raw_response + + response = { + "@ptpl": "lab", + "@src": "tpl", + "anti-virus": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": { + "@ptpl": "lab", + "@src": "tpl", + "daily": { + "@ptpl": "lab", + "@src": "tpl", + "action": {"#text": "download-and-install", "@ptpl": "lab", "@src": "tpl"}, + "at": {"#text": "00:30", "@ptpl": "lab", "@src": "tpl"}, + }, + "sync-to-peer": {"#text": "yes", "@ptpl": "lab", "@src": "tpl"}, + "threshold": {"#text": "15", "@ptpl": "lab", "@src": "tpl"}, + }, + }, + "global-protect-clientless-vpn": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": {"daily": {"action": "download-and-install", "at": "01:45"}}, + }, + "global-protect-datafile": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": {"@ptpl": "lab", "@src": "tpl", "none": {"@ptpl": "lab", "@src": "tpl"}}, + }, + "threats": {"recurring": {"weekly": {"action": "download-only", "at": "01:02", "day-of-week": "wednesday"}}}, + "wf-private": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": {"@ptpl": "lab", "@src": "tpl", "none": {"@ptpl": "lab", "@src": "tpl"}}, + }, + "wildfire": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": { + "@ptpl": "lab", + "@src": "tpl", + "every-15-mins": { + "@ptpl": "lab", + "@src": "tpl", + "action": {"#text": "download-only", "@ptpl": "lab", "@src": "tpl"}, + "at": {"#text": "4", "@ptpl": "lab", "@src": "tpl"}, + "sync-to-peer": {"#text": "yes", "@ptpl": "lab", "@src": "tpl"}, + }, + }, + }, + } + + assert fw_proxy_mock.get_update_schedules() == response + + def test_get_update_schedules_empty_response(self, fw_proxy_mock): + xml_text = """ + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.xapi.get.return_value = raw_response + + assert fw_proxy_mock.get_update_schedules() == {} + + def test_get_update_schedules_no_update_schedules_key(self, fw_proxy_mock): + xml_text = """ + + + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.xapi.get.return_value = raw_response + + assert fw_proxy_mock.get_update_schedules() == {} + + def test_get_system_state(self, fw_proxy_mock): + xml_text = """ + + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.op.return_value = raw_response + + system_state_dict = fw_proxy_mock.get_system_state() + + assert system_state_dict == { + "": "", + "local.brdagent": "{ }", + "local.family": "vm", + "local.info": "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }", + "local.model": "PA-VM", + "local.name": "mp", + "local.octeon": "{ }", + "local.ppid": "0", + "local.role": "mp", + "local.slot": "1", + "sys.s6.p1.phy": "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }", + }