|
1 | 1 | import pytest |
| 2 | +import logging |
2 | 3 |
|
3 | 4 | from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom |
4 | 5 | from sonic_platform_base.sonic_xcvr.api.public.cmis import CmisApi |
|
10 | 11 | ReadRequest, |
11 | 12 | WriteRequest, |
12 | 13 | ) |
| 14 | +from cmis import MemMap |
| 15 | +from cmis.optoe import EEPROM |
| 16 | + |
| 17 | +logger = logging.getLogger(__name__) |
| 18 | + |
13 | 19 |
|
14 | 20 |
|
15 | 21 | class XcvrEmuEeprom(XcvrEeprom): |
16 | | - def __init__(self, config: dict): |
| 22 | + def __init__(self, config: dict, mem_map=None): |
17 | 23 |
|
18 | 24 | codes = CmisCodes |
19 | | - mem_map = CmisMemMap(codes) |
20 | 25 |
|
21 | 26 | self.xcvr = CMISTransceiver( |
22 | 27 | 0, |
23 | 28 | { |
24 | 29 | "present": True, |
25 | 30 | "defaults": config, |
26 | 31 | }, |
| 32 | + mem_map, |
27 | 33 | ) |
28 | 34 |
|
29 | | - super().__init__(self._read, self._write, mem_map) |
| 35 | + super().__init__(self._read, self._write, CmisMemMap(codes)) |
30 | 36 |
|
31 | 37 | def _read(self, offset, num_bytes): |
32 | 38 | if not self.xcvr.present: |
@@ -81,3 +87,46 @@ async def test_get_model(emu_response, expected): |
81 | 87 | result = result.rstrip("\x00") |
82 | 88 | await eeprom.xcvr.plugout() |
83 | 89 | assert result == expected |
| 90 | + |
| 91 | +# hexdump taken from https://github.com/sonic-net/sonic-platform-common/issues/489#issue-2445591891 |
| 92 | +EEPROM_HEXDUMP = """00000000 18 40 00 07 00 00 00 00 00 00 00 00 00 00 17 00 |.@..............| |
| 93 | +00000010 82 00 00 00 00 00 00 00 17 80 00 00 00 00 00 00 |................| |
| 94 | +00000020 00 00 00 00 00 00 00 01 00 00 00 00 00 00 00 00 |................| |
| 95 | +00000030 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................| |
| 96 | +00000040 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................| |
| 97 | +00000050 00 00 00 00 00 03 00 00 00 00 00 00 00 00 00 00 |................| |
| 98 | +00000060 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................| |
| 99 | +00000070 00 00 11 00 88 00 00 00 00 00 00 00 00 00 00 00 |................| |
| 100 | +00000080 18 43 49 53 43 4f 20 20 20 20 20 20 20 20 20 20 |.CISCO | |
| 101 | +00000090 20 00 06 f6 36 38 2d 31 30 33 32 30 35 2d 30 32 | ...68-103205-02| |
| 102 | +000000a0 20 20 20 20 32 20 46 41 42 32 36 31 31 30 30 43 | 2 FAB261100C| |
| 103 | +000000b0 51 20 20 20 20 20 32 32 31 30 31 38 20 20 00 00 |Q 221018 ..| |
| 104 | +000000c0 00 00 00 00 00 00 00 00 e0 78 00 00 00 00 00 00 |.........x......| |
| 105 | +000000d0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 f9 00 |................| |
| 106 | +000000e0 1b 00 07 00 00 00 00 00 00 00 00 00 00 00 00 00 |................| |
| 107 | +000000f0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|""" |
| 108 | + |
| 109 | +@pytest.mark.asyncio |
| 110 | +async def test_get_application_advertisement(): |
| 111 | + e = EEPROM() |
| 112 | + e.load(EEPROM_HEXDUMP) |
| 113 | + m = MemMap(e) |
| 114 | + eeprom = XcvrEmuEeprom({}, m) |
| 115 | + api = CmisApi(eeprom) |
| 116 | + |
| 117 | + data = e.read(0, 0, 0, 256) |
| 118 | + for f, v in m.decode(0, 0, data): |
| 119 | + logger.info(f.to_str(value=v)) |
| 120 | + |
| 121 | + result = api.get_application_advertisement() |
| 122 | + logger.info(f"Application Advertisement: {result}") |
| 123 | + |
| 124 | + app_id = list(result.keys())[0] |
| 125 | + |
| 126 | + with pytest.raises(KeyError): |
| 127 | + result = api.get_media_lane_count() |
| 128 | + |
| 129 | + result = api.get_media_lane_count(app_id) |
| 130 | + logger.info(f"Media Lane Count: {result}") |
| 131 | + |
| 132 | + await eeprom.xcvr.plugout() |
0 commit comments