Skip to content

Add LoWPAN enhancements #3556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scapy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ class Conf(ConfClass):
'llmnr',
'lltd',
'mgcp',
'mle',
'mobileip',
'netbios',
'netflow',
Expand Down
197 changes: 191 additions & 6 deletions scapy/layers/dot15d4.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Copyright (C) Ryan Speers <[email protected]> 2011-2012
# Copyright (C) Roger Meyer <[email protected]>: 2012-03-10 Added frames
# Copyright (C) Gabriel Potter <[email protected]>: 2018
# Copyright (C) 2020 Dimitrios-Georgios Akestoridis <[email protected]>
# Copyright (C) 2020, 2022 Dimitrios-Georgios Akestoridis <[email protected]>
# This program is published under a GPLv2 license

"""
Expand Down Expand Up @@ -36,6 +36,7 @@
XByteField,
XLEIntField,
XLEShortField,
XStrField,
)

# Fields #
Expand Down Expand Up @@ -233,9 +234,36 @@ class Dot15d4Data(Packet):
lambda pkt:pkt.underlayer.getfieldval("fcf_srcaddrmode") != 0), # noqa: E501
# Security field present if fcf_security == True
ConditionalField(PacketField("aux_sec_header", Dot15d4AuxSecurityHeader(), Dot15d4AuxSecurityHeader), # noqa: E501
lambda pkt:pkt.underlayer.getfieldval("fcf_security") is True), # noqa: E501
lambda pkt:pkt.underlayer.getfieldval("fcf_security")), # noqa: E501
# Secured Payload (variable length)
ConditionalField(
XStrField("sec_payload", ""),
lambda pkt:pkt.underlayer.getfieldval("fcf_security"),
),
# Message Integrity Code (variable length)
ConditionalField(
XStrField("mic", ""),
lambda pkt:pkt.underlayer.getfieldval("fcf_security"),
),
]

def post_dissect(self, s):
if self.underlayer.getfieldval("fcf_security"):
if self.aux_sec_header.sec_sc_seclevel in {1, 5}:
mic_length = 4
elif self.aux_sec_header.sec_sc_seclevel in {2, 6}:
mic_length = 8
elif self.aux_sec_header.sec_sc_seclevel in {3, 7}:
mic_length = 16
else:
mic_length = 0
self.sec_payload = bytes(self.aux_sec_header.payload)
self.aux_sec_header.remove_payload()
if mic_length > 0 and mic_length <= len(self.sec_payload):
self.mic = self.sec_payload[-mic_length:]
self.sec_payload = self.sec_payload[:-mic_length]
return s

def guess_payload_class(self, payload):
# TODO: See how it's done in wireshark:
# https://github.com/wireshark/wireshark/blob/93c60b3b7c801dddd11d8c7f2a0ea4b7d02d700a/epan/dissectors/packet-ieee802154.c#L2061 # noqa: E501
Expand Down Expand Up @@ -268,7 +296,7 @@ class Dot15d4Beacon(Packet):
dot15d4AddressField("src_addr", None, length_of="fcf_srcaddrmode"),
# Security field present if fcf_security == True
ConditionalField(PacketField("aux_sec_header", Dot15d4AuxSecurityHeader(), Dot15d4AuxSecurityHeader), # noqa: E501
lambda pkt:pkt.underlayer.getfieldval("fcf_security") is True), # noqa: E501
lambda pkt:pkt.underlayer.getfieldval("fcf_security")), # noqa: E501

# Superframe spec field:
BitField("sf_sforder", 15, 4), # not used by ZigBee
Expand Down Expand Up @@ -303,12 +331,137 @@ class Dot15d4Beacon(Packet):
FieldListField("pa_long_addresses", [],
dot15d4AddressField("", 0, adjust=lambda pkt, x: 8),
count_from=lambda pkt: pkt.pa_num_long),
# TODO beacon payload
# Secured Payload (variable length)
ConditionalField(
XStrField("sec_payload", ""),
lambda pkt:pkt.underlayer.getfieldval("fcf_security"),
),
# Message Integrity Code (variable length)
ConditionalField(
XStrField("mic", ""),
lambda pkt:pkt.underlayer.getfieldval("fcf_security"),
),
]

def post_dissect(self, s):
if self.underlayer.getfieldval("fcf_security"):
if self.aux_sec_header.sec_sc_seclevel in {1, 5}:
mic_length = 4
elif self.aux_sec_header.sec_sc_seclevel in {2, 6}:
mic_length = 8
elif self.aux_sec_header.sec_sc_seclevel in {3, 7}:
mic_length = 16
else:
mic_length = 0
self.sec_payload = bytes(self.aux_sec_header.payload)
self.aux_sec_header.remove_payload()
if mic_length > 0 and mic_length <= len(self.sec_payload):
self.mic = self.sec_payload[-mic_length:]
self.sec_payload = self.sec_payload[:-mic_length]
i = 0
if len(self.sec_payload) > i:
if isinstance(self.sec_payload[i], str):
tmp_val = struct.unpack(">B", self.sec_payload[i])[0]
else:
tmp_val = self.sec_payload[i]
self.sf_sforder = (tmp_val >> 4) & 0b1111
self.sf_beaconorder = tmp_val & 0b1111
i += 1
if len(self.sec_payload) > i:
if isinstance(self.sec_payload[i], str):
tmp_val = struct.unpack(">B", self.sec_payload[i])[0]
else:
tmp_val = self.sec_payload[i]
self.sf_assocpermit = (tmp_val >> 7) & 0b1
self.sf_pancoord = (tmp_val >> 6) & 0b1
self.sf_reserved = (tmp_val >> 5) & 0b1
self.sf_battlifeextend = (tmp_val >> 4) & 0b1
self.sf_finalcapslot = tmp_val & 0b1111
i += 1
if len(self.sec_payload) > i:
if isinstance(self.sec_payload[i], str):
tmp_val = struct.unpack(">B", self.sec_payload[i])[0]
else:
tmp_val = self.sec_payload[i]
self.gts_spec_permit = (tmp_val >> 7) & 0b1
self.gts_spec_reserved = (tmp_val >> 3) & 0b1111
self.gts_spec_desccount = tmp_val & 0b111
i += 1
if self.gts_spec_desccount != 0 and len(self.sec_payload) > i:
if isinstance(self.sec_payload[i], str):
tmp_val = struct.unpack(">B", self.sec_payload[i])[0]
else:
tmp_val = self.sec_payload[i]
self.gts_dir_reserved = (tmp_val >> 7) & 0b1
self.gts_dir_mask = tmp_val & 0b1111111
i += 1
# TODO: Update the GTS List field
if len(self.sec_payload) > i:
if isinstance(self.sec_payload[i], str):
tmp_val = struct.unpack(">B", self.sec_payload[i])[0]
else:
tmp_val = self.sec_payload[i]
self.pa_reserved_1 = (tmp_val >> 7) & 0b1
self.pa_num_long = (tmp_val >> 4) & 0b111
self.pa_reserved_2 = (tmp_val >> 3) & 0b1
self.pa_num_short = tmp_val & 0b111
i += 1
for _ in range(self.pa_num_short):
if len(self.sec_payload) > i + 1:
if isinstance(self.sec_payload[i], str):
tmp_val = (
struct.unpack("<H", self.sec_payload[i:i + 2])[0]
)
else:
tmp_val = (
self.sec_payload[i] +
(self.sec_payload[i + 1] << 8)
)
self.pa_short_addresses.append(tmp_val)
i += 2
for _ in range(self.pa_num_long):
if len(self.sec_payload) > i + 7:
if isinstance(self.sec_payload[i], str):
tmp_val = (
struct.unpack("<Q", self.sec_payload[i:i + 8])[0]
)
else:
tmp_val = (
self.sec_payload[i] +
(self.sec_payload[i + 1] << 8) +
(self.sec_payload[i + 2] << 16) +
(self.sec_payload[i + 3] << 24) +
(self.sec_payload[i + 4] << 32) +
(self.sec_payload[i + 5] << 40) +
(self.sec_payload[i + 6] << 48) +
(self.sec_payload[i + 7] << 56)
)
self.pa_long_addresses.append(tmp_val)
i += 8
self.sec_payload = self.sec_payload[i:]
return s

def mysummary(self):
return self.sprintf("802.15.4 Beacon ( %Dot15d4Beacon.src_panid%:%Dot15d4Beacon.src_addr% ) assocPermit(%Dot15d4Beacon.sf_assocpermit%) panCoord(%Dot15d4Beacon.sf_pancoord%)") # noqa: E501

def guess_payload_class(self, payload):
from scapy.layers.sixlowpan import ThreadBeacon
from scapy.layers.zigbee import ZigBeeBeacon
if len(payload) > 0:
if (
(isinstance(payload[0], int) and payload[0] == 0x03) or
(isinstance(payload[0], bytes) and payload[0] == b'\x03')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use orb(payload[0]) == 0x03 to work on all Python versions (from scapy.compat import orb)

):
# https://gitlab.com/wireshark/wireshark/-/blob/5ecb57cb9026cebf0cfa4918c4a86942620c5ecf/epan/dissectors/packet-thread.c#L2223-2225 # noqa: E501
return ThreadBeacon
elif (
(isinstance(payload[0], int) and payload[0] == 0x00) or
(isinstance(payload[0], bytes) and payload[0] == b'\x00')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

):
# https://gitlab.com/wireshark/wireshark/-/blob/5ecb57cb9026cebf0cfa4918c4a86942620c5ecf/epan/dissectors/packet-zbee-nwk.c#L1507-1509 # noqa: E501
return ZigBeeBeacon
return Packet.guess_payload_class(self, payload)


class Dot15d4Cmd(Packet):
name = "802.15.4 Command"
Expand All @@ -323,7 +476,7 @@ class Dot15d4Cmd(Packet):
lambda pkt:pkt.underlayer.getfieldval("fcf_srcaddrmode") != 0), # noqa: E501
# Security field present if fcf_security == True
ConditionalField(PacketField("aux_sec_header", Dot15d4AuxSecurityHeader(), Dot15d4AuxSecurityHeader), # noqa: E501
lambda pkt:pkt.underlayer.getfieldval("fcf_security") is True), # noqa: E501
lambda pkt:pkt.underlayer.getfieldval("fcf_security")), # noqa: E501
ByteEnumField("cmd_id", 0, {
1: "AssocReq", # Association request
2: "AssocResp", # Association response
Expand All @@ -336,9 +489,41 @@ class Dot15d4Cmd(Packet):
9: "GTSReq" # GTS request
# 0x0a - 0xff reserved
}),
# TODO command payload
# Secured Payload (variable length)
ConditionalField(
XStrField("sec_payload", ""),
lambda pkt: pkt.underlayer.getfieldval("fcf_security"),
),
# Message Integrity Code (variable length)
ConditionalField(
XStrField("mic", ""),
lambda pkt: pkt.underlayer.getfieldval("fcf_security"),
),
]

def post_dissect(self, s):
if self.underlayer.getfieldval("fcf_security"):
if self.aux_sec_header.sec_sc_seclevel in {1, 5}:
mic_length = 4
elif self.aux_sec_header.sec_sc_seclevel in {2, 6}:
mic_length = 8
elif self.aux_sec_header.sec_sc_seclevel in {3, 7}:
mic_length = 16
else:
mic_length = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a lot of code that is re-used. Could you exerternalize it to a function?

self.sec_payload = bytes(self.aux_sec_header.payload)
self.aux_sec_header.remove_payload()
if mic_length > 0 and mic_length <= len(self.sec_payload):
self.mic = self.sec_payload[-mic_length:]
self.sec_payload = self.sec_payload[:-mic_length]
if len(self.sec_payload) > 0:
if isinstance(self.sec_payload[0], str):
self.cmd_id = struct.unpack(">B", self.sec_payload[0])[0]
else:
self.cmd_id = self.sec_payload[0]
self.sec_payload = self.sec_payload[1:]
return s

def mysummary(self):
return self.sprintf("802.15.4 Command %Dot15d4Cmd.cmd_id% ( %Dot15dCmd.src_panid%:%Dot15d4Cmd.src_addr% -> %Dot15d4Cmd.dest_panid%:%Dot15d4Cmd.dest_addr% )") # noqa: E501

Expand Down
117 changes: 117 additions & 0 deletions scapy/layers/mle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# This file is part of Scapy
# See http://www.secdev.org/projects/scapy for more information
# Copyright (C) 2022 Dimitrios-Georgios Akestoridis <[email protected]>
# This program is published under a GPLv2 license

"""
MLE (Mesh Link Establishment)
=============================

For more information, please refer to the following files:

* https://datatracker.ietf.org/doc/html/draft-ietf-6lo-mesh-link-establishment-00

* https://doi.org/10.1109/IEEESTD.2006.232110

* https://gitlab.com/wireshark/wireshark/-/blob/5ecb57cb9026cebf0cfa4918c4a86942620c5ecf/epan/dissectors/packet-mle.c

""" # noqa: E501

from scapy.fields import (
ByteEnumField,
ConditionalField,
PacketField,
XStrField,
)
from scapy.layers.dot15d4 import Dot15d4AuxSecurityHeader
from scapy.layers.inet import UDP
from scapy.packet import (
Packet,
bind_layers,
)


class MLE(Packet):
name = "Mesh Link Establishment"
fields_desc = [
ByteEnumField(
"sec_suite",
0,
{
0: "IEEE 802.15.4 Security (0)",
255: "No Security (255)",
},
),
ConditionalField(
PacketField(
"aux_sec_header",
Dot15d4AuxSecurityHeader(),
Dot15d4AuxSecurityHeader,
),
lambda pkt: pkt.sec_suite == 0,
),
ConditionalField(
XStrField("sec_payload", ""),
lambda pkt: pkt.sec_suite == 0,
),
ConditionalField(
XStrField("mic", ""),
lambda pkt: pkt.sec_suite == 0,
),
]

def post_dissect(self, s):
if self.sec_suite == 0:
if self.aux_sec_header.sec_sc_seclevel in {1, 5}:
mic_length = 4
elif self.aux_sec_header.sec_sc_seclevel in {2, 6}:
mic_length = 8
elif self.aux_sec_header.sec_sc_seclevel in {3, 7}:
mic_length = 16
else:
mic_length = 0
self.sec_payload = bytes(self.aux_sec_header.payload)
self.aux_sec_header.remove_payload()
if mic_length > 0 and mic_length <= len(self.sec_payload):
self.mic = self.sec_payload[-mic_length:]
self.sec_payload = self.sec_payload[:-mic_length]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same: you could reuse the function defined above.

return s

def guess_payload_class(self, payload):
if self.sec_suite == 255:
return MLECmd
return Packet.guess_payload_class(self, payload)


class MLECmd(Packet):
name = "Mesh Link Establishment Command"
fields_desc = [
ByteEnumField(
"cmd_type",
0,
{
0: "Link Request (0)",
1: "Link Accept (1)",
2: "Link Accept and Request (2)",
3: "Link Reject (3)",
4: "Advertisement (4)",
5: "Update (5)",
6: "Update Request (6)",
7: "Data Request (7)",
8: "Data Response (8)",
9: "Parent Request (9)",
10: "Parent Response (10)",
11: "Child ID Request (11)",
12: "Child ID Response (12)",
13: "Child Update Request (13)",
14: "Child Update Response (14)",
15: "Announce (15)",
16: "Discovery Request (16)",
17: "Discovery Response (17)",
},
),
# TODO: Dissect the command payload
]


bind_layers(UDP, MLE, sport=19788, dport=19788)
Loading