|
| 1 | +import asyncio |
| 2 | +import pathlib |
| 3 | +import subprocess |
| 4 | + |
| 5 | +from unittest.mock import AsyncMock, patch |
| 6 | +import pytest |
| 7 | + |
| 8 | +from firmware.FirmwareUpload import FirmwareUploader |
| 9 | +from exceptions import InvalidUploadTool, UploadToolNotFound |
| 10 | + |
| 11 | + |
| 12 | +class TestFirmwareUpload: |
| 13 | + def test_init_success(self) -> None: |
| 14 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"): |
| 15 | + uploader = FirmwareUploader() |
| 16 | + assert uploader._autopilot_port == pathlib.Path("/dev/autopilot") |
| 17 | + assert uploader._baudrate_bootloader == 115200 |
| 18 | + assert uploader._baudrate_flightstack == 57600 |
| 19 | + assert uploader._binary == pathlib.Path("ardupilot_fw_uploader.py") |
| 20 | + |
| 21 | + def test_init_binary_not_found(self) -> None: |
| 22 | + with patch("shutil.which", return_value=None): |
| 23 | + with pytest.raises(UploadToolNotFound, match="Uploader binary not found on system's PATH."): |
| 24 | + FirmwareUploader() |
| 25 | + |
| 26 | + def test_binary_name(self) -> None: |
| 27 | + assert FirmwareUploader.binary_name() == "ardupilot_fw_uploader.py" |
| 28 | + |
| 29 | + def test_binary(self) -> None: |
| 30 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"), patch.object( |
| 31 | + FirmwareUploader, "validate_binary" |
| 32 | + ): |
| 33 | + uploader = FirmwareUploader() |
| 34 | + assert uploader.binary() == pathlib.Path("ardupilot_fw_uploader.py") |
| 35 | + |
| 36 | + def test_validate_binary_success(self) -> None: |
| 37 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"): |
| 38 | + uploader = FirmwareUploader() |
| 39 | + uploader.validate_binary() |
| 40 | + |
| 41 | + def test_validate_binary_failure(self) -> None: |
| 42 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"): |
| 43 | + uploader = FirmwareUploader() |
| 44 | + |
| 45 | + with patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(2, "cmd", "error output")): |
| 46 | + with pytest.raises(InvalidUploadTool, match="Binary returned 2 on '--help' call: error output"): |
| 47 | + uploader.validate_binary() |
| 48 | + |
| 49 | + def test_set_autopilot_port(self) -> None: |
| 50 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"): |
| 51 | + uploader = FirmwareUploader() |
| 52 | + new_port = pathlib.Path("/dev/autopilot2") |
| 53 | + uploader.set_autopilot_port(new_port) |
| 54 | + assert uploader._autopilot_port == new_port |
| 55 | + |
| 56 | + def test_set_baudrate_bootloader(self) -> None: |
| 57 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"): |
| 58 | + uploader = FirmwareUploader() |
| 59 | + new_baudrate = 57600 |
| 60 | + uploader.set_baudrate_bootloader(new_baudrate) |
| 61 | + assert uploader._baudrate_bootloader == new_baudrate |
| 62 | + |
| 63 | + def test_set_baudrate_flightstack(self) -> None: |
| 64 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"): |
| 65 | + uploader = FirmwareUploader() |
| 66 | + new_baudrate = 38400 |
| 67 | + uploader.set_baudrate_flightstack(new_baudrate) |
| 68 | + assert uploader._baudrate_flightstack == new_baudrate |
| 69 | + |
| 70 | + @pytest.mark.asyncio |
| 71 | + async def test_upload_success(self) -> None: |
| 72 | + with patch("shutil.which", return_value="ardupilot_fw_uploader.py"), patch.object( |
| 73 | + FirmwareUploader, "validate_binary" |
| 74 | + ): |
| 75 | + uploader = FirmwareUploader() |
| 76 | + |
| 77 | + firmware_path = pathlib.Path("/tmp/test_firmware.bin") |
| 78 | + |
| 79 | + mock_process = AsyncMock() |
| 80 | + mock_process.returncode = 0 |
| 81 | + mock_process.stdout = AsyncMock() |
| 82 | + mock_process.stdout.readline = AsyncMock( |
| 83 | + side_effect=[b"Upload starting...\n", b"Progress: 50%\n", b"Upload complete!\n", b""] |
| 84 | + ) |
| 85 | + mock_process.wait = AsyncMock(return_value=0) |
| 86 | + |
| 87 | + with patch("asyncio.create_subprocess_shell", return_value=mock_process) as mock_create_subprocess, patch( |
| 88 | + "asyncio.sleep" |
| 89 | + ) as mock_sleep, patch("loguru.logger.info") as mock_info, patch("loguru.logger.debug") as mock_debug: |
| 90 | + |
| 91 | + await uploader.upload(firmware_path) |
| 92 | + |
| 93 | + expected_command = ( |
| 94 | + f"ardupilot_fw_uploader.py {firmware_path}" |
| 95 | + f" --port /dev/autopilot" |
| 96 | + f" --baud-bootloader 115200" |
| 97 | + f" --baud-flightstack 57600" |
| 98 | + ) |
| 99 | + |
| 100 | + mock_create_subprocess.assert_called_once_with( |
| 101 | + expected_command, |
| 102 | + stdout=asyncio.subprocess.PIPE, |
| 103 | + stderr=asyncio.subprocess.PIPE, |
| 104 | + shell=True, |
| 105 | + ) |
| 106 | + |
| 107 | + mock_info.assert_any_call("Starting upload of firmware to board.") |
| 108 | + mock_info.assert_any_call("Successfully uploaded firmware to board.") |
| 109 | + mock_debug.assert_any_call("Upload starting...") |
| 110 | + mock_debug.assert_any_call("Progress: 50%") |
| 111 | + mock_debug.assert_any_call("Upload complete!") |
| 112 | + |
| 113 | + mock_sleep.assert_called_with(10) |
0 commit comments