Skip to content

Commit dc5a9ec

Browse files
Merge pull request #34 from AkulS1008/main
2 parents 9d66541 + 0fc302e commit dc5a9ec

File tree

2 files changed

+228
-37
lines changed

2 files changed

+228
-37
lines changed
Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
from netunicorn.base import Pipeline
2-
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture
3-
from netunicorn.library.tasks.measurements.ookla_speedtest import SpeedTest
4-
from netunicorn.library.tasks.upload.fileio import UploadToFileIO
5-
2+
from netunicorn.library.tasks.measurements.ookla_speedtest import OoklaSpeedtest, OoklaSpeedtestAnalysis
63

74
def simple_speedtest_pipeline() -> Pipeline:
85
"""
9-
Run a speedtest while capturing the traffic with tcpdump and upload the capture file to file.io
6+
Run a speedtest using ookla CLI and then providing user-friendly analysis of the results
107
:return: pipeline to run
118
"""
129
pipeline = (
1310
Pipeline()
14-
.then(StartCapture(filepath="/tmp/capture.pcap", name="capture"))
15-
.then(SpeedTest())
16-
.then(StopNamedCapture(start_capture_task_name="capture"))
17-
.then(UploadToFileIO(filepath="/tmp/capture.pcap", expires="1d"))
11+
.then(OoklaSpeedtest(name="Ookla CLI Speedtest"))
12+
.then(OoklaSpeedtestAnalysis(name="Ookla Speedtest Analysis"))
1813
)
19-
return pipeline
14+
return pipeline
Lines changed: 223 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,239 @@
1-
from typing import Dict
1+
import subprocess
2+
import json
3+
from typing import Callable, Optional, List
24

3-
from netunicorn.base import Architecture, Node, Task, TaskDispatcher
4-
from netunicorn.library.tasks.tasks_utils import subprocess_run
5+
from netunicorn.base import Architecture, Failure, Success, Task, TaskDispatcher, Node
6+
from subprocess import CalledProcessError
7+
from dataclasses import dataclass
58

9+
UNIX_REQUIREMENTS = [
10+
"apt-get install curl --yes",
11+
"curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.deb.sh | bash",
12+
"apt-get install speedtest --yes",
13+
]
614

7-
class SpeedTest(TaskDispatcher):
8-
def __init__(self, *args, **kwargs):
15+
@dataclass
16+
class ServerInfo:
17+
id: str
18+
host: str
19+
port: int
20+
name: str
21+
location: str
22+
country: str
23+
24+
class OoklaSpeedtest(TaskDispatcher):
25+
def __init__(self, server_selection_task_name: str = "", source_ip: str = "", timeout: int = 100, *args, **kwargs):
26+
"""
27+
Proivde either `server_selection_task_name` or `source_ip` to ping to a certain server.
28+
If neither are provided, a server will be automatically selected.
29+
If both are proived, the server id from the server selection task will be prioritized.
30+
"""
931
super().__init__(*args, **kwargs)
10-
self.linux_instance = SpeedTestLinuxImplementation(name=self.name)
32+
self.linux_implementation = OoklaSpeedtestLinuxImplementation(server_selection_task_name, source_ip, timeout, name=self.name)
33+
34+
def dispatch(self, node: Node) -> Task:
35+
if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
36+
return self.linux_implementation
37+
raise NotImplementedError(
38+
f"Ookla Speedtest is not implemented for architecture: {node.architecture}"
39+
)
40+
41+
class OoklaSpeedtestLinuxImplementation(Task):
42+
requirements = UNIX_REQUIREMENTS
43+
44+
def __init__(self,server_selection_task_name: str, source_ip: str, timeout: int, *args, **kwargs):
45+
self.timeout = timeout
46+
self.server_selection_task_name = server_selection_task_name
47+
self.source_ip = source_ip
48+
super().__init__(*args, **kwargs)
49+
50+
def run(self):
51+
52+
try:
53+
flags = ["--accept-gdpr", "--accept-license", "--progress=no", "--format=json"]
54+
55+
if self.server_selection_task_name != '':
56+
server_id = self.previous_steps.get(self.server_selection_task_name, [Failure(f"{self.server_selection_task_name} not found")])[-1]
57+
58+
if isinstance(server_id, Failure):
59+
return server_id
60+
61+
else:
62+
flags.append(f"--server-id={server_id.unwrap()}")
63+
64+
elif self.source_ip != '':
65+
flags.append(f"--ip={self.source_ip}")
66+
67+
else:
68+
pass
69+
70+
result = subprocess.run(["speedtest"] + flags, stdout=subprocess.PIPE)
71+
result.check_returncode()
72+
return Success(json.loads(result.stdout))
73+
74+
except subprocess.TimeoutExpired:
75+
return Failure("Ookla Speedtest timed out.")
76+
77+
except CalledProcessError:
78+
return Failure(
79+
f"Ookla Speedtest failed with return code {result.returncode}. "
80+
f"\nStdout: {result.stdout.strip()} "
81+
f"\nStderr: {result.stderr.strip()}"
82+
)
83+
84+
except Exception as e:
85+
return Failure(f"Exception occurred: {str(e)}")
86+
87+
88+
class ServerSelection(TaskDispatcher):
89+
"""
90+
Inteded to be use in tandem with `OoklaSpeedtest`. Allows users to select a specific server from a list using a callback function.
91+
"""
92+
93+
def __init__(self, callback: Callable[[list[ServerInfo]], str], *args, **kwargs):
94+
"""
95+
`callback` will recieve a list of `ServerInfo` and should return a single server id from that list.
96+
"""
97+
super().__init__(*args, **kwargs)
98+
self.linux_instance = ServerSelectionLinuxImplementation(callback, name=self.name)
1199

12100
def dispatch(self, node: Node) -> Task:
13101
if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
14102
return self.linux_instance
15103

16104
raise NotImplementedError(
17-
f"SpeedTest is not implemented for architecture: {node.architecture}"
105+
f"ServerSelection is not implemented for architecture: {node.architecture}"
18106
)
19107

108+
class ServerSelectionLinuxImplementation(Task):
109+
requirements = UNIX_REQUIREMENTS
20110

21-
class SpeedTestLinuxImplementation(Task):
22-
requirements = ["pip install speedtest-cli"]
23-
111+
def __init__(self, callback: Callable[[list[ServerInfo]], str], *args, **kwargs):
112+
self.callback = callback
113+
super().__init__(*args, **kwargs)
114+
24115
def run(self):
25-
return subprocess_run(["speedtest-cli", "--simple", "--secure"]).map(
26-
self._format_data
116+
try:
117+
flags = ["--accept-gdpr", "--accept-license", "--progress=no", "--servers", "--format=json"]
118+
result = subprocess.run(["speedtest"] + flags, stdout=subprocess.PIPE)
119+
result.check_returncode()
120+
servers = [
121+
ServerInfo(
122+
server["id"],
123+
server["host"],
124+
server["port"],
125+
server["name"],
126+
server["location"],
127+
server["country"]
128+
)
129+
for server
130+
in json.loads(result.stdout.decode())["servers"]
131+
]
132+
return self.callback(servers)
133+
134+
except CalledProcessError:
135+
return Failure(
136+
f"Ookla_Speedtest_CLI failed with return code {result.returncode}. "
137+
f"\nStdout: {result.stdout.strip()} "
138+
f"\nStderr: {result.stderr.strip()}"
139+
)
140+
141+
class OoklaSpeedtestAnalysis(TaskDispatcher):
142+
"""
143+
This task analyzes the results of an Ookla Speedtest by inspecting the latency,
144+
jitter, and download/upload throughput. It then provides a simple classification
145+
(e.g. 'good', 'ok', 'strange', 'problem') for latency and throughput results.
146+
"""
147+
def __init__(self, speedtest_task_name: str, *args, **kwargs):
148+
super().__init__(*args, **kwargs)
149+
self.linux_implementation = OoklaSpeedtestAnalysisLinuxImplementation(
150+
speedtest_task_name,
151+
name=self.name
152+
)
153+
154+
def dispatch(self, node: Node) -> Task:
155+
if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
156+
return self.linux_implementation
157+
raise NotImplementedError(
158+
f"Ookla_Speedtest_CLI is not implemented for architecture: {node.architecture}"
27159
)
28160

29-
@staticmethod
30-
def _format_data(data: str) -> Dict[str, Dict]:
31-
ping, download, upload, *other = data.split("\n")
32-
return {
33-
"ping": {"value": float(ping.split(" ")[1]), "unit": ping.split(" ")[2]},
34-
"download": {
35-
"value": float(download.split(" ")[1]),
36-
"unit": download.split(" ")[2],
37-
},
38-
"upload": {
39-
"value": float(upload.split(" ")[1]),
40-
"unit": upload.split(" ")[2],
41-
},
42-
"other": other,
43-
}
161+
class OoklaSpeedtestAnalysisLinuxImplementation(Task):
162+
requirements = UNIX_REQUIREMENTS
163+
164+
def __init__(self, speedtest_task_name: str, *args, **kwargs):
165+
self.speedtest_task_name = speedtest_task_name
166+
super().__init__(*args, **kwargs)
167+
168+
def classify_latency(self, latency_value: float) -> str:
169+
if latency_value < 10:
170+
return "good"
171+
elif latency_value < 30:
172+
return "ok"
173+
elif latency_value < 100:
174+
return "strange"
175+
else:
176+
return "problem"
177+
178+
def classify_throughput(self, bandwidth_bps: float) -> str:
179+
mbps = bandwidth_bps / 1_000_000
180+
if mbps < 10:
181+
return "low"
182+
elif mbps < 50:
183+
return "ok"
184+
elif mbps < 150:
185+
return "good"
186+
else:
187+
return "excellent"
188+
189+
def run(self):
190+
try:
191+
raw_speedtest_results = self.previous_steps.get(self.speedtest_task_name, Failure("Ookla CLI Speedtest Task has not been executed"))
192+
193+
if isinstance(raw_speedtest_results, Failure):
194+
return raw_speedtest_results
195+
196+
speedtest_results = [result.unwrap() for result in raw_speedtest_results]
197+
ping_latencies: List[float] = []
198+
ping_jitters: List[float] = []
199+
download_bandwidths: List[float] = []
200+
upload_bandwidths: List[float] = []
201+
202+
for speedtest_data_dict in speedtest_results:
203+
ping_info = speedtest_data_dict.get("ping", {})
204+
if "latency" in ping_info:
205+
ping_latencies.append(float(ping_info["latency"]))
206+
if "jitter" in ping_info:
207+
ping_jitters.append(float(ping_info["jitter"]))
208+
download_info = speedtest_data_dict.get("download", {})
209+
if "bandwidth" in download_info:
210+
download_bandwidths.append(float(download_info["bandwidth"]))
211+
upload_info = speedtest_data_dict.get("upload", {})
212+
if "bandwidth" in upload_info:
213+
upload_bandwidths.append(float(upload_info["bandwidth"]))
214+
215+
def average(values: List[float]) -> float:
216+
return sum(values) / len(values) if values else 0.0
217+
218+
avg_latency = average(ping_latencies)
219+
avg_jitter = average(ping_jitters)
220+
avg_download_bps = average(download_bandwidths)
221+
avg_upload_bps = average(upload_bandwidths)
222+
223+
latency_class = self.classify_latency(avg_latency) if avg_latency > 0 else "Unknown"
224+
download_class = self.classify_throughput(avg_download_bps) if avg_download_bps > 0 else "Unknown"
225+
upload_class = self.classify_throughput(avg_upload_bps) if avg_upload_bps > 0 else "Unknown"
226+
227+
summary = {
228+
"average_ping_latency_ms": avg_latency,
229+
"ping_latency_class": latency_class,
230+
"average_ping_jitter_ms": avg_jitter,
231+
"average_download_bandwidth_bps": avg_download_bps,
232+
"download_bandwidth_class": download_class,
233+
"average_upload_bandwidth_bps": avg_upload_bps,
234+
"upload_bandwidth_class": upload_class,
235+
}
236+
return Success(summary)
237+
238+
except Exception as e:
239+
return Failure(f"Exception occurred: {str(e)}")

0 commit comments

Comments
 (0)