Skip to content

Commit 70e2dfe

Browse files
Merge pull request #48 from vinod-kanigicherla/fix-netflex-pipelines
Add handlers for Ookla and MLab; fixed measurement_data formatting
2 parents 0f71e3a + f59db3d commit 70e2dfe

File tree

5 files changed

+33
-13
lines changed

5 files changed

+33
-13
lines changed

pipelines/measurements/netflex_pipeline.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from netunicorn.library.tasks.measurements.ndt import NDT7SpeedTest
44
from netunicorn.library.tasks.data_transfer.send_data import SendData
55
from netunicorn.library.tasks.data_transfer.fetch_data import FetchData
6+
from netunicorn.library.tasks.handlers.ookla import ookla_handler
7+
from netunicorn.library.tasks.handlers.mlab import mlab_handler
68
import os
79

810
def netflex_ookla_full_loop_pipeline() -> Pipeline:
@@ -14,7 +16,7 @@ def netflex_ookla_full_loop_pipeline() -> Pipeline:
1416
pipeline = (
1517
Pipeline()
1618
.then(OoklaSpeedtest(name="Ookla CLI Speedtest"))
17-
.then(SendData(task_descriptors=[SendData.TaskDescriptor("Ookla CLI Speedtest", "ookla-speedtest", None)], endpoint=os.getenv('RAG_ENDPOINT'), name="send"))
19+
.then(SendData(task_descriptors=[SendData.TaskDescriptor("Ookla CLI Speedtest", "ookla-speedtest", ookla_handler)], endpoint=os.getenv('RAG_ENDPOINT'), name="send"))
1820
.then(FetchData(send_data_task="send", endpoint=os.getenv("RAG_ENDPOINT"), name="fetch"))
1921
)
2022
return pipeline
@@ -28,7 +30,7 @@ def netflex_mlab_full_loop_pipeline() -> Pipeline:
2830
pipeline = (
2931
Pipeline()
3032
.then(NDT7SpeedTest(name="test", flags=["-format", "json"]))
31-
.then(SendData(task_descriptors=[SendData.TaskDescriptor("test", "mlab-speedtest", None)], endpoint=os.getenv('RAG_ENDPOINT'), name="send"))
33+
.then(SendData(task_descriptors=[SendData.TaskDescriptor("test", "mlab-speedtest", mlab_handler)], endpoint=os.getenv('RAG_ENDPOINT'), name="send"))
3234
.then(FetchData(send_data_task="send", endpoint=os.getenv("RAG_ENDPOINT"), name="fetch"))
3335
)
3436
return pipeline

tasks/data_transfer/send_data.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@ def __init__(
1818
self,
1919
task_descriptors: list[TaskDescriptor],
2020
endpoint: str,
21-
allow_failure = False,
21+
allow_failure = False,
22+
payload_handler: Optional[Callable[[Any], Any]] = None,
2223
*args,
2324
**kwargs
2425
):
2526

2627
self.task_descriptors = task_descriptors
2728
self.endpoint = endpoint
2829
self.allow_failure = allow_failure
30+
self.payload_handler = payload_handler
2931
super().__init__(*args, **kwargs)
3032

3133
def get_geolocation_from_ip(self, ip: str):
@@ -61,21 +63,31 @@ def run(self):
6163
if isinstance(raw_task_result, Failure):
6264
continue
6365

64-
task_results = [result.unwrap() for result in raw_task_result]
66+
raw_results = [result.unwrap() for result in raw_task_result]
6567

6668
if task_descriptor.handler:
67-
task_results = task_descriptor.handler(task_descriptor)
68-
69-
execution_results.append({"task_type": task_descriptor.datatype, "task_results": task_results})
69+
task_results = task_descriptor.handler(task_descriptor.datatype, raw_results)
70+
else: # no handler
71+
task_results = {
72+
"task_type": task_descriptor.datatype,
73+
"task_results": raw_results
74+
}
75+
execution_results.append(task_results)
7076

7177
(location_str, *_) = self.get_geolocation_from_ip("8.8.8.8")
7278

73-
response = requests.post(self.endpoint, json=
74-
{
75-
"client_location": location_str,
76-
"measurement_type": self.task_descriptors[0].datatype,
77-
"measurement_data": execution_results
78-
})
79+
measurement_data = {"execution_results": execution_results}
80+
81+
payload = {
82+
"client_location": location_str,
83+
"measurement_type": self.task_descriptors[0].datatype,
84+
"measurement_data": measurement_data
85+
}
86+
87+
if self.payload_handler:
88+
payload = self.payload_handler(payload)
89+
90+
response = requests.post(self.endpoint, json=payload)
7991

8092
if response.status_code == 200:
8193
return Success(response.json())

tasks/handlers/__init__.py

Whitespace-only changes.

tasks/handlers/mlab.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
def mlab_handler(task_type: str, results: list[dict]) -> dict:
2+
return {"task_type": task_type, "task_results": {"data": results}}

tasks/handlers/ookla.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
def ookla_handler(task_type: str, results: list[dict]) -> dict:
2+
for r in results:
3+
r.pop("interface", None)
4+
return {"task_type": task_type, "task_results": results}

0 commit comments

Comments
 (0)