Source code for taf.testlib.switch_general

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``switch_general.py``

`General switches-specific functionality`

"""

import sys
import time
import traceback
from abc import abstractmethod
from collections import namedtuple
import ipaddress as ipaddr

import pytest

from . import clissh
from . import loggers
from . import entry_template
from . import ui_ons_xmlrpc
from . import ui_ons_cli
from . import ui_onpss_shell
from . import ui_iss_cli
from . import ui_onpss_jsonrpc
from . import lab
from .custom_exceptions import SwitchException

UI_MAP = {
    "ons_cli": ui_ons_cli.UiOnsCli,
    "ons_xmlrpc": ui_ons_xmlrpc.UiOnsXmlrpc,
    "onpss_shell": ui_onpss_shell.UiOnpssShell,
    "onpss_jsonrpc": ui_onpss_jsonrpc.UiOnpssJsonrpc,
    "iss_cli": ui_iss_cli.UiIssCli,
}

PortsOrder = namedtuple('PortsOrder', 'masters, slaves')


[docs]class SwitchGeneral(entry_template.GenericEntry): """General Switch object functionality. Configuration examples:: { "name": "simswitch2_lxc", "entry_type": "switch", "instance_type": "lxc", "id": 31, "ip_host": "10.0.5.103", "ip_port": "8083", "use_sshtun": 1, "sshtun_user": "admin", "sshtun_pass": "admin", "sshtun_port": 22, "default_gw": "127.0.0.1", "net_mask": "255.255.255.0", "ports_count": 32, "pwboard_host": "1.1.1.100", "pwboard_port": "15", "pwboard_snmp_rw_community_string": "private", "use_serial": false, "cli_user": "lxc_admin", "cli_user_passw": "lxc_admin", "cli_user_prompt": "Switch", "telnet_user": "admin", "telnet_pass": "password", "telnet_prompt": "localhost:~$", "ports": [20, 21, 1, 16], "port_list": [[35, 10000], [36, 2500]], "ports_map": [[51, [51, 52, 53, 54]], [55, [55, 56, 57, 58]]], "mgmt_ports": [47, 50], "related_id": ["33"] } Where:: \b entry_type and \b instance_type are mandatory values and cannot be changed for current device type. \n\b id - int or str uniq device ID (mandatory) \n\b name - User defined device name (optional) \n\b ip_host - uniq device IP (mandatory). \n\b ip_port - uniq device IP port for XML-RPC commands (mandatory) \n\b use_sshtun - set if TAF will use ssh connection for XML-RPC commands (0 or 1) (mandatory) \n\b sshtun_user, \b sshtun_pass and \b sshtun_port - uniq ssh user credentials and port to establish ssh connection (optional). \n\b default_gw and \b net_mask - \n\b ports_count - length of Ports table in default configuration (mandatory) \n\b pwboard_host and \b pwboard_port - PDU IP address and port to perform powercycle device reboot (mandatory). \n\b pwboard_snmp_rw_community_string - PDU SNMP Read/Write community string (optional). \n\b use_serial - set up if telnet connection used for clear config (optional) \n\b cli_user and \b cli_user_passw - uniq CLI user credentials (optional). \n\b cli_user_prompt - default CLI prompt on device (optional). \n\b telnet_user and \b telnet_pass - uniq telnet user credentials (optional). \n\b telnet_prompt - default telnet prompt on device (optional). \n\b ports - list of port ids used in tests (mandatory). \n\b port_list - list of port id and port speed used in tests for speed preconfiguration (optional). \n\b ports_map - mapping of master/slave ports for speed preconfiguration (optional). \n\b mgmt_ports - port ids of management ports in Ports table. Used to avoid setting of adminMode into Down state for these ports (optional). \n\b related_id - list of ids of related devices or services (optional). """ class_logger = None # defined in subclasses
[docs] def __init__(self, config, opts): """Initialize SwitchGeneral class. Args: config(dict): Configuration information. opts(OptionParser): py.test config.option object which contains all py.test cli options. """ super(SwitchGeneral, self).__init__(config, opts) self.name = config.get('name', 'noname') self.ipaddr = config['ip_host'] self.port = config['ip_port'] self.ports_count = config['ports_count'] self.id = config['id'] self.type = config['instance_type'] self.config = config self.opts = opts self.ports, self.speed_ports, self.ports_map = self._get_speed_ports() self.port_list = self.speed_ports[:] self.mgmt_ports = self.config.get("mgmt_ports", []) self.default_restart_type = "powercycle" self.waiton_err_message = "" # defined in subclasses # Use serial console for real devices or not. self._use_serial = config.get('use_serial', True) self._sshtun_port = config.get('sshtun_port', 22) # Update status to On(True) in case --get_only option is selected. self.status = self.opts.get_only # Initialize UI based on UI_MAP and cli --ui option self.ui = UI_MAP[self.opts.ui](self) self.class_logger.debug("ui = %s", self.ui) # Flag to indicate if Switch configuration DB is damaged. self.db_corruption = False self.instance_prop = None
[docs] def _get_speed_ports(self): """Get slave and master ports from config. Returns: list: List of ports (slave and master) used in real config Notes: This function check if master port should be split into slave ports. """ speed_ports = self.config.get("port_list", []) ports_map = self.config.get("ports_map", []) # speed_ports expected format: [[port1_num, port1_speed], ...] if speed_ports: ports = [x[0] for x in speed_ports] else: ports = self.config.get("ports", []) return ports, speed_ports, ports_map
[docs] def _get_port_for_probe(self): """Get port ID. Returns: int: ssh tunnel ports ID """ return int(self._sshtun_port)
[docs] def enable_ports(self): """Enable ports if Ports table is empty. """ pass
[docs] def probe(self): """Probe switch with UI call. Returns: dict: Dictionary (_object) with switchpp status parameters or raise an exception. """ _object = { 'isup': False, 'type': "unknown", 'prop': {}, } if clissh.probe_port(self.ipaddr, self._get_port_for_probe(), self.class_logger): _object['isup'] = True try: # Try to wait until device is ready to process self.ui.check_device_state() # Get switch properties only once if not self.instance_prop: self.instance_prop = self.ui.get_table_platform()[0] except Exception as err: self.class_logger.error("Caught an exception while probing the device: Error type: %s. Error msg: %s" % (type(err), err)) else: _object['type'] = "switchpp" # Define _object['prop'] only if switch has started correctly _object['prop'] = self.instance_prop message = "Switch %s(%s) is processing: " % (self.name, self.ipaddr) instance_props = ", ".join("{} - {}".format(key, val) for key, val in sorted(self.instance_prop.items())) self.class_logger.info(message + instance_props) time.sleep(1) self.class_logger.info(message) return _object
[docs] def waiton(self, timeout=90): """Wait until switch if fully operational. Args: timeout(int): Wait timeout Raises: SwitchException: device doesn't response Returns: dict: Status dictionary from probe method or raise an exception. """ status = None message = "Waiting until switch %s(%s) is up." % (self.name, self.ipaddr) self.class_logger.info(message) stop_flag = False end_time = time.time() + timeout while not stop_flag: if loggers.LOG_STREAM: sys.stdout.write(".") sys.stdout.flush() if time.time() < end_time: # While time isn't elapsed continue probing switch. try: status = self.probe() except KeyboardInterrupt: message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr) self.class_logger.info(message) self.sanitize() pytest.exit(message) if status["isup"] and status["type"] == "switchpp": stop_flag = True if status['prop'] == {}: # If type == switchpp but prop == empty dict then Platform table return incorrect data. message = "Found running switchpp on %s but Platform data is corrupted." % (self.ipaddr, ) self.class_logger.warning(message) raise SwitchException(message) self.class_logger.info("Switch instance on %s(%s) is OK." % (self.name, self.ipaddr)) else: # Time is elapsed. if status["isup"] and status["type"] != "switchpp": message = ("Port %s on host %s is opened but doesn't response queries." + " %s Check your environment!") % (self.port, self.ipaddr, self.waiton_err_message) else: port = self._get_port_for_probe() message = "Timeout exceeded. IP address %s port %s doesn't respond" % (self.ipaddr, port) self.class_logger.warning(message) raise SwitchException(message) if not stop_flag: time.sleep(0.75) return status
[docs] def waitoff(self, timeout=30): """Wait for switch stop listening on ssh port. Args: timeout(int): Wait timeout Raises: SwitchException: device is still alive Returns: bool: True or raise an exception. """ status = True message = "Waiting until switch %s(%s) is down." % (self.name, self.ipaddr) self.class_logger.info(message) stop_flag = False end_time = time.time() + timeout while not stop_flag: if loggers.LOG_STREAM: sys.stdout.write(".") sys.stdout.flush() if time.time() < end_time: status = clissh.probe_port(self.ipaddr, self._get_port_for_probe(), self.class_logger) if not status: stop_flag = True else: if status: message = "Timeout exceeded. The port %s on host %s is still open." % (self._sshtun_port, self.ipaddr) self.class_logger.warning(message) raise SwitchException(message) time.sleep(1) return not status
[docs] def clearconfig(self): """Perform switchpp clearConfig query and raise an exception if it fails. Returns: None """ try: self.ui.clear_config() except Exception as err: message = "Error clearing switch id:%s config: %s." % (self.id, err, ) self.class_logger.error(message) pytest.fail(message)
[docs] def cleanup(self): """Check if switch is operational and perform clearConfig procedure. Returns: None """ if not self.status: self.class_logger.info("Skip cleanup of switch id:%s due to Off status." % (self.id, )) return self.get() self.clearconfig()
@abstractmethod
[docs] def start(self, wait_on=True): """Mandatory method for environment specific switch classes. Args: wait_on(bool): Indicates if wait for device status """ pass
@abstractmethod
[docs] def stop(self): """Mandatory method for environment specific switch classes. """ pass
@abstractmethod
[docs] def restart(self, wait_on=True, mode=""): """Mandatory method for environment specific switch classes. Args: wait_on(bool): Indicates if wait for device status mode(str): Restart mode. powercycle|ui """ pass
[docs] def get(self, init_start=False, retry_count=7): """Get or start switch instance. Args: init_start(bool): Perform switch start operation or not retry_count(int): Number of retries to start(restart) switch Returns: None or raise an exception. Notes: Also self.opts.fail_ctrl attribute affects logic of this method. fail_ctrl is set in py.test command line options (read py.test --help for more information). """ # If fail_ctrl != "restart", restart retries won't be performed if self.opts.fail_ctrl != "restart": retry_count = 1 for retry in range(retry_count): try: if retry == 0: if init_start: self.start() else: self.waiton() else: self.restart(mode=self.default_restart_type) break except KeyboardInterrupt: message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr) self.class_logger.info(message) self.sanitize() pytest.exit(message) except Exception: self.class_logger.warning("Error while checking switch %s(%s)..." % (self.name, self.ipaddr)) retry += 1 exc_type, exc_value, exc_traceback = sys.exc_info() traceback_message = traceback.format_exception(exc_type, exc_value, exc_traceback) message = "Error while checking switch %s(%s):\n%s" % (self.name, self.ipaddr, "".join(traceback_message)) sys.stderr.write(message) sys.stderr.flush() self.class_logger.error(message) if retry > 4: self.class_logger.warning( "Could not complete switch start method for the fourth time. Trying to " "reset the DB...") self.db_corruption = True if retry >= retry_count + 1: message = "Could not complete start switch method after {0} retries. Something " \ "went wrong...\n".format(retry_count) sys.stderr.write(message) sys.stderr.flush() self.class_logger.error(message) if self.opts.fail_ctrl != "ignore": pytest.exit(message) else: pytest.fail(message)
[docs] def check(self): """Check if switch is operational using waiton method. Notes: This mandatory method for all environment classes. """ if not self.status: self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name)) return status = self.waiton() # Verify Ports table is not empty if self.ui.get_table_ports() == []: if self.opts.fail_ctrl == 'stop': self.class_logger.debug("Exit switch check. Ports table is empty!") pytest.exit('Ports table is empty!') else: self.class_logger.debug("Fail switch check. Ports table is empty!") pytest.fail('Ports table is empty!') return status
[docs] def create(self): """Start switch or get running one. Notes: This is mandatory method for all environment classes. Also self.opts.get_only attribute affects logic of this method. get_only is set in py.test command line options (read py.test --help for more information). """ init_start = not self.opts.get_only return self.get(init_start=init_start)
[docs] def destroy(self): """Stop or release switch. Notes: This is mandatory method for all environment classes. Also self.opts.leave_on and get_only attributes affect logic of this method. leave_on and get_only are set in py.test command line options (read py.test --help for more information). """ if not self.status: self.class_logger.info("Skip switch id:%s(%s) destroying because it's has already Off status." % (self.id, self.name)) return if not self.opts.leave_on and not self.opts.get_only: self.stop() self.sanitize()
[docs] def sanitize(self): """Perform any necessary operations to leave environment in normal state. """ # Close sshtun to prevent hanging threads. self.ui.disconnect()
[docs] def connect_port(self, port_id): """Emulate port connection via setting adminMode into Up state. Args: port_id(int): Port number """ self.class_logger.debug("Emulating connecting for port ID = {0}".format(port_id)) _port = self.ui.get_table_ports([int(port_id)])[0] if _port['operationalStatus'] != "NotPresent": # Check if port is LAG member if _port["type"] == "LAGMember": # Use lag id as port id lag_table = self.ui.get_table_ports2lag() port_id = [x["lagId"] for x in lag_table if x["portId"] == port_id][0] self.ui.modify_ports([int(port_id)], adminMode="Up")
[docs] def disconnect_port(self, port_id): """Emulate port disconnection via setting adminMode into Down state. Args: port_id(int): Port number """ self.class_logger.debug("Emulating disconnecting for port ID = {0}".format(port_id)) _port = self.ui.get_table_ports([int(port_id)])[0] if _port['operationalStatus'] != "NotPresent": # Check if port is LAG member if _port["type"] == "LAGMember": # Use lag id as port id lag_table = self.ui.get_table_ports2lag() port_id = [x["lagId"] for x in lag_table if x["portId"] == port_id][0] self.ui.modify_ports([int(port_id)], adminMode="Down")
[docs] def ssh_connect(self, host, port, login, passw): """Make ssh connection to the device. Args: host(str): Device ssh IP address port(int): Device ssh port login(str): ssh username passw(str): ssh password """ self.ssh_conn = clissh.CLISSH(host, port=port, username=login, password=passw) # pylint: disable=attribute-defined-outside-init self.ssh_conn.login() self.ssh_conn.open_shell()
[docs] def disabled_stp_on_management_ports(self): """Disable STP on management ports. """ pass
[docs] def get_env_prop(self, param): """Read properties from all devices. """ return self.instance_prop[param]
[docs]class SwitchReal(SwitchGeneral): """Real devices class. """ class_logger = loggers.ClassLogger() UI_RESTART_TIMEOUT = 5
[docs] def __init__(self, config, opts): """Initialize SwitchReal class. Args: config(dict): Configuration information. opts(OptionParser): py.test config.option object which contains all py.test cli options. """ from . import powerboard self.powerboard = powerboard self.powercycle_timeout = config.get('reboot_latency', 1) self.pwboard_snmp_rw_community_string = 'private' super(SwitchReal, self).__init__(config, opts) self.pwboard = config["pwboard_host"] self.pwport = config["pwboard_port"] self._sshtun_user = config['sshtun_user'] self._sshtun_pass = config['sshtun_pass'] self.pwboard_snmp_rw_community_string = config.get('pwboard_snmp_rw_community_string', 'private') # conditional init, this should be set in concrete Switch platform classes self.mgmt_iface = getattr(self, "mgmt_iface", "eth0") self._default_gw = config['default_gw'] self._net_mask = config['net_mask'] self.waiton_err_message = "Device is started but does not respond to queries." self.telnet = None self.telnet_prompt = None self.startup_time = 180 # create ssh object: self.ssh = clissh.CLISSH(self.ipaddr, self._sshtun_port, self._sshtun_user, self._sshtun_pass, sudo_prompt="Password:") # Create CLI in subclasses self.netconf = None # TODO: update env file if self.netconf: from . import netconfcmd self.netconf = netconfcmd.NETCONF(config)
[docs] def start(self, wait_on=True): """Power on switch or perform power cycle if it is already On. Args: wait_on(bool): Check if switch boot successfully Raises: SwitchException: unknown device status """ self.class_logger.info("Starting Real switch device %s(%s) ..." % (self.name, self.ipaddr)) self.class_logger.debug("Checking device status on powerboard...") status = self.powerboard.get_status(self.pwboard, self.pwport, self.pwboard_snmp_rw_community_string) self.class_logger.debug("Current status %s." % status) if status == "On": # Turn Off Seacliff with halt. if "halt" in self.config and self.config["halt"]: self.halt() self.powerboard.do_action(self.pwboard, self.pwport, self.pwboard_snmp_rw_community_string, self.powerboard.commands["Off"]) self.class_logger.info("Powercyle latency: {}".format(self.powercycle_timeout)) time.sleep(self.powercycle_timeout) self.powerboard.do_action(self.pwboard, self.pwport, self.pwboard_snmp_rw_community_string, self.powerboard.commands["On"]) elif status == "Off": self.powerboard.do_action(self.pwboard, self.pwport, self.pwboard_snmp_rw_community_string, self.powerboard.commands["On"]) else: raise SwitchException("Cannot determine device status.") # After snmp command is sent APC could restart switch # in few seconds. Terefore it's good to wait a little # to prevent fail login prompt detection. time.sleep(1) if wait_on: self.waiton(timeout=self.startup_time) # Set status to On(True) self.status = True # Perform syslog configuration if one exists. self.setup_syslog() # Set initial ports speed self.speed_preconfig()
[docs] def stop(self): """Power Off real switch. Raises: SwitchException: unknown device status """ self.ui.disconnect() self.class_logger.info("Stopping Real switch device %s(%s) ..." % (self.name, self.ipaddr)) self.class_logger.debug("Checking device status on powerboard...") status = self.powerboard.get_status(self.pwboard, self.pwport, self.pwboard_snmp_rw_community_string) self.class_logger.debug("Current status %s." % status) if status == "On": # WORKAROUND BEGIN: Turn Off the device with halt if "halt" in self.config and self.config["halt"]: self.halt() # WORKAROUND END self.powerboard.do_action(self.pwboard, self.pwport, self.pwboard_snmp_rw_community_string, self.powerboard.commands["Off"]) elif status == "Off": self.class_logger.info("Nothing to do. Switch is already off.") else: raise SwitchException("Cannot determine device status.") self.waitoff(timeout=15) # Set Off(False) status self.status = False return True
[docs] def restart(self, wait_on=True, mode="powercycle"): """Perform switch power cycle. Args: wait_on(bool): Check if switch boot properly. mode(str): set type of reboot, by default it's powercycle. Raises: SwitchException: incorrect restart mode Notes: By default start method performs power cycle in case switch is already On. Therefore this method is just link to start(). """ if mode == 'powercycle': self.class_logger.info("Perform powercycle device reboot") self.stop() self.class_logger.info("Powercyle latency: {}".format(self.powercycle_timeout)) time.sleep(self.powercycle_timeout) self.start(wait_on) elif mode == 'ui': self.class_logger.info("Perform graceful device reboot") self.ui.restart() self.status = False self.waitoff(60) self.class_logger.info("Powercyle latency: {}".format(self.UI_RESTART_TIMEOUT)) time.sleep(self.UI_RESTART_TIMEOUT) self.waiton(300) self.status = True # Set initial ports speed self.speed_preconfig() return else: message = "Incorrect restart mode was specified: %s" % (mode, ) self.class_logger.error(message) raise SwitchException(message)
[docs] def get_serial(self, timeout=90, with_login=None, wait_login=0): """Connect to switch via serial. Args: timeout(int): time out to wait connection with_login(bool): Perform login procedure or not. If param isn't set try automatically determine login necessity. (True|False|None) wait_login(int): time to wait login before sending <Enter>. <Enter> is necessary if login is already appiered. Notes: Create(or check) class attribute telnet with active telnet connection to switch and do login. """ self.telnet = lab.ConsoleServer(self.config) self.telnet.get_serial(timeout=timeout, with_login=with_login, wait_login=wait_login)
[docs] def close_serial(self): """Close telnet connection to switch. """ self.telnet.close_serial() del self.telnet self.telnet = None
[docs] def halt(self): """Do halt before shutdown. Raises: SwitchException: error on device halt """ if not self._use_serial: self.class_logger.warning("Skipping halt procedure because serial access is disabled.") return # Connect and halt switch try: self.class_logger.debug("Halting switch...") self.get_serial(with_login=None, wait_login=0) self.class_logger.debug("Telnet connection - OK...") output, err, _ = self.telnet.exec_command("halt", "System halted.") if err: message = "Cannot halt the device.\nStdOut: %s\nStdErr: %s" % (output, err) self.class_logger.error(message) raise SwitchException(message) time.sleep(5) self.class_logger.debug("Switch terminal output:\n%s" % output) except Exception as err: self.class_logger.error(err) finally: self.close_serial()
@staticmethod
[docs] def _netmsk_to_cidr(ip_addr, netmask): """CIDR conversion. Args: ip_addr(str): IP address netmask(int): netmask value """ cidr = ipaddr.IPv4Network('{0}/{1}'.format(ip_addr, netmask), strict=False) return cidr.with_prefixlen
[docs] def clearconfig(self): """Perform clearConfig query on switch using telnet. And try to configure management interface. """ self.class_logger.debug("Performing clearConfig on real switch.") super(SwitchReal, self).clearconfig() self.setup_syslog() # Set initial ports speed self.speed_preconfig()
[docs] def setup_syslog(self): """Setup remote syslog server settings. """ if 'related_id' in self.config: for val in self.config['related_id']: if self.config['related_conf'][val]['instance_type'] == "syslog_settings": syslog_set_id = val # TODO check on first 1.1 build syslog parameters value syslog_ip = self.config['related_conf'][syslog_set_id]['ip'] syslog_proto = self.config['related_conf'][syslog_set_id]['proto'] syslog_port = self.config['related_conf'][syslog_set_id]['port'] syslog_localport = self.config['related_conf'][syslog_set_id]['localport'] syslog_transport = self.config['related_conf'][syslog_set_id]['transport'] syslog_facility = self.config['related_conf'][syslog_set_id]['facility'] syslog_severity = self.config['related_conf'][syslog_set_id]['severity'] try: self.ui.create_syslog(syslog_proto, syslog_ip, syslog_port, syslog_localport, syslog_transport, syslog_facility, syslog_severity) self.class_logger.debug("Syslog configuration finished. Syslog server: %s, proto: %s" % (syslog_ip, syslog_proto)) return except Exception as err: self.class_logger.debug("Syslog configuration skipped. Some error occurs %s" % (err, )) self.class_logger.debug("Syslog configuration skipped. Syslog settings not found.")
[docs] def speed_preconfig(self, wait_for_ports=False): """Function for ports speed preconfiguration. Args: wait_for_ports(int): wait for Ports table changes size """ def _normalize_port_list(ports_list): """Get lists of Master and Slave ports. """ master_ports = set() ports = set() for _port in ports_list: m_port = _get_master_port(_port) master_ports.add(m_port) if m_port != _port: ports.add(_port) m_list = list(master_ports) p_list = list(ports) return PortsOrder(m_list, p_list) def _get_master_port(port): """Get Master port. """ try: return next(r for r, s in self.ports_map if port in s) except StopIteration: return port # Separate ports per preconfigured speed if self.speed_ports: speed_dict = {} for port, speed in self.speed_ports: speed_dict.setdefault(speed, []).append(port) normalized_speed_dict = {} for speed, ports in speed_dict.items(): normalized_speed_dict[speed] = _normalize_port_list(ports) for speed, ports in normalized_speed_dict.items(): self.setup_ports_speed_configuration(ports.masters, speed) self.setup_ports_speed_configuration(ports.slaves, speed) # Wait for Ports table has expected size if wait_for_ports: table_size = len(self.ui.get_table_ports()) _start_time = time.time() while table_size != self.ports_count: if time.time() > _start_time + 10: pytest.fail("Ports table doesn't have expected ports count") time.sleep(0.2) table_size = len(self.ui.get_table_ports()) else: # WORKAROUND for ONS-254496 time.sleep(3)
[docs] def setup_ports_speed_configuration(self, ports=None, speed=10000): """Configure ports speed. Args: ports(list[int]): list of ports to set speed value speed(int): speed value """ if ports: self.class_logger.debug("Performing ports speed configuration on real switch.") err_message = "Cannot perform ports speed configuration.\nDescription: %s\nERROR: %s" try: self.ui.modify_ports(ports=ports, speed=int(speed)) ports_table = self.ui.get_table_ports(ports) for port_table in ports_table: assert port_table["speed"] == speed except AssertionError as err: message = err_message % ("Switch doesn't accept commands.", err, ) self.class_logger.error(message) pytest.fail(message) except Exception as err: message = err_message % ("Unknown.", err, ) self.class_logger.error(message) pytest.fail(message) else: self.class_logger.debug("Ports speed configuration - OK")