Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize unsupported values to 'N/A' in CmisApi::get_transceiver_info #545

Merged
merged 9 commits into from
Mar 1, 2025
25 changes: 19 additions & 6 deletions sonic_platform_base/sonic_xcvr/api/public/c_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ...fields import consts
from .cmis import CmisApi, CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP
import time
import copy
BYTELENGTH = 8
SYSLOG_IDENTIFIER = "CCmisApi"

Expand Down Expand Up @@ -46,6 +47,15 @@
helper_logger = logger.Logger(SYSLOG_IDENTIFIER)

class CCmisApi(CmisApi):
C_CMIS_XCVR_INFO_DEFAULT_DICT = copy.deepcopy(CmisApi.cmis_xcvr_info_dict)
C_CMIS_XCVR_INFO_DEFAULT_DICT.update({
"supported_max_tx_power": "N/A",
"supported_min_tx_power": "N/A",
"supported_max_laser_freq": "N/A",
"supported_min_laser_freq": "N/A"
})


def __init__(self, xcvr_eeprom):
super(CCmisApi, self).__init__(xcvr_eeprom)

Expand Down Expand Up @@ -354,14 +364,17 @@ def get_transceiver_info(self):
supported_min_laser_freq = FLOAT ; support minimum laser frequency
================================================================================
"""
trans_info = super(CCmisApi,self).get_transceiver_info()
self.xcvr_info = copy.deepcopy(self.C_CMIS_XCVR_INFO_DEFAULT_DICT)
self.xcvr_info = super(CCmisApi, self).get_transceiver_info()
min_power, max_power = self.get_supported_power_config()
trans_info['supported_max_tx_power'] = max_power
trans_info['supported_min_tx_power'] = min_power
_, _, _, low_freq_supported, high_freq_supported = self.get_supported_freq_config()
trans_info['supported_max_laser_freq'] = high_freq_supported
trans_info['supported_min_laser_freq'] = low_freq_supported
return trans_info
self.xcvr_info.update({
'supported_max_tx_power': max_power,
'supported_min_tx_power': min_power,
'supported_max_laser_freq': high_freq_supported,
'supported_min_laser_freq': low_freq_supported
})
return self.xcvr_info

def get_transceiver_bulk_status(self):
"""
Expand Down
79 changes: 54 additions & 25 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .cmisCDB import CmisCdbApi
from .cmisVDM import CmisVdmApi
import time
import copy
from collections import defaultdict

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,6 +83,37 @@ class CmisApi(XcvrApi):
LowPwrRequestSW = 4
LowPwrAllowRequestHW = 6

cmis_xcvr_info_dict = {
"type": "N/A",
"type_abbrv_name": "N/A",
"hardware_rev": "N/A",
"serial": "N/A",
"manufacturer": "N/A",
"model": "N/A",
"connector": "N/A",
"encoding": "N/A",
"ext_identifier": "N/A",
"ext_rateselect_compliance": "N/A",
"cable_length": "N/A",
"nominal_bit_rate": "N/A",
"vendor_date": "N/A",
"vendor_oui": "N/A",
**{f"active_apsel_hostlane{i}": "N/A" for i in range(1, 9)},
"application_advertisement": "N/A",
"host_electrical_interface": "N/A",
"media_interface_code": "N/A",
"host_lane_count": "N/A",
"media_lane_count": "N/A",
"host_lane_assignment_option": "N/A",
"media_lane_assignment_option": "N/A",
"cable_type": "N/A",
"media_interface_technology": "N/A",
"vendor_rev": "N/A",
"cmis_rev": "N/A",
"specification_compliance": "N/A",
"vdm_supported": "N/A"
}

def __init__(self, xcvr_eeprom):
super(CmisApi, self).__init__(xcvr_eeprom)
self.vdm = CmisVdmApi(xcvr_eeprom) if not self.is_flat_memory() else None
Expand Down Expand Up @@ -267,50 +299,47 @@ def get_transceiver_info(self):
ext_id = admin_info[consts.EXT_ID_FIELD]
power_class = ext_id[consts.POWER_CLASS_FIELD]
max_power = ext_id[consts.MAX_POWER_FIELD]

xcvr_info = {
self.xcvr_info = copy.deepcopy(CmisApi.cmis_xcvr_info_dict)
self.xcvr_info.update({
"type": admin_info[consts.ID_FIELD],
"type_abbrv_name": admin_info[consts.ID_ABBRV_FIELD],
"hardware_rev": self.get_module_hardware_revision(),
"serial": admin_info[consts.VENDOR_SERIAL_NO_FIELD],
"manufacturer": admin_info[consts.VENDOR_NAME_FIELD],
"model": admin_info[consts.VENDOR_PART_NO_FIELD],
"connector": admin_info[consts.CONNECTOR_FIELD],
"encoding": "N/A", # Not supported
"ext_identifier": "%s (%sW Max)" % (power_class, max_power),
"ext_rateselect_compliance": "N/A", # Not supported
"cable_length": float(admin_info[consts.LENGTH_ASSEMBLY_FIELD]),
"nominal_bit_rate": 0, # Not supported
"vendor_date": admin_info[consts.VENDOR_DATE_FIELD],
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD]
}
appl_advt = self.get_application_advertisement()
xcvr_info['application_advertisement'] = str(appl_advt) if len(appl_advt) > 0 else 'N/A'
xcvr_info['host_electrical_interface'] = self.get_host_electrical_interface()
xcvr_info['media_interface_code'] = self.get_module_media_interface()
xcvr_info['host_lane_count'] = self.get_host_lane_count()
xcvr_info['media_lane_count'] = self.get_media_lane_count()
xcvr_info['host_lane_assignment_option'] = self.get_host_lane_assignment_option()
xcvr_info['media_lane_assignment_option'] = self.get_media_lane_assignment_option()
xcvr_info['cable_type'] = self.get_cable_length_type()
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD],
"application_advertisement": str(self.get_application_advertisement()) if len(self.get_application_advertisement()) > 0 else 'N/A',
"host_electrical_interface": self.get_host_electrical_interface(),
"media_interface_code": self.get_module_media_interface(),
"host_lane_count": self.get_host_lane_count(),
"media_lane_count": self.get_media_lane_count(),
"host_lane_assignment_option": self.get_host_lane_assignment_option(),
"media_lane_assignment_option": self.get_media_lane_assignment_option(),
"cable_type": self.get_cable_length_type(),
"media_interface_technology": self.get_media_interface_technology(),
"vendor_rev": self.get_vendor_rev(),
"cmis_rev": self.get_cmis_rev(),
"specification_compliance": self.get_module_media_type(),
"vdm_supported": self.is_transceiver_vdm_supported()
})
apsel_dict = self.get_active_apsel_hostlane()
for lane in range(1, self.NUM_CHANNELS+1):
xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]
xcvr_info['media_interface_technology'] = self.get_media_interface_technology()
xcvr_info['vendor_rev'] = self.get_vendor_rev()
xcvr_info['cmis_rev'] = self.get_cmis_rev()
xcvr_info['specification_compliance'] = self.get_module_media_type()
for lane in range(1, self.NUM_CHANNELS + 1):
self.xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]

# In normal case will get a valid value for each of the fields. If get a 'None' value
# means there was a failure while reading the EEPROM, either because the EEPROM was
# not ready yet or experincing some other issues. It shouldn't return a dict with a
# wrong field value, instead should return a 'None' to indicate to XCVRD that retry is
# needed.
if None in xcvr_info.values():
if None in self.xcvr_info.values():
return None
else:
return xcvr_info
return self.xcvr_info

def get_transceiver_info_firmware_versions(self):
return_dict = {"active_firmware" : "N/A", "inactive_firmware" : "N/A"}
Expand Down
17 changes: 10 additions & 7 deletions tests/sonic_xcvr/test_ccmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,29 +160,32 @@ def test_get_pm_all(self, mock_response, expected):

@pytest.mark.parametrize("mock_response, expected",[
(
[ {
(
{
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver'
},
(-20, 0),
(0xff, -72, 120, 191300, 196100)
],
),
{
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'supported_min_laser_freq': 191300,
'supported_max_laser_freq': 196100,
'supported_max_tx_power': 0,
'supported_min_tx_power': -20,

}
)
])
@patch("sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info")
def test_get_transceiver_info(self, get_transceiver_info_func, mock_response, expected):
# Mock the base class method to return initial transceiver data
get_transceiver_info_func.return_value = mock_response[0]
self.api.get_supported_power_config = MagicMock()
self.api.get_supported_power_config.return_value = mock_response[1]
self.api.get_supported_freq_config = MagicMock()
self.api.get_supported_freq_config.return_value = mock_response[2]

# Mock the power and frequency configurations
self.api.get_supported_power_config = MagicMock(return_value = mock_response[1])
self.api.get_supported_freq_config = MagicMock(return_value = mock_response[2])

# Call function under test
result = self.api.get_transceiver_info()
assert result == expected

Expand Down
85 changes: 36 additions & 49 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1433,11 +1433,14 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
result = self.api.module_fw_upgrade(input_param)
assert result == expected

@pytest.mark.parametrize("mock_response, expected",[
([None, None, None, None, None, None, None, None, None, None, None, None, None, None, None], None),
@pytest.mark.parametrize("mock_response, expected", [
# Case: No transceiver data
([None] * 15, None),

# Case: Valid transceiver data
(
[
{
{ # EEPROM mocked Data
'Extended Identifier': {'Power Class': 'Power Class 8', 'MaxPower': 20.0},
'Identifier': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'Identifier Abbreviation': 'QSFP-DD',
Expand All @@ -1450,30 +1453,31 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
'Length Cable Assembly': 0.0,
'ModuleMediaType': 'sm_media_interface',
'VendorDate': '21010100',
'VendorOUI': 'xx-xx-xx'
'VendorOUI': 'xx-xx-xx',
'vdm_supported': True,
},
'400GAUI-8 C2M (Annex 120E)',
'400ZR, DWDM, amplified',
8, 1, 1, 1,
{'ActiveAppSelLane1': 1, 'ActiveAppSelLane2': 1, 'ActiveAppSelLane3': 1, 'ActiveAppSelLane4': 1,
'ActiveAppSelLane5': 1, 'ActiveAppSelLane6': 1, 'ActiveAppSelLane7': 1, 'ActiveAppSelLane8': 1},
{f'ActiveAppSelLane{i}': 1 for i in range(1, 9)},
'1550 nm DFB',
'0.0',
'5.0',
'0.1',
'0.0',
'sm_media_interface',
{'status': True, 'result': ("0.3.0", 1, 1, 0, "0.2.0", 0, 0, 0, "0.3.0", "0.2.0")}
{'status': True, 'result': ("0.3.0", 1, 1, 0, "0.2.0", 0, 0, 0, "0.3.0", "0.2.0")}
],
{ 'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
{ # Expected result
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'type_abbrv_name': 'QSFP-DD',
'model': 'ABCD',
'encoding': 'N/A',
'ext_identifier': 'Power Class 8 (20.0W Max)',
'ext_rateselect_compliance': 'N/A',
'cable_type': 'Length Cable Assembly(m)',
'cable_length': 0.0,
'nominal_bit_rate': 0,
'nominal_bit_rate': 'N/A',
'specification_compliance': 'sm_media_interface',
'application_advertisement': 'N/A',
'media_lane_count': 1,
Expand All @@ -1485,56 +1489,39 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
'media_interface_code': '400ZR, DWDM, amplified',
'serial': '00000000',
'host_lane_count': 8,
'active_apsel_hostlane1': 1,
'active_apsel_hostlane3': 1,
'active_apsel_hostlane2': 1,
'active_apsel_hostlane5': 1,
'active_apsel_hostlane4': 1,
'active_apsel_hostlane7': 1,
'active_apsel_hostlane6': 1,
'active_apsel_hostlane8': 1,
**{f'active_apsel_hostlane{i}': 1 for i in range(1, 9)},
'hardware_rev': '0.0',
'cmis_rev': '5.0',
'media_lane_assignment_option': 1,
'connector': 'LC',
'host_lane_assignment_option': 1,
'vendor_date': '21010100'
'vendor_date': '21010100',
'vdm_supported': True,
}
)
])
def test_get_transceiver_info(self, mock_response, expected):
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.return_value = mock_response[0]
self.api.get_host_electrical_interface = MagicMock()
self.api.get_host_electrical_interface.return_value = mock_response[1]
self.api.get_module_media_interface = MagicMock()
self.api.get_module_media_interface.return_value = mock_response[2]
self.api.get_host_lane_count = MagicMock()
self.api.get_host_lane_count.return_value = mock_response[3]
self.api.get_media_lane_count = MagicMock()
self.api.get_media_lane_count.return_value = mock_response[4]
self.api.get_host_lane_assignment_option = MagicMock()
self.api.get_host_lane_assignment_option.return_value = mock_response[5]
self.api.get_media_lane_assignment_option = MagicMock()
self.api.get_media_lane_assignment_option.return_value = mock_response[6]
self.api.get_active_apsel_hostlane = MagicMock()
self.api.get_active_apsel_hostlane.return_value = mock_response[7]
self.api.get_media_interface_technology = MagicMock()
self.api.get_media_interface_technology.return_value = mock_response[8]
self.api.get_vendor_rev = MagicMock()
self.api.get_vendor_rev.return_value = mock_response[9]
self.api.get_cmis_rev = MagicMock()
self.api.get_cmis_rev.return_value = mock_response[10]
self.api.get_module_fw_info = MagicMock()
self.api.get_module_media_type = MagicMock()
self.api.get_module_media_type.return_value = mock_response[13]
self.api.get_module_hardware_revision = MagicMock()
self.api.get_module_hardware_revision.return_value = '0.0'
self.api.get_module_fw_info.return_value = mock_response[14]
self.api.is_flat_memory = MagicMock()
self.api.is_flat_memory.return_value = False
self.api.xcvr_eeprom.read = MagicMock(return_value = mock_response[0])
self.api.get_host_electrical_interface = MagicMock(return_value = mock_response[1])
self.api.get_module_media_interface = MagicMock(return_value = mock_response[2])
self.api.get_host_lane_count = MagicMock(return_value = mock_response[3])
self.api.get_media_lane_count = MagicMock(return_value = mock_response[4])
self.api.get_host_lane_assignment_option = MagicMock(return_value = mock_response[5])
self.api.get_media_lane_assignment_option = MagicMock(return_value = mock_response[6])
self.api.get_active_apsel_hostlane = MagicMock(return_value = mock_response[7])
self.api.get_media_interface_technology = MagicMock(return_value = mock_response[8])
self.api.get_vendor_rev = MagicMock(return_value = mock_response[9])
self.api.get_cmis_rev = MagicMock(return_value = mock_response[10])
self.api.get_module_fw_info = MagicMock(return_value = mock_response[14])
self.api.get_module_media_type = MagicMock(return_value = mock_response[13])
self.api.get_module_hardware_revision = MagicMock(return_value="0.0")
self.api.is_flat_memory = MagicMock(return_value=False)
self.api.is_transceiver_vdm_supported = MagicMock(return_value=True)

# Run test and validate output
result = self.api.get_transceiver_info()
assert result == expected

# Test negative path
self.api.get_cmis_rev.return_value = None
result = self.api.get_transceiver_info()
Expand Down Expand Up @@ -3082,7 +3069,7 @@ def test_get_error_description(self):
}
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.return_value = 0x10

result = self.api.get_error_description()
assert result is 'OK'

Expand Down
Loading