-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchecker.py
More file actions
executable file
·97 lines (80 loc) · 4.23 KB
/
checker.py
File metadata and controls
executable file
·97 lines (80 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import argparse
import sys
from apa.classes.IPRange import Firewall
from apa.classes.NetworkEntities import Ports, Protocol
from apa.classes.policy import Connection, Check, CaR
from apa.main import get_firewall_from_csv
from utils.parser import parse_path
from utils.printer import dump
from utils.setter import set_paths_from, handle_arguments
from utils.validators import validate_protocols, validate_prefix
def main(firewall: Firewall = None):
args: argparse.Namespace = get_arguments()
set_paths_from(args)
connection = Connection(source=validate_prefix(args.source), destination=validate_prefix(args.destination),
ports=Ports(args.ports), protocols=validate_protocols(args.protocols))
check: Check = Check(connection)
firewall: Firewall = get_firewall_from_csv(firewall)
check.outbound.result = firewall.outbound_allows(check.outbound.connection)
check.firewall.result = firewall.firewall_allows(check.firewall.connection)
check.inbound.result = firewall.inbound_allows(check.inbound.connection)
dump_output(args, check)
sys.exit(1 if check.get_result() else 0)
def dump_output(args: argparse.Namespace, check: Check) -> None:
"""
Print text result to the console or file based on given user argument
:param check: aggregation of all connections and their checker results
:param args: user arguments
:return: None
"""
ports = args.ports.replace(",", ", ") + (" ports" if len(args.ports.split(",")) > 1 else " port")
protocols = args.protocols.replace(",", ", ") + (
" protocols" if len(args.protocols.split(",")) > 1 else " protocol")
possible = "allowed" if check.get_result() else "denied"
text = (f"Connection from {check.get_source_ip()} to {check.get_destination_ip()} using "
f"{protocols} via {ports} is {possible} because of:")
output_dir = handle_arguments(args)
source = str(args.source).replace("/", "prefix")
destination = str(args.destination).replace("/", "prefix")
path: str = output_dir + f"/{source}to{destination}using{args.protocols}via{args.ports}.txt" if output_dir else None
dump(str(check.get_result()), path)
dump(text, path)
for attribute, value in check.__dict__.items(): # iterate over Outbound, Firewall and Inbound
dump_used_rules(value, path)
def dump_used_rules(connection_and_result: CaR, path: str):
"""
Format used rules for connection and print them to the specified path
:param connection_and_result: connection and its result
:param path: where to print applied rules
:return: None
"""
dump(connection_and_result.name + ":", path)
if not connection_and_result.connection.applied_rules:
if connection_and_result.result:
dump(" Communication outside of virtual network -> default allow", path)
else:
dump(" No allow rule for this connection -> default deny", path)
for rule in connection_and_result.connection.applied_rules:
allow = "allows" if rule.allow else "denies"
port = "" if Protocol.ICMP in rule.protocols else (str(rule.destination_ports) + " " + ("port" if len(
rule.destination_ports) == 1 else "ports")) + " and "
applied_rule = (f" Rule: {rule.name} {allow} {port}{", ".join(map(str, sorted(rule.protocols)))} "
f"from {rule.source.addresses.ip_addresses} to {rule.destination.addresses.ip_addresses}")
dump(applied_rule, path)
def get_arguments() -> argparse.Namespace:
"""
Parse user given arguments for checker feature. Source and destination are obligatory and ports and protocols are
optional
:return: Parsed arguments
"""
parser = argparse.ArgumentParser(
description='Check connection between source and destination IP with selected parameters')
parse_path(parser)
parser.add_argument('source', type=str, help='source IP address or network')
parser.add_argument('destination', type=str, help='destination IP address or network')
parser.add_argument('--ports', type=str, default="*", help='Required open ports')
parser.add_argument('--protocols', type=str, default="Any", help='Required open ports')
args = parser.parse_args()
return args
if __name__ == '__main__':
main()