diff --git a/docs/configtree.py b/docs/configtree.py index bb7a3214..85a382c9 100755 --- a/docs/configtree.py +++ b/docs/configtree.py @@ -70,8 +70,10 @@ def node_style(cls): style = "style=filled " + nodestyle[module] + " " except: pass - result = ' {0} [{1}URL="../module-{2}.html#panos.{3}" target="_top"];\n'.format( - cls_name, style, module, cls + result = ( + ' {0} [{1}URL="../module-{2}.html#panos.{3}" target="_top"];\n'.format( + cls_name, style, module, cls + ) ) else: if style: diff --git a/examples/dyn_address_group.py b/examples/dyn_address_group.py index 248769ca..20d4cd41 100755 --- a/examples/dyn_address_group.py +++ b/examples/dyn_address_group.py @@ -111,7 +111,11 @@ def main(): logging.basicConfig(format=logging_format, level=logging_level) # Connect to the device and determine its type (Firewall or Panorama). - device = PanDevice.create_from_device(args.hostname, args.username, args.password,) + device = PanDevice.create_from_device( + args.hostname, + args.username, + args.password, + ) # Panorama does not have a userid API, so exit. # You can use the userid API on a firewall with the Panorama 'target' diff --git a/examples/upgrade.py b/examples/upgrade.py index 12e55236..e1162337 100755 --- a/examples/upgrade.py +++ b/examples/upgrade.py @@ -95,7 +95,11 @@ def main(): # Connect to the device and determine its type (Firewall or Panorama). # This is important to know what version to upgrade to next. - device = PanDevice.create_from_device(args.hostname, args.username, args.password,) + device = PanDevice.create_from_device( + args.hostname, + args.username, + args.password, + ) # Perform the upgrades in sequence with reboots between each upgrade device.software.upgrade_to_version(args.version, args.dryrun) diff --git a/examples/userid.py b/examples/userid.py index 0ba04887..b8aa942a 100755 --- a/examples/userid.py +++ b/examples/userid.py @@ -90,7 +90,11 @@ def main(): logging.basicConfig(format=logging_format, level=logging_level) # Connect to the device and determine its type (Firewall or Panorama). - device = PanDevice.create_from_device(args.hostname, args.username, args.password,) + device = PanDevice.create_from_device( + args.hostname, + args.username, + args.password, + ) logging.debug("Detecting type of device") diff --git a/panos/__init__.py b/panos/__init__.py index e9c0afa0..3fc19205 100755 --- a/panos/__init__.py +++ b/panos/__init__.py @@ -241,7 +241,9 @@ def tree_legend_dot(): result += ( '{module} [style=filled fillcolor={color} URL="{url}' '/module-{module}.html" target="_blank"];'.format( - module=module, color=node_color(module), url=DOCUMENTATION_URL, + module=module, + color=node_color(module), + url=DOCUMENTATION_URL, ) ) result += "}" @@ -286,7 +288,11 @@ def string_or_list(value): value, ] return ( - list(value) if "__iter__" in dir(value) else [value,] + list(value) + if "__iter__" in dir(value) + else [ + value, + ] ) diff --git a/panos/base.py b/panos/base.py index 6a9bcf76..9477ec3a 100644 --- a/panos/base.py +++ b/panos/base.py @@ -1631,7 +1631,13 @@ def _set_reference( if var_type == "list": if var is None: update_needed = True - setattr(obj, reference_var, [self,]) + setattr( + obj, + reference_var, + [ + self, + ], + ) if update: obj.update(reference_var) elif not isinstance(var, list): @@ -2100,7 +2106,9 @@ def hierarchy_info(self): classes = panos.object_classes() configs = [ - [self.__class__,], + [ + self.__class__, + ], ] updated_configs = [] @@ -2112,7 +2120,10 @@ def hierarchy_info(self): configs.pop(num) for p in parents: configs.append( - chain + [p,] + chain + + [ + p, + ] ) break else: @@ -2764,7 +2775,8 @@ def __getattr__(self, name): raise AttributeError( "'{0}' object has no attribute '{1}'".format( - self.__class__.__name__, str(name), + self.__class__.__name__, + str(name), ) ) @@ -2849,9 +2861,7 @@ def __repr__(self): class ValueEntry(VersionedPanObject): - """Base class for objects that only have a value element. - - """ + """Base class for objects that only have a value element.""" ROOT = Root.VSYS SUFFIX = ENTRY @@ -3685,7 +3695,12 @@ def get_device_version(self): @classmethod def create_from_device( - cls, hostname, api_username=None, api_password=None, api_key=None, port=443, + cls, + hostname, + api_username=None, + api_password=None, + api_key=None, + port=443, ): """Factory method to create a :class:`panos.firewall.Firewall` or :class:`panos.panorama.Panorama` object from a live device @@ -3707,18 +3722,33 @@ def create_from_device( # Create generic PanDevice to connect and get information from panos import firewall, panorama - device = PanDevice(hostname, api_username, api_password, api_key, port,) + device = PanDevice( + hostname, + api_username, + api_password, + api_key, + port, + ) system_info = device.refresh_system_info() version = system_info[0] model = system_info[1] if model == "Panorama" or model.startswith("M-"): instance = panorama.Panorama( - hostname, api_username, api_password, device.api_key, port, + hostname, + api_username, + api_password, + device.api_key, + port, ) else: serial = system_info[2] instance = firewall.Firewall( - hostname, api_username, api_password, device.api_key, serial, port, + hostname, + api_username, + api_password, + device.api_key, + serial, + port, ) instance._set_version_and_version_info(version) return instance @@ -3866,10 +3896,16 @@ def method(self, *args, **kwargs): def classify_exception(self, e): if str(e) == "Invalid credentials.": - return err.PanInvalidCredentials(str(e), pan_device=self.pan_device,) + return err.PanInvalidCredentials( + str(e), + pan_device=self.pan_device, + ) elif str(e).startswith("URLError:"): if str(e).endswith("timed out"): - return err.PanConnectionTimeout(str(e), pan_device=self.pan_device,) + return err.PanConnectionTimeout( + str(e), + pan_device=self.pan_device, + ) else: # This could be that we have an old version of OpenSSL # that doesn't support TLSv1.1, so check for that and give @@ -4584,12 +4620,19 @@ def map_ha(self, method_name, *args, **kwargs): return result1, result2 def show_highavailability_state(self): + from panos.panorama import Panorama + ha_state = self.op("show high-availability state") enabled = ha_state.findtext("result/enabled") + p = self if enabled is None or enabled == "no": return "disabled", None else: - return ha_state.findtext("result/group/local-info/state"), ha_state + if isinstance(p, Panorama): + xpath = "result/local-info/state" + else: + xpath = "result/group/local-info/state" + return ha_state.findtext(xpath), ha_state def refresh_ha_active(self): """Refresh which device is active using the live device @@ -4788,7 +4831,12 @@ def _commit( logger.debug( self.id + ": commit requested: commit_all:%s sync:%s sync_all:%s cmd:%s" - % (str(commit_all), str(sync), str(sync_all), cmd,) + % ( + str(commit_all), + str(sync), + str(sync_all), + cmd, + ) ) if commit_all: action = "all" diff --git a/panos/device.py b/panos/device.py index d3b6f2a2..ec8c3f20 100644 --- a/panos/device.py +++ b/panos/device.py @@ -1276,7 +1276,11 @@ def _setup(self): ) ) params.append( - VersionedParamPath("disabled", vartype="yesno", path="disabled",), + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ), ) self._params = tuple(params) @@ -1422,7 +1426,10 @@ def _setup(self): "facility", default="LOG_USER", path="facility", - values=["LOG_USER",] + ["LOG_LOCAL{0}".format(x) for x in range(8)], + values=[ + "LOG_USER", + ] + + ["LOG_LOCAL{0}".format(x) for x in range(8)], ) ) @@ -2148,12 +2155,28 @@ def _setup(self): ) params.append( VersionedParamPath( - "username_field_value", path="username-field/{username_field}", + "username_field_value", + path="username-field/{username_field}", + ) + ) + params.append( + VersionedParamPath( + "domain", + path="domain", + ) + ) + params.append( + VersionedParamPath( + "use_crl", + path="use-crl", + ) + ) + params.append( + VersionedParamPath( + "use_ocsp", + path="use-ocsp", ) ) - params.append(VersionedParamPath("domain", path="domain",)) - params.append(VersionedParamPath("use_crl", path="use-crl",)) - params.append(VersionedParamPath("use_ocsp", path="use-ocsp",)) params.append( VersionedParamPath( "crl_receive_timeout", @@ -2180,27 +2203,50 @@ def _setup(self): ) params.append( VersionedParamPath( - "block_unknown_certificate", vartype="yesno", path="block-unknown-cert", + "block_unknown_certificate", + vartype="yesno", + path="block-unknown-cert", ) ) params.append( VersionedParamPath( - "block_certificate_timeout", vartype="yesno", path="block-timeout-cert", + "block_certificate_timeout", + vartype="yesno", + path="block-timeout-cert", ) ) params.append( - VersionedParamPath("block_unauthenticated_certificate", exclude=True,) + VersionedParamPath( + "block_unauthenticated_certificate", + exclude=True, + ) ) params[-1].add_profile( - "7.1.0", vartype="yesno", path="block-unauthenticated-cert", + "7.1.0", + vartype="yesno", + path="block-unauthenticated-cert", + ) + params.append( + VersionedParamPath( + "block_expired_certificate", + exclude=True, + ) ) - params.append(VersionedParamPath("block_expired_certificate", exclude=True,)) params[-1].add_profile( - "8.1.0", vartype="yesno", path="block-expired-cert", + "8.1.0", + vartype="yesno", + path="block-expired-cert", + ) + params.append( + VersionedParamPath( + "ocsp_exclude_nonce", + exclude=True, + ) ) - params.append(VersionedParamPath("ocsp_exclude_nonce", exclude=True,)) params[-1].add_profile( - "9.0.0", path="ocsp-exclude-nonce", vartype="yesno", + "9.0.0", + path="ocsp-exclude-nonce", + vartype="yesno", ) self._params = tuple(params) @@ -2227,13 +2273,27 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("default_ocsp_url", path="default-ocsp-url",)) params.append( - VersionedParamPath("ocsp_verify_certificate", path="ocsp-verify-cert",) + VersionedParamPath( + "default_ocsp_url", + path="default-ocsp-url", + ) + ) + params.append( + VersionedParamPath( + "ocsp_verify_certificate", + path="ocsp-verify-cert", + ) + ) + params.append( + VersionedParamPath( + "template_name", + exclude=True, + ) ) - params.append(VersionedParamPath("template_name", exclude=True,)) params[-1].add_profile( - "9.0.0", path="template-name", + "9.0.0", + path="template-name", ) self._params = tuple(params) @@ -2268,7 +2328,8 @@ def _setup(self): params.append( VersionedParamPath( - "forward_trust_certificate_rsa", path="forward-trust-certificate/rsa", + "forward_trust_certificate_rsa", + path="forward-trust-certificate/rsa", ) ) params.append( @@ -2291,13 +2352,17 @@ def _setup(self): ) params.append( VersionedParamPath( - "root_ca_excludes", vartype="member", path="root-ca-exclude-list", + "root_ca_excludes", + vartype="member", + path="root-ca-exclude-list", ) ) # Only option present on Panorama. params.append( VersionedParamPath( - "trusted_root_cas", vartype="member", path="trusted-root-CA", + "trusted_root_cas", + vartype="member", + path="trusted-root-CA", ) ) params.append( @@ -2332,8 +2397,19 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("description", path="description",)) - params.append(VersionedParamPath("exclude", vartype="yesno", path="exclude",)) + params.append( + VersionedParamPath( + "description", + path="description", + ) + ) + params.append( + VersionedParamPath( + "exclude", + vartype="yesno", + path="exclude", + ) + ) self._params = tuple(params) @@ -2363,9 +2439,19 @@ def _setup(self): params = [] params.append( - VersionedParamPath("password_hash", vartype="encrypted", path="phash",) + VersionedParamPath( + "password_hash", + vartype="encrypted", + path="phash", + ) + ) + params.append( + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ) ) - params.append(VersionedParamPath("disabled", vartype="yesno", path="disabled",)) self._params = tuple(params) @@ -2406,6 +2492,12 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("users", vartype="member", path="user",)) + params.append( + VersionedParamPath( + "users", + vartype="member", + path="user", + ) + ) self._params = tuple(params) diff --git a/panos/firewall.py b/panos/firewall.py index 49c8ae3a..af824e4e 100644 --- a/panos/firewall.py +++ b/panos/firewall.py @@ -94,6 +94,7 @@ class Firewall(PanDevice): "objects.DynamicUserGroup", "objects.Region", "objects.Edl", + "objects.UrlFilteringProfile", "policies.Rulebase", "network.EthernetInterface", "network.AggregateInterface", diff --git a/panos/ha.py b/panos/ha.py index 824d9734..9a072e66 100644 --- a/panos/ha.py +++ b/panos/ha.py @@ -560,5 +560,6 @@ def _setup(self): # stubs self._stubs.add_profile( - "0.0.0", "interface/ha1", + "0.0.0", + "interface/ha1", ) diff --git a/panos/network.py b/panos/network.py index 566a6dcc..01c09d8f 100644 --- a/panos/network.py +++ b/panos/network.py @@ -153,22 +153,48 @@ def _setup(self): ) ) params.append( - VersionedParamPath("enable_packet_buffer_protection", exclude=True,) + VersionedParamPath( + "enable_packet_buffer_protection", + exclude=True, + ) ) params[-1].add_profile( - "8.0.0", path="network/enable-packet-buffer-protection", vartype="yesno", + "8.0.0", + path="network/enable-packet-buffer-protection", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "enable_device_identification", + exclude=True, + ) ) - params.append(VersionedParamPath("enable_device_identification", exclude=True,)) params[-1].add_profile( - "10.0.0", path="enable-device-identification", vartype="yesno", + "10.0.0", + path="enable-device-identification", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "device_include_acl", + exclude=True, + ) ) - params.append(VersionedParamPath("device_include_acl", exclude=True,)) params[-1].add_profile( - "10.0.0", path="device-acl/include-list", vartype="member", + "10.0.0", + path="device-acl/include-list", + vartype="member", + ) + params.append( + VersionedParamPath( + "device_exclude_acl", + exclude=True, + ) ) - params.append(VersionedParamPath("device_exclude_acl", exclude=True,)) params[-1].add_profile( - "10.0.0", path="device-acl/exclude-acl", vartype="member", + "10.0.0", + path="device-acl/exclude-acl", + vartype="member", ) self._params = tuple(params) @@ -1376,7 +1402,12 @@ def _setup(self): "mode", path="{mode}", default="layer3", - values=["layer3", "layer2", "virtual-wire", "ha",], + values=[ + "layer3", + "layer2", + "virtual-wire", + "ha", + ], ) ) params.append( @@ -2170,7 +2201,11 @@ def _setup(self): VersionedParamPath("enable", path="enable", default=True, vartype="yesno") ) params.append( - VersionedParamPath("reject_default_route", default=True, vartype="yesno",) + VersionedParamPath( + "reject_default_route", + default=True, + vartype="yesno", + ) ) params.append( VersionedParamPath( @@ -4202,7 +4237,11 @@ def _setup(self): ) params[-1].add_profile( "8.1.0", - values=("ip", "dynamic", "fqdn",), + values=( + "ip", + "dynamic", + "fqdn", + ), path="peer-address/{peer_ip_type}", ) params.append( @@ -4608,7 +4647,14 @@ def _setup(self): params.append( VersionedParamPath( "mk_esp_encryption", - values=("des", "3des", "aes128", "aes192", "aes256", "null",), + values=( + "des", + "3des", + "aes128", + "aes192", + "aes256", + "null", + ), path="{type}/{mk_protocol}/encryption/algorithm", ) ) @@ -5448,6 +5494,8 @@ def _setup(self): params = [] - params.append(VersionedParamPath("interface", path="interface"),) + params.append( + VersionedParamPath("interface", path="interface"), + ) self._params = tuple(params) diff --git a/panos/objects.py b/panos/objects.py index 3c3b34da..b5cc3706 100644 --- a/panos/objects.py +++ b/panos/objects.py @@ -319,7 +319,8 @@ def _setup(self): # xpaths self._xpaths.add_profile(value="/application") self._xpaths.add_profile( - value='//*[contains(local-name(), "application")]', parents=("Predefined",), + value='//*[contains(local-name(), "application")]', + parents=("Predefined",), ) # params @@ -615,7 +616,8 @@ def _setup(self): # xpaths self._xpaths.add_profile(value="/application-container") self._xpaths.add_profile( - value='//*[contains(local-name(), "application")]', parents=("Predefined",), + value='//*[contains(local-name(), "application")]', + parents=("Predefined",), ) # params @@ -859,7 +861,9 @@ def _setup(self): VersionedParamPath( "action_type", default="tagging", - values=["tagging",], + values=[ + "tagging", + ], path="type/{action_type}", ) ) @@ -1149,7 +1153,10 @@ def _setup(self): params.append( VersionedParamPath( - "edl_type", default="ip", path="type", values=("ip", "domain", "url"), + "edl_type", + default="ip", + path="type", + values=("ip", "domain", "url"), ), ) params[-1].add_profile( @@ -1162,38 +1169,77 @@ def _setup(self): path="type/{edl_type}", values=("ip", "domain", "url", "predefined-ip", "predefined-url"), ) - params.append(VersionedParamPath("description", path="description",),) + params.append( + VersionedParamPath( + "description", + path="description", + ), + ) params[-1].add_profile( - "8.0.0", path="type/{edl_type}/description", + "8.0.0", + path="type/{edl_type}/description", + ) + params.append( + VersionedParamPath( + "source", + path="url", + ), ) - params.append(VersionedParamPath("source", path="url",),) params[-1].add_profile( - "8.0.0", path="type/{edl_type}/url", + "8.0.0", + path="type/{edl_type}/url", + ) + params.append( + VersionedParamPath( + "exceptions", + exclude=True, + ), ) - params.append(VersionedParamPath("exceptions", exclude=True,),) params[-1].add_profile( - "8.0.0", vartype="member", path="type/{edl_type}/exception-list", + "8.0.0", + vartype="member", + path="type/{edl_type}/exception-list", + ) + params.append( + VersionedParamPath( + "certificate_profile", + exclude=True, + ) ) - params.append(VersionedParamPath("certificate_profile", exclude=True,)) params[-1].add_profile( "8.0.0", path="type/{edl_type}/certificate-profile", condition={"edl_type": ["ip", "domain", "url"]}, ) - params.append(VersionedParamPath("username", exclude=True,)) + params.append( + VersionedParamPath( + "username", + exclude=True, + ) + ) params[-1].add_profile( "8.0.0", path="type/{edl_type}/auth/username", condition={"edl_type": ["ip", "domain", "url"]}, ) - params.append(VersionedParamPath("password", exclude=True,)) + params.append( + VersionedParamPath( + "password", + exclude=True, + ) + ) params[-1].add_profile( "8.0.0", path="type/{edl_type}/auth/password", vartype="encrypted", condition={"edl_type": ["ip", "domain", "url"]}, ) - params.append(VersionedParamPath("expand_domain", exclude=True,),) + params.append( + VersionedParamPath( + "expand_domain", + exclude=True, + ), + ) params[-1].add_profile( "9.0.0", path="type/{edl_type}/expand-domain", @@ -1256,7 +1302,10 @@ def _setup(self): "friday", "saturday", ), - condition={"edl_type": ["ip", "domain", "url"], "repeat": "weekly",}, + condition={ + "edl_type": ["ip", "domain", "url"], + "repeat": "weekly", + }, ) params.append( VersionedParamPath( @@ -1270,7 +1319,261 @@ def _setup(self): "8.0.0", vartype="int", path="type/{edl_type}/recurring/{repeat}/day-of-month", - condition={"edl_type": ["ip", "domain", "url"], "repeat": "monthly",}, + condition={ + "edl_type": ["ip", "domain", "url"], + "repeat": "monthly", + }, + ) + + self._params = tuple(params) + + +class HTTPHeaderInsertionHeaders(VersionedPanObject): + """HTTPHeaderInsertionHeaders object + + Args: + name (str): HTTP Insertion header name + header (str): Header key to be inserted + value (str): Header value to be inserted + log (bool): Enable log + + """ + + ROOT = Root.VSYS + SUFFIX = ENTRY + + def _setup(self): + # xpaths + self._xpaths.add_profile(value="/headers") + + # params + params = [] + + params.append(VersionedParamPath("header", vartype="string", path="/header")) + + params.append(VersionedParamPath("value", vartype="string", path="/value")) + + params.append(VersionedParamPath("log", vartype="yesno", path="/log")) + + self._params = tuple(params) + + +class HTTPHeaderInsertionType(VersionedPanObject): + """HTTPHeaderInsertionType object + + Args: + name (str): HTTP Header Insertion Type name + domains (list): List of domains + + """ + + ROOT = Root.VSYS + SUFFIX = ENTRY + CHILDTYPES = ("objects.HTTPHeaderInsertionHeaders",) + + def _setup(self): + # xpaths + self._xpaths.add_profile(value="/type") + + # params + params = [] + + params.append(VersionedParamPath("domains", vartype="member", path="/domains")) + + self._params = tuple(params) + + +class HTTPHeaderInsertion(VersionedPanObject): + """HTTPHeaderInsertion object + + Args: + name (str): HTTP Header Insertion name + disable_override (bool): Disable override + + """ + + ROOT = Root.VSYS + SUFFIX = ENTRY + CHILDTYPES = ("objects.HTTPHeaderInsertionType",) + + def _setup(self): + # xpaths + self._xpaths.add_profile(value="/http-header-insertion") + + # params + params = [] + + params.append( + VersionedParamPath( + "disable_override", vartype="yesno", path="/disable-override" + ) + ) + + self._params = tuple(params) + + +class UrlFilteringProfile(VersionedPanObject): + """UrlFilteringProfile object + + Args: + name (str): URL-Filtering name + description (str): Profile description + allow (list): Allowed categories + alert (list): Alert categories + categorychange (list): Category change categories + block (list): Block categories + continue (list): Continue categories + override (list): Override categories + mode (str): Credential enforcement mode. Valid values are "disabled", + "ip-user", "domain-credentials". Default value is "disabled". + group_mapping (str): Group mapping used + log_severity (str): Log severity. Default value is "medium" + credential_enforcement_allow (list): Credential enforcement allow categories + credential_enforcement_alert (list): Credential enforcement alert categories + credential_enforcement_block (list): Credential enforcement block categories + credential_enforcement_continue (list): Credential enforcement continue categories + enable_container_page (bool): Enable container page + log_container_page_only (bool): Log container page only + safe_search_enforcement (bool): Safe search enforcement + log_http_hdr_xff (bool): Log HTTP HDR XFF + log_http_hdr_user_agent (bool): Log HTTP HDR User-Agent + log_http_hdr_referer (bool): Log HTTP HDR Referer + + """ + + ROOT = Root.VSYS + SUFFIX = ENTRY + CHILDTYPES = ("objects.HTTPHeaderInsertion",) + + def _setup(self): + # xpaths + self._xpaths.add_profile(value="/profiles/url-filtering") + + # params + params = [] + + params.append(VersionedParamPath("description", path="description")) + + params.append(VersionedParamPath("action_allow", path="allow", vartype="member")) + + params.append(VersionedParamPath("action_alert", path="alert", vartype="member")) + + params.append( + VersionedParamPath( + "categorychange", path="categorychange", vartype="member" + ) + ) + + params.append(VersionedParamPath("action_block", path="block", vartype="member")) + + params.append(VersionedParamPath("action_continue", path="continue", vartype="member")) + + params.append(VersionedParamPath("override", path="override", vartype="member")) + + params.append( + VersionedParamPath( + "mode", + values=["disabled", "ip-user", "domain-credentials"], + default="disabled", + path="credential-enforcement/mode/{mode}", + ) + ) + + params.append( + VersionedParamPath( + "group_mapping", + path="credential-enforcement/mode/group-mapping", + vartype="string", + ) + ) + + params.append( + VersionedParamPath( + "log_severity", + path="credential-enforcement/log-severity", + vartype="string", + default="medium", + ) + ) + + params.append( + VersionedParamPath( + "credential_enforcement_allow", + path="credential-enforcement/allow", + vartype="member", + ) + ) + + params.append( + VersionedParamPath( + "credential_enforcement_alert", + path="credential-enforcement/alert", + vartype="member", + ) + ) + + params.append( + VersionedParamPath( + "credential_enforcement_block", + path="credential-enforcement/block", + vartype="member", + ) + ) + + params.append( + VersionedParamPath( + "credential_enforcement_continue", + path="credential-enforcement/continue", + vartype="member", + ) + ) + + params.append( + VersionedParamPath( + "enable_container_page", + path="enable-container-page", + vartype="yesno", + ) + ) + + params.append( + VersionedParamPath( + "log_container_page_only", + path="log-container-page-only", + vartype="yesno", + ) + ) + + params.append( + VersionedParamPath( + "safe_search_enforcement", + path="safe-search-enforcement", + vartype="yesno", + ) + ) + + params.append( + VersionedParamPath( + "log_http_hdr_xff", + path="log-http-hdr-xff", + vartype="yesno", + ) + ) + + params.append( + VersionedParamPath( + "log_http_hdr_user_agent", + path="log-http-hdr-user-agent", + vartype="yesno", + ) + ) + + params.append( + VersionedParamPath( + "log_http_hdr_referer", + path="log-http-hdr-referer", + vartype="yesno", + ) ) self._params = tuple(params) diff --git a/panos/panorama.py b/panos/panorama.py index cc446290..cd6dd755 100644 --- a/panos/panorama.py +++ b/panos/panorama.py @@ -118,7 +118,9 @@ class DeviceGroup(VersionedPanObject): "objects.CustomUrlCategory", "objects.LogForwardingProfile", "objects.Region", + "objects.Tag", "objects.Edl", + "objects.UrlFilteringProfile", "policies.PreRulebase", "policies.PostRulebase", ) @@ -439,6 +441,7 @@ class Panorama(base.PanDevice): "objects.DynamicUserGroup", "objects.Region", "objects.Edl", + "objects.UrlFilteringProfile", "firewall.Firewall", "panorama.DeviceGroup", "panorama.Template", diff --git a/panos/plugins.py b/panos/plugins.py index aec89bd8..991378e6 100644 --- a/panos/plugins.py +++ b/panos/plugins.py @@ -373,7 +373,9 @@ def _setup(self): params.append( VersionedParamPath( - "same_as_primary", vartype="yesno", path="same-as-primary", + "same_as_primary", + vartype="yesno", + path="same-as-primary", ) ) params.append(VersionedParamPath("peer_ip_address", path="peer-ip-address")) diff --git a/panos/policies.py b/panos/policies.py index b947a6fa..c1f7b1bb 100644 --- a/panos/policies.py +++ b/panos/policies.py @@ -48,7 +48,10 @@ def _setup(self, name=None, elm=None): def refresh(self, elm=None): if elm is None and self.obj is not None: self.obj.parent.opstate.hit_count.refresh( - self.obj.HIT_COUNT_STYLE, [self.name,], + self.obj.HIT_COUNT_STYLE, + [ + self.name, + ], ) else: self._refresh_xml(elm) @@ -264,7 +267,8 @@ def current(self): ET.SubElement(sub, "xpath").text = self.obj.xpath() ans = self.obj.nearest_pandevice().op( - ET.tostring(cmd, encoding="utf-8"), cmd_xml=False, + ET.tostring(cmd, encoding="utf-8"), + cmd_xml=False, ) resp = ans.find("./result/entry/comment") @@ -284,7 +288,8 @@ def update(self, comment): ET.SubElement(sub, "comment").text = comment self.obj.nearest_pandevice().op( - ET.tostring(cmd, encoding="utf-8"), cmd_xml=False, + ET.tostring(cmd, encoding="utf-8"), + cmd_xml=False, ) @@ -370,7 +375,12 @@ def _setup(self): for var_name, path in any_defaults: params.append( VersionedParamPath( - var_name, default=["any",], vartype="member", path=path + var_name, + default=[ + "any", + ], + vartype="member", + path=path, ) ) @@ -384,7 +394,12 @@ def _setup(self): ) params.append( VersionedParamPath( - "category", default=["any",], vartype="member", path="category" + "category", + default=[ + "any", + ], + vartype="member", + path="category", ) ) params.append(VersionedParamPath("action", path="action")) @@ -447,11 +462,23 @@ def _setup(self): params.append(VersionedParamPath("uuid", exclude=True)) params[-1].add_profile("9.0.0", vartype="attrib", path="uuid") params.append( - VersionedParamPath("source_devices", default=["any",], exclude=True) + VersionedParamPath( + "source_devices", + default=[ + "any", + ], + exclude=True, + ) ) params[-1].add_profile("10.0.0", vartype="member", path="source-hip") params.append( - VersionedParamPath("destination_devices", default=["any",], exclude=True) + VersionedParamPath( + "destination_devices", + default=[ + "any", + ], + exclude=True, + ) ) params[-1].add_profile("10.0.0", vartype="member", path="destination-hip") params.append(VersionedParamPath("group_tag", exclude=True)) @@ -555,7 +582,12 @@ def _setup(self): ) params.append( VersionedParamPath( - "fromzone", default=["any",], vartype="member", path="from" + "fromzone", + default=[ + "any", + ], + vartype="member", + path="from", ) ) params.append(VersionedParamPath("tozone", vartype="member", path="to")) @@ -563,12 +595,22 @@ def _setup(self): params.append(VersionedParamPath("service", default="any", path="service")) params.append( VersionedParamPath( - "source", default=["any",], vartype="member", path="source" + "source", + default=[ + "any", + ], + vartype="member", + path="source", ) ) params.append( VersionedParamPath( - "destination", default=["any",], vartype="member", path="destination" + "destination", + default=[ + "any", + ], + vartype="member", + path="destination", ) ) params.append( @@ -872,7 +914,12 @@ def _setup(self): for var_name, path in any_defaults: params.append( VersionedParamPath( - var_name, default=["any",], vartype="member", path=path + var_name, + default=[ + "any", + ], + vartype="member", + path=path, ) ) params.append(VersionedParamPath("application", path="application")) @@ -1155,43 +1202,103 @@ def _setup(self): params.append(VersionedParamPath("uuid", exclude=True)) params[-1].add_profile("9.0.0", vartype="attrib", path="uuid") params.append( - VersionedParamPath("source_zones", vartype="member", path="from",) + VersionedParamPath( + "source_zones", + vartype="member", + path="from", + ) ) params.append( - VersionedParamPath("source_addresses", vartype="member", path="source",) + VersionedParamPath( + "source_addresses", + vartype="member", + path="source", + ) ) params.append( - VersionedParamPath("negate_source", vartype="yesno", path="negate-source",) + VersionedParamPath( + "negate_source", + vartype="yesno", + path="negate-source", + ) ) params.append( - VersionedParamPath("source_users", vartype="member", path="source-user",) + VersionedParamPath( + "source_users", + vartype="member", + path="source-user", + ) + ) + params.append( + VersionedParamPath( + "source_hip", + exclude=True, + ) ) - params.append(VersionedParamPath("source_hip", exclude=True,)) params[-1].add_profile( - "10.0.0", path="source-hip", vartype="member", + "10.0.0", + path="source-hip", + vartype="member", ) params.append( - VersionedParamPath("destination_zones", vartype="member", path="to",) + VersionedParamPath( + "destination_zones", + vartype="member", + path="to", + ) + ) + params.append( + VersionedParamPath( + "destination_addresses", + vartype="member", + path="destination", + ) ) params.append( VersionedParamPath( - "destination_addresses", vartype="member", path="destination", + "negate_destination", + vartype="yesno", + path="negate-destination", ) ) params.append( VersionedParamPath( - "negate_destination", vartype="yesno", path="negate-destination", + "destination_hip", + exclude=True, ) ) - params.append(VersionedParamPath("destination_hip", exclude=True,)) params[-1].add_profile( - "10.0.0", path="destination-hip", vartype="member", + "10.0.0", + path="destination-hip", + vartype="member", + ) + params.append( + VersionedParamPath( + "tags", + vartype="member", + path="tag", + ) ) - params.append(VersionedParamPath("tags", vartype="member", path="tag",)) - params.append(VersionedParamPath("disabled", vartype="yesno", path="disabled",)) - params.append(VersionedParamPath("services", vartype="member", path="service",)) params.append( - VersionedParamPath("url_categories", vartype="member", path="category",) + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ) + ) + params.append( + VersionedParamPath( + "services", + vartype="member", + path="service", + ) + ) + params.append( + VersionedParamPath( + "url_categories", + vartype="member", + path="category", + ) ) params.append( VersionedParamPath( @@ -1210,38 +1317,79 @@ def _setup(self): VersionedParamPath( "decryption_type", path="type/{decryption_type}", - values=("ssl-forward-proxy", "ssh-proxy", "ssl-inbound-inspection",), + values=( + "ssl-forward-proxy", + "ssh-proxy", + "ssl-inbound-inspection", + ), ) ) params.append( VersionedParamPath( "ssl_certificate", path="type/{decryption_type}", - condition={"decryption_type": "ssl-inbound-inspection",}, + condition={ + "decryption_type": "ssl-inbound-inspection", + }, + ) + ) + params.append( + VersionedParamPath( + "decryption_profile", + path="profile", + ) + ) + params.append( + VersionedParamPath( + "forwarding_profile", + exclude=True, ) ) - params.append(VersionedParamPath("decryption_profile", path="profile",)) - params.append(VersionedParamPath("forwarding_profile", exclude=True,)) params[-1].add_profile( - "8.1.0", path="forwarding-profile", + "8.1.0", + path="forwarding-profile", + ) + params.append( + VersionedParamPath( + "group_tag", + exclude=True, + ) ) - params.append(VersionedParamPath("group_tag", exclude=True,)) params[-1].add_profile( - "9.0.0", path="group-tag", + "9.0.0", + path="group-tag", ) params.append( - VersionedParamPath("log_successful_tls_handshakes", exclude=True,) + VersionedParamPath( + "log_successful_tls_handshakes", + exclude=True, + ) ) params[-1].add_profile( - "10.0.0", path="log-success", vartype="yesno", + "10.0.0", + path="log-success", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "log_failed_tls_handshakes", + exclude=True, + ) ) - params.append(VersionedParamPath("log_failed_tls_handshakes", exclude=True,)) params[-1].add_profile( - "10.0.0", path="log-fail", vartype="yesno", + "10.0.0", + path="log-fail", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "log_setting", + exclude=True, + ) ) - params.append(VersionedParamPath("log_setting", exclude=True,)) params[-1].add_profile( - "10.0.0", path="log-setting", + "10.0.0", + path="log-setting", ) params.append( VersionedParamPath("negate_target", path="target/negate", vartype="yesno") @@ -1306,55 +1454,117 @@ def _setup(self): params.append(VersionedParamPath("uuid", exclude=True)) params[-1].add_profile("9.0.0", vartype="attrib", path="uuid") params.append( - VersionedParamPath("source_zones", vartype="member", path="from",) + VersionedParamPath( + "source_zones", + vartype="member", + path="from", + ) ) params.append( VersionedParamPath( - "source_addresses", default=["any",], vartype="member", path="source" + "source_addresses", + default=[ + "any", + ], + vartype="member", + path="source", ) ) params.append( - VersionedParamPath("negate_source", vartype="yesno", path="negate-source",) + VersionedParamPath( + "negate_source", + vartype="yesno", + path="negate-source", + ) ) params.append( - VersionedParamPath("destination_zones", vartype="member", path="to",) + VersionedParamPath( + "destination_zones", + vartype="member", + path="to", + ) ) params.append( VersionedParamPath( "destination_addresses", - default=["any",], + default=[ + "any", + ], vartype="member", path="destination", ) ) params.append( VersionedParamPath( - "negate_destination", vartype="yesno", path="negate-destination", + "negate_destination", + vartype="yesno", + path="negate-destination", + ) + ) + params.append( + VersionedParamPath( + "tag", + vartype="member", + path="tag", + ) + ) + params.append( + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ) + ) + params.append( + VersionedParamPath( + "service", + vartype="member", + path="service", + ) + ) + params.append( + VersionedParamPath( + "source_hip", + exclude=True, ) ) - params.append(VersionedParamPath("tag", vartype="member", path="tag",)) - params.append(VersionedParamPath("disabled", vartype="yesno", path="disabled",)) - params.append(VersionedParamPath("service", vartype="member", path="service",)) - params.append(VersionedParamPath("source_hip", exclude=True,)) params[-1].add_profile( - "10.0.0", path="source-hip", vartype="member", + "10.0.0", + path="source-hip", + vartype="member", ) params.append( VersionedParamPath("source_users", vartype="member", path="source-user") ) params.append( - VersionedParamPath("url_categories", vartype="member", path="category",) + VersionedParamPath( + "url_categories", + vartype="member", + path="category", + ) + ) + params.append( + VersionedParamPath( + "group_tag", + exclude=True, + ) ) - params.append(VersionedParamPath("group_tag", exclude=True,)) params[-1].add_profile( - "9.0.0", path="group-tag", + "9.0.0", + path="group-tag", + ) + params.append( + VersionedParamPath( + "authentication_enforcement", + path="authentication-enforcement", + ) ) params.append( VersionedParamPath( - "authentication_enforcement", path="authentication-enforcement", + "timeout", + path="timeout", ) ) - params.append(VersionedParamPath("timeout", path="timeout",)) params.append( VersionedParamPath("negate_target", path="target/negate", vartype="yesno") ) diff --git a/panos/predefined.py b/panos/predefined.py index c522792d..3c8fd28d 100644 --- a/panos/predefined.py +++ b/panos/predefined.py @@ -111,10 +111,20 @@ def _refresh_application(self, name=None): ) def _refresh_service(self, name=None): - return self._refresh([(objects.ServiceObject, "service_objects", None),], name,) + return self._refresh( + [ + (objects.ServiceObject, "service_objects", None), + ], + name, + ) def _refresh_tag(self, name=None): - return self._refresh([(objects.Tag, "tag_objects", None),], name,) + return self._refresh( + [ + (objects.Tag, "tag_objects", None), + ], + name, + ) def refresh_application(self, name): """Refresh a Single Predefined Application diff --git a/panos/userid.py b/panos/userid.py index f8865ed1..4e42580f 100644 --- a/panos/userid.py +++ b/panos/userid.py @@ -718,7 +718,13 @@ def tag_user(self, user, tags, timeout=None, prefix=None): te = entry.find("./tag") break else: - entry = ET.SubElement(ru, "entry", {"user": user,}) + entry = ET.SubElement( + ru, + "entry", + { + "user": user, + }, + ) te = ET.SubElement(entry, "tag") # Now add in the tags with the specified timeout. @@ -761,7 +767,13 @@ def untag_user(self, user, tags=None, prefix=None): if entry.attrib["user"] == user: break else: - entry = ET.SubElement(uu, "entry", {"user": user,}) + entry = ET.SubElement( + uu, + "entry", + { + "user": user, + }, + ) # Do tag removal. te = entry.find("./tag") diff --git a/tests/live/conftest.py b/tests/live/conftest.py index d324367c..8d379d35 100644 --- a/tests/live/conftest.py +++ b/tests/live/conftest.py @@ -122,7 +122,10 @@ def init(): fw = random.choice(fws) panorama_fw_combinations.append( - ((pano, fw), desc(pano_version, fw_version),) + ( + (pano, fw), + desc(pano_version, fw_version), + ) ) diff --git a/tests/live/test_network.py b/tests/live/test_network.py index e62238df..f3f3ba69 100644 --- a/tests/live/test_network.py +++ b/tests/live/test_network.py @@ -6,7 +6,10 @@ class TestZoneBasic(testlib.FwFlow): def setup_state_obj(self, fw, state): - state.obj = network.Zone(testlib.random_name(), mode="layer3",) + state.obj = network.Zone( + testlib.random_name(), + mode="layer3", + ) fw.add(state.obj) def update_state_obj(self, fw, state): @@ -65,7 +68,10 @@ def create_dependencies(self, fw, state): state.parent.create() def setup_state_obj(self, fw, state): - state.obj = network.StaticMac(testlib.random_mac(), state.eths[0],) + state.obj = network.StaticMac( + testlib.random_mac(), + state.eths[0], + ) state.parent.add(state.obj) def update_state_obj(self, fw, state): @@ -102,7 +108,9 @@ def create_dependencies(self, fw, state): def setup_state_obj(self, fw, state): state.obj = network.Vlan( - testlib.random_name(), state.eths[0], state.vlan_interface.uid, + testlib.random_name(), + state.eths[0], + state.vlan_interface.uid, ) fw.add(state.obj) @@ -324,7 +332,9 @@ def create_dependencies(self, fw, state): state.eth = testlib.get_available_interfaces(fw)[0] state.parent = network.EthernetInterface( - state.eth, "layer3", ip=testlib.random_ip("/24"), + state.eth, + "layer3", + ip=testlib.random_ip("/24"), ) fw.add(state.parent) state.parent.create() @@ -368,7 +378,10 @@ def create_dependencies(self, fw, state): state.eth = None state.eth = testlib.get_available_interfaces(fw)[0] - state.parent = network.EthernetInterface(state.eth, "layer2",) + state.parent = network.EthernetInterface( + state.eth, + "layer2", + ) fw.add(state.parent) state.parent.create() @@ -376,7 +389,9 @@ def setup_state_obj(self, fw, state): tag = random.randint(1, 4000) name = "{0}.{1}".format(state.eth, tag) state.obj = network.Layer2Subinterface( - name, tag, comment="This is my L2 subinterface", + name, + tag, + comment="This is my L2 subinterface", ) state.parent.add(state.obj) @@ -861,14 +876,16 @@ def create_dependencies(self, fw, state): if self.WITH_BGP_IMPORT_RULE: state.import_rule = network.BgpPolicyImportRule( - name=testlib.random_name(), enable=True, + name=testlib.random_name(), + enable=True, ) state.bgp.add(state.import_rule) state.bgp.apply() if self.WITH_BGP_EXPORT_RULE: state.export_rule = network.BgpPolicyExportRule( - name=testlib.random_name(), enable=True, + name=testlib.random_name(), + enable=True, ) state.bgp.add(state.export_rule) state.bgp.apply() @@ -1409,7 +1426,9 @@ def setup_state_obj(self, fw, state): # 'match_afi': 'ip', # 'match_safi': 'ip', "match_route_table": "unicast", - "match_nexthop": [testlib.random_ip("/32"),], + "match_nexthop": [ + testlib.random_ip("/32"), + ], "match_from_peer": state.peer.name, "match_med": random.randint(0, 4294967295), "match_as_path_regex": "as-path-regex", @@ -1491,7 +1510,8 @@ class MakeBgpPolicyAddressPrefix(MakeVirtualRouter): def setup_state_obj(self, fw, state): state.obj = network.BgpPolicyAddressPrefix( - name=testlib.random_netmask(), exact=True, + name=testlib.random_netmask(), + exact=True, ) if self.WITH_BGP_IMPORT_RULE: state.import_rule.add(state.obj) @@ -1547,7 +1567,9 @@ def setup_state_obj(self, fw, state): ) advert.extend(prefixes) state.obj = network.BgpPolicyConditionalAdvertisement( - name=testlib.random_name(), enable=True, used_by=state.pg.name, + name=testlib.random_name(), + enable=True, + used_by=state.pg.name, ) state.obj.add(non_exist) state.obj.add(advert) @@ -1654,8 +1676,12 @@ class TestIkeCryptoProfile(testlib.FwFlow): def setup_state_obj(self, fw, state): state.obj = network.IkeCryptoProfile( testlib.random_name(), - authentication=["sha256",], - dh_group=["group1",], + authentication=[ + "sha256", + ], + dh_group=[ + "group1", + ], lifetime_minutes=42, ) fw.add(state.obj) @@ -1741,7 +1767,8 @@ def create_dependencies(self, fw, state): raise ValueError("IkeGateway not supported for version < 7.0") state.lbi = network.LoopbackInterface( - "loopback.{0}".format(random.randint(5, 20)), ipv6_enabled=True, + "loopback.{0}".format(random.randint(5, 20)), + ipv6_enabled=True, ) state.lbi.add(network.IPv6Address(testlib.random_ipv6())) state.lbi.add(network.IPv6Address(testlib.random_ipv6())) diff --git a/tests/live/test_objects.py b/tests/live/test_objects.py index c0bd5993..daa4dad9 100644 --- a/tests/live/test_objects.py +++ b/tests/live/test_objects.py @@ -29,7 +29,8 @@ def create_dependencies(self, dev, state): def setup_state_obj(self, dev, state): state.obj = objects.AddressGroup( - testlib.random_name(), [x.name for x in state.aos[:2]], + testlib.random_name(), + [x.name for x in state.aos[:2]], ) dev.add(state.obj) @@ -71,7 +72,8 @@ def setup_state_obj(self, dev, state): def update_state_obj(self, dev, state): state.obj.dynamic_value = "'{0}' and '{1}'".format( - state.tags[2].name, state.tags[3].name, + state.tags[2].name, + state.tags[3].name, ) state.obj.tag = state.tags[1].name @@ -86,7 +88,9 @@ def cleanup_dependencies(self, dev, state): class TestTag(testlib.DevFlow): def setup_state_obj(self, dev, state): state.obj = objects.Tag( - testlib.random_name(), color="color1", comments="My new tag", + testlib.random_name(), + color="color1", + comments="My new tag", ) dev.add(state.obj) diff --git a/tests/live/test_userid.py b/tests/live/test_userid.py index ead99456..7e2adc5a 100644 --- a/tests/live/test_userid.py +++ b/tests/live/test_userid.py @@ -67,7 +67,11 @@ def test_08_get_registered_ip(self, fw, state_map): test4 = set(fw.userid.get_registered_ip(ips, tags[0:5])) assert test4 == set(ips) test5 = set(fw.userid.get_registered_ip(ips[0], tags[0])) - assert test5 == set([ips[0],]) + assert test5 == set( + [ + ips[0], + ] + ) tests = [test1, test2, test3, test4, test5] assert len(test5) != 0 assert all([test1 >= x for x in tests]) diff --git a/tests/live/testlib.py b/tests/live/testlib.py index 3a23f20b..9138ea33 100644 --- a/tests/live/testlib.py +++ b/tests/live/testlib.py @@ -21,7 +21,9 @@ def random_ip(netmask=None): def random_netmask(): return "{0}.{1}.{2}.0/24".format( - random.randint(11, 150), random.randint(1, 200), random.randint(1, 200), + random.randint(11, 150), + random.randint(1, 200), + random.randint(1, 200), ) diff --git a/tests/test_base.py b/tests/test_base.py index 2ff5907b..c4fc911d 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -83,7 +83,13 @@ def test_add_without_children(self): ret_value = self.obj.add(child) self.assertEqual(child, ret_value) - self.verify_object(self.obj, name=OBJECT_NAME, children=[child,]) + self.verify_object( + self.obj, + name=OBJECT_NAME, + children=[ + child, + ], + ) self.verify_object(child, name=CHILD_NAME, parent=self.obj) def test_add_with_children(self): @@ -112,7 +118,13 @@ def test_insert_without_children(self): ret_val = self.obj.insert(0, child) self.assertEqual(child, ret_val) - self.verify_object(self.obj, name=OBJECT_NAME, children=[child,]) + self.verify_object( + self.obj, + name=OBJECT_NAME, + children=[ + child, + ], + ) self.verify_object(child, name=CHILD_NAME, parent=self.obj) def test_insert_with_children(self): @@ -214,7 +226,13 @@ def test_remove(self): ret_val = self.obj.remove(child2) self.assertIsNone(ret_val) - self.verify_object(self.obj, name=OBJECT_NAME, children=[child1,]) + self.verify_object( + self.obj, + name=OBJECT_NAME, + children=[ + child1, + ], + ) self.verify_object(child1, name=CHILD1_NAME, parent=self.obj) self.verify_object(child2, name=CHILD2_NAME) @@ -454,7 +472,9 @@ def test_apply_with_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.edit.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -486,7 +506,9 @@ def test_apply_without_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.xapi.edit.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -516,7 +538,9 @@ def test_create_with_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.set.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath_short.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -548,7 +572,9 @@ def test_create_without_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.xapi.set.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath_short.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -576,7 +602,8 @@ def test_delete_with_ha_sync_no_parent(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.delete.assert_called_once_with( - PanDeviceXpath, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() for c in self.obj.children: @@ -606,7 +633,8 @@ def test_delete_with_ha_sync_and_parent(self, m_uid): self.obj.parent.remove.assert_called_once_with(self.obj) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.delete.assert_called_once_with( - PanDeviceXpath, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() for c in self.obj.children: @@ -1151,7 +1179,8 @@ def test_no_fallback_raises_value_error(self): parent = None obj = Base.ParentAwareXpath() obj.add_profile( - parents=("ParentClass1",), value="/some/path", + parents=("ParentClass1",), + value="/some/path", ) self.assertRaises(ValueError, obj._get_versioned_value, (1, 0, 0), parent) @@ -1279,7 +1308,14 @@ def test_values_are_unchanged_after_comparison(self): self.assertEqual(o2.members, ["d", "c"]) def test_str_list_field_is_equal(self): - o1 = MyVersionedObject("a", ["a",], ["c", "d"], 5) + o1 = MyVersionedObject( + "a", + [ + "a", + ], + ["c", "d"], + 5, + ) o2 = MyVersionedObject("a", "a", ["c", "d"], 5) self.assertTrue(o1.equal(o2)) diff --git a/tests/test_device_profile_xpaths.py b/tests/test_device_profile_xpaths.py index 87a14424..05bcc01a 100644 --- a/tests/test_device_profile_xpaths.py +++ b/tests/test_device_profile_xpaths.py @@ -60,9 +60,18 @@ OBJECTS = { SnmpServerProfile: [None, SnmpV2cServer, SnmpV3Server], - EmailServerProfile: [None, EmailServer,], - LdapServerProfile: [None, LdapServer,], - SyslogServerProfile: [None, SyslogServer,], + EmailServerProfile: [ + None, + EmailServer, + ], + LdapServerProfile: [ + None, + LdapServer, + ], + SyslogServerProfile: [ + None, + SyslogServer, + ], HttpServerProfile: [ None, HttpServer, diff --git a/tests/test_firewall.py b/tests/test_firewall.py index fa6d1162..e883dcb0 100644 --- a/tests/test_firewall.py +++ b/tests/test_firewall.py @@ -22,7 +22,9 @@ class TestFirewall(unittest.TestCase): def test_id_returns_serial(self): expected = "serial#" - fw = panos.firewall.Firewall(serial=expected,) + fw = panos.firewall.Firewall( + serial=expected, + ) ret_val = fw.id @@ -31,7 +33,9 @@ def test_id_returns_serial(self): def test_id_returns_hostname(self): expected = "hostName" - fw = panos.firewall.Firewall(hostname=expected,) + fw = panos.firewall.Firewall( + hostname=expected, + ) ret_val = fw.id diff --git a/tests/test_init.py b/tests/test_init.py index 722ae38d..00ea2e16 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -177,7 +177,8 @@ def test_base_is_single_key_value(self): for x in self.quotes(): self.assertEqual( - panos.string_to_xml("hello {0}world{0}".format(x), x), self._str(root), + panos.string_to_xml("hello {0}world{0}".format(x), x), + self._str(root), ) def test_base_root_with_one_key_value(self): @@ -187,7 +188,8 @@ def test_base_root_with_one_key_value(self): for x in self.quotes(): self.assertEqual( - panos.string_to_xml("foo bar {0}baz{0}".format(x), x), self._str(root), + panos.string_to_xml("foo bar {0}baz{0}".format(x), x), + self._str(root), ) def test_base_root_with_two_key_values(self): diff --git a/tests/test_integration.py b/tests/test_integration.py index 654de5c3..3dabf0e2 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -89,11 +89,26 @@ def setUp(self): self.assertEqual(self.firewall, self.address_object.parent) self.assertEqual([], self.address_object.children) self.assertEqual(self.device_group, self.firewall.parent) - self.assertEqual([self.address_object,], self.firewall.children) + self.assertEqual( + [ + self.address_object, + ], + self.firewall.children, + ) self.assertEqual(self.panorama, self.device_group.parent) - self.assertEqual([self.firewall,], self.device_group.children) + self.assertEqual( + [ + self.firewall, + ], + self.device_group.children, + ) self.assertEqual(None, self.panorama.parent) - self.assertEqual([self.device_group,], self.panorama.children) + self.assertEqual( + [ + self.device_group, + ], + self.panorama.children, + ) def test_nearest_pandevice_from_addressobject_in_pano_dg_fw_ao_chain(self): """Runs nearest_pandevice() on the AddressObject. @@ -325,7 +340,10 @@ def test_element_str_from_ethernetinterface_for_aggregate_group(self): def test_element_str_from_firewall_with_pano_parent_and_systemsettings_child(self): expected = b"".join( - [b'', b'',] + [ + b'', + b'', + ] ) fw = panos.firewall.Firewall( @@ -357,7 +375,10 @@ def test_element_str_from_firewall_without_serial_number_raises_error(self): def test_element_str_from_firewall_with_dg_pano_parents_and_multi_vsys(self): expected = b"".join( - [b'', b"",] + [ + b'', + b"", + ] ) fw = panos.firewall.Firewall( @@ -615,7 +636,11 @@ def test_set_xpath_from_arp_with_l3s_ei_fw_parents(self): def test_edit_xpath_from_firewall(self): # This is not a valid xpath, but its what should happen # if there is no parent - expected = "".join(["/devices/entry[@name='serial']",]) + expected = "".join( + [ + "/devices/entry[@name='serial']", + ] + ) fw = panos.firewall.Firewall("foo", vsys="vsys2", serial="serial") @@ -626,7 +651,11 @@ def test_edit_xpath_from_firewall(self): def test_set_xpath_from_firewall(self): # This is not a valid xpath, but its what should happen # if there is no parent - expected = "".join(["/devices",]) + expected = "".join( + [ + "/devices", + ] + ) fw = panos.firewall.Firewall("foo", vsys="vsys2", serial="serial") diff --git a/tests/test_opstate.py b/tests/test_opstate.py index f99d3276..bf579e2f 100644 --- a/tests/test_opstate.py +++ b/tests/test_opstate.py @@ -48,7 +48,9 @@ def _hit_count_fw_setup(*args): inner = "".join(ET.tostring(x, encoding="utf-8").decode("utf-8") for x in args) fw.op = mock.Mock( - return_value=ET.fromstring(HIT_COUNT_PREFIX + inner + HIT_COUNT_SUFFIX,) + return_value=ET.fromstring( + HIT_COUNT_PREFIX + inner + HIT_COUNT_SUFFIX, + ) ) rb = Rulebase() diff --git a/tests/test_params.py b/tests/test_params.py index 4073ae2e..e2231514 100644 --- a/tests/test_params.py +++ b/tests/test_params.py @@ -17,22 +17,74 @@ def _setup(self): params = [] - params.append(VersionedParamPath("uuid", vartype="attrib", path="uuid",),) - params.append(VersionedParamPath("size", vartype="int", path="size",),) - params.append(VersionedParamPath("listing", vartype="member", path="listing",),) - params.append(VersionedParamPath("pb1", vartype="exist", path="pb1",),) - params.append(VersionedParamPath("pb2", vartype="exist", path="pb2",),) - params.append(VersionedParamPath("live", vartype="yesno", path="live",),) params.append( - VersionedParamPath("disabled", vartype="yesno", path="disabled",), + VersionedParamPath( + "uuid", + vartype="attrib", + path="uuid", + ), + ) + params.append( + VersionedParamPath( + "size", + vartype="int", + path="size", + ), + ) + params.append( + VersionedParamPath( + "listing", + vartype="member", + path="listing", + ), + ) + params.append( + VersionedParamPath( + "pb1", + vartype="exist", + path="pb1", + ), + ) + params.append( + VersionedParamPath( + "pb2", + vartype="exist", + path="pb2", + ), + ) + params.append( + VersionedParamPath( + "live", + vartype="yesno", + path="live", + ), + ) + params.append( + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ), + ) + params.append( + VersionedParamPath( + "uuid2", + vartype="attrib", + path="level-2/uuid", + ), ) params.append( - VersionedParamPath("uuid2", vartype="attrib", path="level-2/uuid",), + VersionedParamPath( + "age", + vartype="int", + path="level-2/age", + ), ) - params.append(VersionedParamPath("age", vartype="int", path="level-2/age",),) params.append( VersionedParamPath( - "interfaces", vartype="member", path="level-2/interface", + "interfaces", + vartype="member", + path="level-2/interface", ), ) @@ -84,7 +136,8 @@ def _refresh_xml(): # int at base level def test_render_int(): _verify_render( - FakeObject("test", size=5), '5', + FakeObject("test", size=5), + '5', ) @@ -111,7 +164,8 @@ def test_parse_member(): # exist at base level def test_render_exist(): _verify_render( - FakeObject("test", pb1=True), '', + FakeObject("test", pb1=True), + '', ) @@ -139,7 +193,8 @@ def test_parse_yesno(): # attrib def test_render_attrib(): _verify_render( - FakeObject("test", uuid="123-456"), '', + FakeObject("test", uuid="123-456"), + '', ) diff --git a/tests/test_predefined.py b/tests/test_predefined.py index 9ea6eded..1a1b913a 100644 --- a/tests/test_predefined.py +++ b/tests/test_predefined.py @@ -45,9 +45,13 @@ ( """//*[contains(local-name(), "application")]/entry[@name='{0}']""", '//*[contains(local-name(), "application")]/entry', - ApplicationContainer(name="ap container 1", applications=["func1", "func2"],), ApplicationContainer( - name="application container deux", applications=["a", "la", "mode"], + name="ap container 1", + applications=["func1", "func2"], + ), + ApplicationContainer( + name="application container deux", + applications=["a", "la", "mode"], ), ), ( @@ -97,8 +101,16 @@ ( None, "/tag/entry", - Tag(name="foo", color="color1", comments="First color",), - Tag(name="bar", color="color42", comments="Another color for another time",), + Tag( + name="foo", + color="color1", + comments="First color", + ), + Tag( + name="bar", + color="color42", + comments="Another color for another time", + ), ), ) @@ -142,7 +154,11 @@ def _fw(*args): prefix = "" suffix = "" inner = "".join(x.element_str().decode("utf-8") for x in args) - fw.xapi.get = mock.Mock(return_value=ET.fromstring(prefix + inner + suffix,)) + fw.xapi.get = mock.Mock( + return_value=ET.fromstring( + prefix + inner + suffix, + ) + ) return fw diff --git a/tests/test_userid.py b/tests/test_userid.py index 0f4af6a1..a3a2052a 100644 --- a/tests/test_userid.py +++ b/tests/test_userid.py @@ -67,8 +67,18 @@ def test_batch_tag_user(self): ) fw.xapi fw.userid.batch_start() - fw.userid.tag_user("user1", ["tag1",]) - fw.userid.tag_user("user2", ["tag1",]) + fw.userid.tag_user( + "user1", + [ + "tag1", + ], + ) + fw.userid.tag_user( + "user2", + [ + "tag1", + ], + ) def test_batch_untag_user(self): fw = panos.firewall.Firewall( @@ -76,8 +86,18 @@ def test_batch_untag_user(self): ) fw.xapi fw.userid.batch_start() - fw.userid.untag_user("user1", ["tag1",]) - fw.userid.untag_user("user2", ["tag1",]) + fw.userid.untag_user( + "user1", + [ + "tag1", + ], + ) + fw.userid.untag_user( + "user2", + [ + "tag1", + ], + ) if __name__ == "__main__": diff --git a/tests/test_versioning.py b/tests/test_versioning.py index 317df1b5..b498fffb 100644 --- a/tests/test_versioning.py +++ b/tests/test_versioning.py @@ -39,7 +39,8 @@ def test_empty_objects_are_equal(self): raise unittest.SkipTest("OLD_CLS does not have element_str()") self.assertEqual( - old.element_str(), new.element_str(), + old.element_str(), + new.element_str(), ) def test_positionally_populated_objects_are_equal(self): @@ -56,7 +57,8 @@ def test_positionally_populated_objects_are_equal(self): raise unittest.SkipTest("OLD_CLS does not have element_str()") self.assertEqual( - old.element_str(), new.element_str(), + old.element_str(), + new.element_str(), ) def test_keyword_populated_objects_are_equal(self): @@ -73,7 +75,8 @@ def test_keyword_populated_objects_are_equal(self): raise unittest.SkipTest("OLD_CLS does not have element_str()") self.assertEqual( - old.element_str(), new.element_str(), + old.element_str(), + new.element_str(), ) def test_parsing_old_elmstring_works(self):