-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Ensure that packet is set to p param to type of bytes. In certain sit… #3909
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
3d593d0
56f0304
67e03f4
6cc34d8
9ba88ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
from scapy.config import conf, ConfClass | ||
from scapy.compat import orb, chb | ||
from scapy.error import log_runtime | ||
from scapy.volatile import RandShort, RandString, RandByte, RandIP, RandNum | ||
|
||
|
||
# | ||
|
@@ -101,7 +102,7 @@ def h2i(self, pkt, h): | |
return int(mask), ip | ||
|
||
def i2h(self, pkt, i): | ||
""""Internal" representation to "human" representation | ||
"""Internal" representation to "human" representation | ||
(x.x.x.x/y).""" | ||
mask, ip = i | ||
return ip + "/" + str(mask) | ||
|
@@ -114,28 +115,55 @@ def i2len(self, pkt, i): | |
return self.mask2iplen(mask) + 1 | ||
|
||
def i2m(self, pkt, i): | ||
""""Internal" (IP as bytes, mask as int) to "machine" | ||
"""Internal" (IP as bytes, mask as int) to "machine" | ||
representation.""" | ||
mask, ip = i | ||
ip = socket.inet_aton(ip) | ||
ip = socket.inet_aton(str(ip)) | ||
return struct.pack(">B", mask) + ip[:self.mask2iplen(mask)] | ||
|
||
def addfield(self, pkt, s, val): | ||
return s + self.i2m(pkt, val) | ||
|
||
def getfield(self, pkt, s): | ||
""" | ||
:param pkt: BGPNLRI_IPv4 | ||
:param s: bytes byte 0: network prefix, byte 1+: network | ||
|
||
example: | ||
s = b'\x18\x03\x03\x03' 24/3.3.3 | ||
""" | ||
length = self.mask2iplen(orb(s[0])) + 1 | ||
return s[length:], self.m2i(pkt, s[:length]) | ||
|
||
def m2i(self, pkt, m): | ||
'''TODO: Need to figure out a way to prevent inet_ntoa errors when it comes | ||
to fuzzing exercises.''' | ||
mask = orb(m[0]) | ||
mask2iplen_res = self.mask2iplen(mask) | ||
ip = b"".join(m[i + 1:i + 2] if i < mask2iplen_res else b"\x00" for i in range(4)) # noqa: E501 | ||
|
||
ip = b"".join( | ||
m[i + 1:i + 2] if i < mask2iplen_res and m[i + 1:i + 2] else b"\x00" for i in range(4)) | ||
# Pad network address with 0's to make valid ip address | ||
return (mask, socket.inet_ntoa(ip)) | ||
|
||
def randval(self): | ||
# TODO: replace IPField with RandString() | ||
return (RandNetMask(0, 32), RandIP()) | ||
|
||
|
||
class RandNetMask(RandNum): | ||
def __init__(self, min=0, max=2**8 - 1): | ||
# type: (int, int) -> None | ||
""" | ||
:param int min: Minimum range for random value. | ||
:param int max: Maximum range for random value. | ||
""" | ||
RandNum.__init__(self, min, max) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use |
||
|
||
|
||
class BGPFieldIPv6(Field): | ||
"""IPv6 Field (CIDR)""" | ||
# TODO: Add support for fuzz() | ||
|
||
def mask2iplen(self, mask): | ||
"""Get the IP field mask length (in bytes).""" | ||
|
@@ -147,7 +175,7 @@ def h2i(self, pkt, h): | |
return int(mask), ip | ||
|
||
def i2h(self, pkt, i): | ||
""""Internal" representation to "human" representation.""" | ||
"""Internal" representation to "human" representation.""" | ||
mask, ip = i | ||
return ip + "/" + str(mask) | ||
|
||
|
@@ -159,7 +187,7 @@ def i2len(self, pkt, i): | |
return self.mask2iplen(mask) + 1 | ||
|
||
def i2m(self, pkt, i): | ||
""""Internal" (IP as bytes, mask as int) to "machine" representation.""" # noqa: E501 | ||
"""Internal" (IP as bytes, mask as int) to "machine" representation.""" # noqa: E501 | ||
mask, ip = i | ||
ip = pton_ntop.inet_pton(socket.AF_INET6, ip) | ||
return struct.pack(">B", mask) + ip[:self.mask2iplen(mask)] | ||
|
@@ -449,7 +477,7 @@ class BGPHeader(Packet): | |
0xffffffffffffffffffffffffffffffff, | ||
0x80 | ||
), | ||
ShortField("len", None), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very not scapy-like. Don't do that. It is commonly understood that |
||
ShortField("len", 0), # default = 0, enables field for fuzz() | ||
ByteEnumField("type", 4, _bgp_message_types) | ||
] | ||
|
||
|
@@ -485,7 +513,7 @@ def _bgp_dispatcher(payload): | |
cls = _get_cls("BGPHeader", conf.raw_layer) | ||
|
||
else: | ||
if len(payload) >= _BGP_HEADER_SIZE and\ | ||
if len(payload) >= _BGP_HEADER_SIZE and \ | ||
payload[:16] == _BGP_HEADER_MARKER: | ||
|
||
# Get BGP message type | ||
|
@@ -633,7 +661,7 @@ class _BGPCapability_metaclass(_BGPCap_metaclass, Packet_metaclass): | |
pass | ||
|
||
|
||
class BGPCapability(Packet, metaclass=_BGPCapability_metaclass): | ||
class BGPCapability(six.with_metaclass(_BGPCapability_metaclass, Packet)): # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We just dropped Python 2 support so you shouldn't need to use |
||
""" | ||
Generic BGP capability. | ||
""" | ||
|
@@ -677,7 +705,7 @@ class BGPCapGeneric(BGPCapability): | |
fields_desc = [ | ||
ByteEnumField("code", 0, _capabilities), | ||
FieldLenField("length", None, fmt="B", length_of="cap_data"), | ||
StrLenField("cap_data", '', | ||
StrLenField("cap_data", b'', | ||
length_from=lambda p: p.length, max_length=255), | ||
] | ||
|
||
|
@@ -888,14 +916,14 @@ class BGPAuthenticationInformation(Packet): | |
# Optional Parameter. | ||
# | ||
|
||
|
||
class BGPOptParamPacketListField(PacketListField): | ||
""" | ||
PacketListField handling the optional parameters (OPEN message). | ||
""" | ||
|
||
def getfield(self, pkt, s): | ||
lst = [] | ||
ret = b'' | ||
|
||
length = 0 | ||
if self.length_from is not None: | ||
|
@@ -954,7 +982,7 @@ def post_build(self, p, pay): | |
length = len(p) - \ | ||
2 # parameter type (1 byte) - parameter length (1 byte) | ||
packet = p[:1] + chb(length) | ||
if (self.param_type == 2 and self.param_value is not None) or\ | ||
if (self.param_type == 2 and self.param_value is not None) or \ | ||
(self.param_type == 1 and self.authentication_data is not None): # noqa: E501 | ||
packet = packet + p[2:] | ||
|
||
|
@@ -1789,8 +1817,9 @@ def __init__(self, name, default, enum_from=None): | |
self.enum_from = enum_from | ||
|
||
def i2repr(self, pkt, i): | ||
enum = self.enum_from(pkt) | ||
return enum.get(i, i) | ||
if self.enum_from: | ||
enum = self.enum_from(pkt) | ||
return enum.get(i, i) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. else? |
||
|
||
|
||
class BGPPAExtCommunity(Packet): | ||
|
@@ -2127,7 +2156,7 @@ def post_build(self, p, pay): | |
if extended_length: | ||
packet = packet + p[2:4] | ||
else: | ||
packet = packet + p[2] | ||
packet = packet + p[2].to_bytes(1, byteorder='big') | ||
else: | ||
if extended_length: | ||
packet = packet + struct.pack("!H", length) | ||
|
@@ -2157,9 +2186,9 @@ class BGPUpdate(BGP): | |
|
||
name = "UPDATE" | ||
fields_desc = [ | ||
FieldLenField( | ||
BGPFieldLenField( | ||
"withdrawn_routes_len", | ||
None, | ||
0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. Leave |
||
length_of="withdrawn_routes", | ||
fmt="!H" | ||
), | ||
|
@@ -2169,9 +2198,9 @@ class BGPUpdate(BGP): | |
"IPv4", | ||
length_from=lambda p: p.withdrawn_routes_len | ||
), | ||
FieldLenField( | ||
BGPFieldLenField( | ||
"path_attr_len", | ||
None, | ||
0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same |
||
length_of="path_attr", | ||
fmt="!H" | ||
), | ||
|
@@ -2185,8 +2214,10 @@ class BGPUpdate(BGP): | |
] | ||
|
||
def post_build(self, p, pay): | ||
# type: (bytes, bytes) -> bytes | ||
subpacklen = lambda p: len(p) | ||
packet = "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch. Could you add a unit test? |
||
packet = p | ||
|
||
if self.withdrawn_routes_len is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
...otherwise this won't work |
||
wl = sum(map(subpacklen, self.withdrawn_routes)) | ||
packet = p[:0] + struct.pack("!H", wl) + p[2:] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? (add a comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inet_aton expects a string representation of an ip address.
I'm ensuring that is the case, and any errors will make this obvious.