Skip to content

Commit 7f338c2

Browse files
committed
Refactored framework and added a FIFO and SRPT
I changed and refactored the framework; for now, it breaks the current scheduling algorithms. I have provided two new example schedulers, FIFO and SRPT, to show how the new framework works. I split the data structures and the scheduling algorithms to make it more transparent which part is which. I also removed the packet awareness from the queues to make them able to have references to packets, flows, or queues without needing helpers. This change makes it easier to create hierarchical schedulers and to track flows in schedulers like SRPT. Signed-off-by: Frey Alfredsson <[email protected]>
1 parent f23005b commit 7f338c2

File tree

3 files changed

+197
-26
lines changed

3 files changed

+197
-26
lines changed

queue-exp/pifo_fifo.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55
#
66
# pifo-fifo.py
77

8+
"""First in, first out (FIFO)
9+
10+
The FIFO scheduling algorithm preserves the order of the scheduled packets. This
11+
implementation is here for completeness and uses a PIFO. It is here to help
12+
people understand how to add new scheduling algorithms to this framework.
13+
"""
14+
815
__copyright__ = """
916
Copyright (c) 2021, Toke Høiland-Jørgensen <[email protected]>
1017
Copyright (c) 2021, Frey Alfredsson <[email protected]>
@@ -26,11 +33,28 @@
2633
"""
2734

2835
from pifo_lib import Packet, Runner, Pifo
36+
from pifo_lib import SchedulingAlgorithm
37+
38+
39+
class Fifo(SchedulingAlgorithm):
40+
"""First in, first out (FIFO)"""
2941

42+
def __init__(self):
43+
self._pifo = Pifo()
44+
45+
def enqueue(self, item):
46+
rank = self.get_rank(item)
47+
self._pifo.enqueue(item, rank)
3048

31-
class Fifo(Pifo):
3249
def get_rank(self, item):
33-
return self.qlen
50+
return self._pifo.qlen
51+
52+
def dequeue(self):
53+
return self._pifo.dequeue()
54+
55+
def dump(self):
56+
self._pifo.dump()
57+
3458

3559
if __name__ == "__main__":
3660
pkts = [

queue-exp/pifo_lib.py

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,74 @@ def __init__(self, flow, idn, length=1):
2727
self.flow = flow
2828
self.idn = idn
2929
self.length = length
30-
self.rank = 0
3130

3231
def __repr__(self):
3332
return f"P(F:{self.flow}, I:{self.idn}, L:{self.length})"
3433

3534

3635
class Runner:
37-
def __init__(self, pkts, queue):
36+
"""This class is responsible for running a test on a packet scheduling
37+
algorithm. It is accountable for enquing and dequeing packets. For now, it
38+
does so by dequing as many packets as it enqued. In the next iteration, when
39+
we add pacing, it will need to handle virtual time cycling.
40+
"""
41+
42+
def __init__(self, pkts, scheduler):
3843
self.input_pkts = pkts
39-
self.queue = queue
44+
self.scheduler = scheduler
4045

4146
def run(self):
42-
print(f"Running with queue: {self.queue}")
43-
print(" Inserting packets into queue:")
47+
print(f"Running with scheduler: {self.scheduler}")
48+
print(" Inserting packets into scheduler:")
4449
pprint(self.input_pkts, indent=4)
4550
for p in self.input_pkts:
46-
self.queue.enqueue(p)
47-
print(" Queue state:")
48-
self.queue.dump()
51+
self.scheduler.enqueue(p)
52+
print(" Scheduler state:")
53+
self.scheduler.dump()
4954
output = []
50-
for p in self.queue:
55+
56+
for p in self.scheduler:
5157
output.append(p)
5258
print(" Got packets from queue:")
5359
pprint(output, indent=4)
5460

5561

62+
class SchedulingAlgorithm():
63+
64+
"""A queuing packet scheduling algorithm requires an abstraction that keeps
65+
the queuing data structure and the algorithm separate. To create a new
66+
Scheduling algorithm, inherit this class, add the scheduling data structures
67+
to the constructor, and implement the constructor, enqueue, dequeue, and the
68+
dump functions.
69+
70+
Please look at the pifo_fifo.py to see how you implement a FIFO.
71+
"""
72+
73+
def __init__(self):
74+
raise NotImplementedError(self.__class__.__name__ + ' missing implementation')
75+
76+
def enqueue(self, item):
77+
raise NotImplementedError(self.__class__.__name__ + ' missing implementation')
78+
79+
def dequeue(self):
80+
raise NotImplementedError(self.__class__.__name__ + ' missing implementation')
81+
82+
def dump(self):
83+
raise NotImplementedError(self.__class__.__name__ + ' missing implementation')
84+
85+
def __next__(self):
86+
item = self.dequeue()
87+
if item is None:
88+
raise StopIteration
89+
return item
90+
91+
def __iter__(self):
92+
return self
93+
94+
def __repr__(self):
95+
return f"{self.__class__.__name__} - {self.__class__.__doc__}"
96+
97+
5698
class Queue:
5799
def __init__(self, idx=None):
58100
self._list = []
@@ -97,11 +139,10 @@ def dump(self):
97139

98140

99141
class Pifo(Queue):
100-
101-
def enqueue(self, item, rank=None):
142+
def enqueue(self, item, rank):
102143
if rank is None:
103-
rank = self.get_rank(item)
104-
item.rank = rank
144+
raise ValueError("Rank can't be of value 'None'.")
145+
105146
super().enqueue((rank, item))
106147
self.sort()
107148

@@ -116,24 +157,42 @@ def peek(self):
116157
itm = super().peek()
117158
return itm[1] if itm else None
118159

119-
def get_rank(self, item):
120-
raise NotImplementedError
121-
122160

123161
class Flow(Queue):
124162
def __init__(self, idx):
125163
super().__init__()
126164
self.idx = idx
127-
self.rank = 0
128165

129166
def __repr__(self):
130-
return f"F({self.idx})"
167+
return f"F(I:{self.idx}, Q:{self.qlen}, L:{self.length})"
131168

132-
# Return the length of the first packet in the queue as the "length" of the
133-
# flow. This is not the correct thing to do, but it works as a stopgap
134-
# solution for testing the hierarchical mode, and we're only using
135-
# unit-length for that anyway
136169
@property
137170
def length(self):
138-
itm = self.peek()
139-
return itm.length if itm else 0
171+
result = 0
172+
for itm in self._list:
173+
result += itm.length if itm else 0
174+
return result
175+
176+
177+
class FlowTracker():
178+
"""This class provides us with the typical operation of keeping track of
179+
flows. Use this class in your scheduling algorithms when your algorithm only
180+
has one type of flows.
181+
"""
182+
183+
def __init__(self):
184+
self._flows = {}
185+
186+
def enqueue(self, pkt, flow_id=None):
187+
if not isinstance(pkt, Packet):
188+
raise ValueError(f"Expected a packet, but got '{pkt}' instead.")
189+
if flow_id is None:
190+
flow_id = pkt.flow
191+
if not flow_id in self._flows:
192+
self._flows[flow_id] = Flow(flow_id)
193+
flow = self._flows[flow_id]
194+
flow.enqueue(pkt)
195+
return flow
196+
197+
def get_flow(self, flow_id):
198+
return self._flows[flow_id]

queue-exp/pifo_srpt.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#!/usr/bin/env python3
2+
# coding: utf-8 -*-
3+
#
4+
# SPDX-License-Identifier: GPL-3.0-or-later
5+
#
6+
# pifo-srpt.py
7+
8+
"""Shortest Remaining Processing Time (SRPT).
9+
10+
This scheduling algorithm is referenced in companion C++ implementation for the
11+
paper "Programmable packet scheduling at line rate" by Sivaraman, Anirudh, et
12+
al.
13+
14+
It schedules packets in the order of how much data the flow has left. It assumes
15+
complete knowledge of the flow length. In the real world, this would either need
16+
to be estimated or limited to predictable flows.
17+
"""
18+
19+
__copyright__ = """
20+
Copyright (c) 2021 Toke Høiland-Jørgensen <[email protected]>
21+
Copyright (c) 2021 Frey Alfredsson <[email protected]>
22+
"""
23+
24+
__license__ = """
25+
This program is free software: you can redistribute it and/or modify
26+
it under the terms of the GNU General Public License as published by
27+
the Free Software Foundation, either version 3 of the License, or
28+
(at your option) any later version.
29+
30+
This program is distributed in the hope that it will be useful,
31+
but WITHOUT ANY WARRANTY; without even the implied warranty of
32+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33+
GNU General Public License for more details.
34+
35+
You should have received a copy of the GNU General Public License
36+
along with this program. If not, see <http://www.gnu.org/licenses/>.
37+
"""
38+
39+
from pifo_lib import Packet, Runner, Pifo
40+
from pifo_lib import SchedulingAlgorithm
41+
from pifo_lib import FlowTracker
42+
43+
44+
class Srpt(SchedulingAlgorithm):
45+
"""Shortest Remaining Processing Time"""
46+
47+
def __init__(self):
48+
self._pifo = Pifo()
49+
self._flow_tracker = FlowTracker()
50+
51+
self._remains = {}
52+
53+
# We cheat by accessing the global packet list directly
54+
for pkt in pkts:
55+
if pkt.flow in self._remains.keys():
56+
self._remains[pkt.flow] += pkt.length
57+
else:
58+
self._remains[pkt.flow] = pkt.length
59+
60+
def get_rank(self, pkt):
61+
rank = self._remains[pkt.flow]
62+
self._remains[pkt.flow] -= pkt.length
63+
return rank
64+
65+
def enqueue(self, item):
66+
flow = self._flow_tracker.enqueue(item)
67+
rank = self.get_rank(item)
68+
self._pifo.enqueue(flow, rank)
69+
70+
def dequeue(self):
71+
flow = self._pifo.dequeue()
72+
pkt = None
73+
if flow is not None:
74+
pkt = flow.dequeue()
75+
return pkt
76+
77+
def dump(self):
78+
self._pifo.dump()
79+
80+
if __name__ == "__main__":
81+
pkts = [
82+
Packet(flow=1, idn=1, length=2),
83+
Packet(flow=1, idn=2, length=2),
84+
Packet(flow=2, idn=1, length=1),
85+
Packet(flow=2, idn=2, length=1),
86+
Packet(flow=2, idn=3, length=1),
87+
]
88+
Runner(pkts, Srpt()).run()

0 commit comments

Comments
 (0)