Source code for unittests.traffic_generator.test_ixnet

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``test_ixnet.py``

`Unittests for IxNetwork`

"""

import time
import platform
import os

import pytest
import pypacker

from testlib.Ixia.IxiaHLT import IxiaHLTMixin
from testlib.Ixia.IxiaHAL import IxiaHALMixin
from testlib.loggers import ClassLogger
from testlib.packet_processor import PacketProcessor
from testlib.custom_exceptions import IxiaException

IXNET_CONF = {"name": "IxNetwork-103", "entry_type": "tg", "instance_type": "ixiahl", "id": "1",
              "ip_host": "X.X.X.X", "tcl_server": "X.X.X.X:8200", "user": "IxNetwork/user",
              "kprio": 200, "sprio": 200, "cprio": 200, "tprio": 200,
              "ports": [[1, 2, 9]]}


[docs]class Tg(IxiaHLTMixin, PacketProcessor): """ """ class_logger = ClassLogger() # Constants in seconds DEFAULT_MAX_SNIFF_TIME = 3600 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.sniff_ids = {} self.host = args[0]['ip_host'] self.username = args[0]['user'] self.ports, self.port_list = self._get_speed_ports(args[0]) self.ifaces = "{" for iface in self.ports: self.ifaces = self.ifaces + self._convert_iface(iface) + " " self.ifaces = self.ifaces + "}" def _convert_iface(self, iface): return "{" + " ".join([str(x) for x in iface]) + "}" def _get_version(self): return self.tcl('version cget -productVersion') @staticmethod
[docs] def _get_port_to_string(iface): """Simple helper which allows to get string representation for interface Args: iface(list): Which IXIA interface to use for packet sending (list in format [chassis_id, card_id, port_id]) Returns: str: string in format "chassis_id/card_id/port_id" """ return "/".join(map(str, iface))
def _reset_ports(self): for iface in self.ports: chassis, card, port = iface self.class_logger.debug("Reseting config for port %s %s %s." % iface) assert self.tcl('port setFactoryDefaults %s %s %s' % (chassis, card, port)) == '0' self.tcl('port setDefault; port config -autonegotiate true; port config -duplex full; port config -numAddresses 1') self.tcl('port config -transmitMode portTxModeAdvancedScheduler') self.tcl('port set %s %s %s' % (chassis, card, port)) self.tcl('port write %s %s %s' % (chassis, card, port))
[docs] def _get_speed_ports(self, args): """Get ports with speed from config. Notes: This function check if port has speed in config file. Returns: List of ports used in real config """ ports = [] ports_list = [] if 'ports' in args: ports = [tuple(x) for x in args["ports"]] if "port_list" in args: ports = [tuple(x[0]) for x in args["port_list"]] ports_list = [[tuple(x[0]), x[1]] for x in args["port_list"]] return ports, ports_list
[docs] def connect_hal(self): """Logs in to IXIA and takes ports ownership. Returns: None """ try: if platform.system() == 'Linux': self.tcl('ixConnectToTclServer %s' % (self.host, )) if self.username == "": try: os_username = os.environ['SUDO_USER'] except Exception: os_username = os.environ['USER'] self.username = "%s__%s" % ("AutoTest", os_username.replace(".", "_")) self.class_logger.debug(self.username) assert self.tcl('set hostname %s' % (self.host, )) == self.host assert self.tcl('set userName %s' % (self.username, )) == self.username assert self.tcl('set portList %s' % (self.ifaces, )) == self.ifaces[1:-1] assert self.tcl('ixLogin %s' % (self.username, )) == '0' assert self.tcl('ixConnectToChassis %s' % (self.host, )) == '0' self.class_logger.info("IxTclHAL Version: %s." % (self.tcl('version cget -ixTclHALVersion'), )) self.class_logger.info("Product version: %s." % (self._get_version(), )) self.class_logger.info("Installed version: %s." % (self.tcl('version cget -installVersion'), )) self.connection_state = True except Exception as err: self.class_logger.debug("Error connecting to IXIA: %s" % (err, )) raise err errs = {} self.owned_ifaces = [] for iface in self.ports: iface = self._convert_iface(iface) try: assert self.tcl('ixTakeOwnership {%s}' % iface) == '0' self.owned_ifaces.append(iface) except Exception as err: self.class_logger.debug("Error taking ownership on port %s" % (iface, )) errs[iface] = err if self.owned_ifaces: self.ownership_state = True if errs: raise RuntimeError("Error taking ownership: %s" % (errs, )) owned_ports = "{" owned_ports = owned_ports + " ".join(self.owned_ifaces) + " }" assert self.tcl('set ownedPortList %s' % (owned_ports, )) == owned_ports[1:-1] try: IxiaHALMixin.stop_all_streams(self) except Exception as err: self.class_logger.warning("Caught an exception while stopping streams on connection. Type %s. Err: %s" % (type(err), err)) self._reset_ports() self.class_logger.info("Ixia startup complete.")
[docs] def disconnect_hal(self, mode='fast'): """Logs out from IXIA and clears ports ownership. Returns: None """ if self.ownership_state: for iface in self.owned_ifaces: assert self.tcl('ixClearOwnership {%s}' % (iface, )) == '0' self.ownership_state = False if self.connection_state: assert self.tcl('ixLogout') == '0' assert self.tcl('ixDisconnectFromChassis') == '0' if platform.system() == 'Linux': self.tcl('ixDisconnectTclServer %s' % (self.host, )) self.connection_state = False
[docs] def start_sniff(self, ifaces, sniffing_time=None, packets_count=0, filter_layer=None, src_filter=None, dst_filter=None): """Starts sniffing on specified interfaces. Args: ifaces(list): List of TG interfaces for capturing. sniffing_time(int): Time in seconds for sniffing. packets_count(int): Count of packets to sniff (no count limitation in case 0). filter_layer(str): Name of predefined sniffing filter criteria. src_filter(str): Sniff only packet with defined source MAC. dst_filter(str): Sniff only packet with defined destination MAC. Returns: None Notes: This method introduces additional 1.5 seconds timeout after capture enabling. It's required by Ixia sniffer to wait until capturing is started. Examples:: env.tg[1].start_sniff(['eth0', ], filter_layer='IP', src_filter='00:00:00:01:01:01', dst_filter='00:00:00:22:22:22') """ self.class_logger.debug("Starting capturing on ifaces: %s" % (ifaces, )) if filter_layer: if filter_layer not in self.flt_patterns: raise IxiaException("Invalid filter_layer = %s. Allowed values: %s" % (filter_layer, list(self.flt_patterns.keys()))) self.class_logger.debug("Sniff params: %s seconds, %s packets, %s filter layer, %s srcMac, %s dstMac." % (sniffing_time, packets_count if packets_count != 0 else None, filter_layer, src_filter, dst_filter)) # Store sniff settings per port to dictionary for iface in ifaces: _iface = _get_port_to_string(iface) if _iface in self.sniff_ids: message = "Sniffer already started on iface %s %s %s." % iface self.class_logger.error(message) raise IxiaException(message) self.sniff_ids[_iface] = {} self.sniff_ids[_iface]['count'] = packets_count self.sniff_ids[_iface]['layer'] = filter_layer self.sniff_ids[_iface]['time'] = sniffing_time # Define empty filter by default filter_tcl = [] filter_tcl.append("filter setDefault;") filter_tcl.append("capture setDefault;") filter_tcl.append("capture config -captureMode captureContinuousMode;") # Define realtime filter if filter_layer or src_filter or dst_filter: # and filtering_method == "realtime": self.class_logger.debug("Enabling filter for data capturing...") filter_tcl.append("filterPallette setDefault;") filter_tcl.append("capture config -continuousFilter captureContinuousFilter;") filter_tcl.append("filter config -captureFilterEnable true;") else: filter_tcl.append("filter config -captureFilterEnable false;") if filter_layer: # and filtering_method == "realtime": filter_tcl.append(self._set_filter_params(filter_layer)) # Define srcMac filter if src_filter: if ':' in src_filter: src_filter = src_filter.replace(':', ' ') filter_tcl.append("filterPallette config -SA1 {%s};" % (src_filter, )) filter_tcl.append("filter config -captureFilterSA addr1;") # Define dstMac filter if dst_filter: if ':' in dst_filter: dst_filter = dst_filter.replace(':', ' ') filter_tcl.append("filterPallette config -DA1 {%s};" % (dst_filter, )) filter_tcl.append("filter config -captureFilterDA addr1;") # Send filter config to IXIA self.tcl(" ".join(filter_tcl)) # Apply filter to ports ports_list = [] tcl_cmd = [] tcl_cmd.append("set retCode $::TCL_OK;") for iface in ifaces: chassis, card, port = iface tcl_cmd.append("if {[capture set %(chassis)s %(card)s %(port)s]} {errorMsg \"<<capture set>> return error.\"; set retCode $::TCL_ERROR};") if filter_layer or src_filter or dst_filter: tcl_cmd.append("if {[filterPallette set %(chassis)s %(card)s %(port)s]} \ {errorMsg \"<<filterPallette set>> command return error.\"; set retCode $::TCL_ERROR};") tcl_cmd.append("if {[filter set %(chassis)s %(card)s %(port)s]} {errorMsg \"<<filter set>> command return error.\"; set retCode $::TCL_ERROR};") tcl_cmd.append("return $retCode;") assert self.tcl(" ".join(tcl_cmd) % locals()) == "0" ports_list.append(iface) tcl_ports_list = str(ports_list).replace("(", "{").replace(")", "}").replace("[", "{").replace("]", "}").replace("'", "").replace(",", "") assert self.tcl("set rxPortIdList %s;\ set retCode $::TCL_OK;\ if {[ixWriteConfigToHardware rxPortIdList]} {errorMsg \"ixWriteConfigToHardware return error.\"; set retCode $::TCL_ERROR};\ if {[ixClearStats rxPortIdList]} {errorMsg \"ixClearStats return error.\"; set retCode $::TCL_ERROR};\ if {[ixStartCapture rxPortIdList]} {errorMsg \"ixStartCapture return error.\"; set retCode $::TCL_ERROR};\ if {[ixClearTimeStamp rxPortIdList]} {errorMsg \"ixClearTimeStamp return error.\"; set retCode $::TCL_ERROR};\ return $retCode" % (tcl_ports_list, )) == "0" sniffing_start_timestamp = time.time() for iface in ifaces: _iface = _get_port_to_string(iface) self.sniff_ids[_iface]['start_time'] = sniffing_start_timestamp # Wait for assurance that sniffing is started time.sleep(1.5)
[docs] def stop_sniff(self, ifaces, force=False, drop_packets=False, sniff_packet_count=1000): """Stops sniffing on specified interfaces and returns captured data. Args: ifaces(list): List of interfaces where capturing has to be stopped. force(bool): Stop capturing even if time or packet count criteria isn't achieved. drop_packets(bool): Don't return sniffed data (used in case you need just read statistics). sniff_packet_count(int): Default number of packets to return (used to avoid test hanging in case storm). Returns: dict: Dictionary where key = interface name, value = list of sniffed packets. """ def _stop_capture_on_port(iface): chassis, card, port = iface.split("/") assert self.tcl("ixStopPortCapture %s %s %s" % (chassis, card, port)) == "0" self.class_logger.debug("Stopped capture on iface: %s" % iface) def _get_captured_frames_count(iface): chassis, card, port = iface.split("/") tcl_cmd = [] message = "Cannot process tcl command: stat get statAllStats %s %s %s" % (chassis, card, port) tcl_cmd.append("if {[stat get statAllStats %s %s %s]} {errorMsg \"%s\"; return \"ERROR\"} else" % (chassis, card, port, message)) if self.sniff_ids[iface]['layer'] is not None: tcl_cmd.append("{stat cget -captureFilter}") else: tcl_cmd.append("{stat cget -framesReceived}") result = self.tcl(" ".join(tcl_cmd)) if result == "ERROR": self.class_logger.error(message) raise IxiaException(message) else: num_frames = int(result) return num_frames self.class_logger.debug("Stopping capturing on ifaces: %s ..." % (ifaces, )) capture_state = dict((self._get_port_to_string(iface), True) for iface in ifaces) while True in list(capture_state.values()): # This flag will True when we have timed out sniffer without packets count. And we have to make timeout between check. # In case sniffer with packets count we check numFrames and this procedure makes timeout. timeout_flag = True if force: for iface in list(capture_state.keys()): if capture_state[iface]: _stop_capture_on_port(iface) capture_state[iface] = False for iface in list(capture_state.keys()): if capture_state[iface]: # Stop ifaces that do not have time or count parameters if not self.sniff_ids[iface]['time'] and not self.sniff_ids[iface]['count']: _stop_capture_on_port(iface) capture_state[iface] = False # Stop ifaces if time is elapsed if self.sniff_ids[iface]['time'] and (time.time() - self.sniff_ids[iface]['start_time']) >= self.sniff_ids[iface]['time']: _stop_capture_on_port(iface) capture_state[iface] = False for iface in list(capture_state.keys()): if capture_state[iface]: # Stop if frames count reached if self.sniff_ids[iface]['count'] != 0: num_frames = _get_captured_frames_count(iface) if num_frames > self.sniff_ids[iface]['count']: _stop_capture_on_port(iface) capture_state[iface] = False timeout_flag = False if timeout_flag: time.sleep(0.15) # Loop prevention for iface in list(capture_state.keys()): if capture_state[iface]: if not self.sniff_ids[iface]['time'] and (time.time() - self.sniff_ids[iface]['start_time']) >= self.DEFAULT_MAX_SNIFF_TIME: _stop_capture_on_port(iface) capture_state[iface] = False packet_dict = {} for iface in ifaces: _iface = self._get_port_to_string(iface) # After sniff finished we shouldn't use this method. Because statistics update is continuing # captured_packets_count = _get_captured_frames_count(iface) # Instead we check captureBuffer. (which stops capturing and we couldn't use it before) chassis, card, port = iface captured_packets_count = int(self.tcl("captureBuffer get %s %s %s; captureBuffer cget -numFrames;" % (chassis, card, port))) self.class_logger.debug("Collected %s packets on %s interface (real count)" % (captured_packets_count, iface)) if self.sniff_ids[_iface]['count'] != 0: if captured_packets_count < self.sniff_ids[_iface]['count']: packet_count = captured_packets_count else: packet_count = self.sniff_ids[_iface]['count'] else: packet_count = captured_packets_count self.class_logger.debug("Collected %s packets on %s interface (count for processing)" % (packet_count, iface)) packet_list = [] if sniff_packet_count != 0 and sniff_packet_count < packet_count: packet_count = sniff_packet_count if packet_count and not drop_packets: # chassis, card, port = self._get_port_info(iface) assert self.tcl("capture get %(chassis)s %(card)s %(port)s; \ captureBuffer get %(chassis)s %(card)s %(port)s 1 %(packet_count)s" % locals()) == "0" # self.tcl("captureBuffer get %s %s %s 1 %s" % (chassis, card, port, packet_count)) for packet_num in range(1, packet_count + 1): # self.class_logger.debug("Processing packet %s... " % (packet_num, )) assert self.tcl("captureBuffer getframe %s" % (packet_num, )) == "0" raw_packet = self.tcl("captureBuffer cget -frame").replace(" ", "") timestamp = float(self.tcl("captureBuffer cget -timestamp").replace(" ", "")) # self.class_logger.debug(raw_packet) pkt = pypacker.ethernet.Ethernet(raw_packet.decode("hex")) # pylint: disable=no-member pkt.time = self.sniff_ids[_iface]['start_time'] + timestamp / 1000000000 packet_list.append(pkt) packet_dict[iface] = packet_list else: packet_dict[iface] = [] self.sniff_ids.pop(_iface) return packet_dict
[docs] def _set_filter_params(self, layer): """Configures filter parameters for specified layer. Args: Layer(str): compatible with pypacker "layers" Returns: None """ def _set_user_pattern(ptrn_num, ptrn_cfg): _tcl_commands = "" _tcl_commands += "filterPallette config -pattern%s \"%s\";" % (ptrn_num, ptrn_cfg[0], ) _tcl_commands += "filterPallette config -patternMask%s \"%s\";" % (ptrn_num, ptrn_cfg[1], ) _tcl_commands += "filterPallette config -patternOffset%s \"%s\";" % (ptrn_num, ptrn_cfg[2], ) if len(ptrn_cfg) > 3: _tcl_commands += "filterPallette config -patternOffsetType%s \"filterPalletteOffsetStartOf%s\";" % (ptrn_num, ptrn_cfg[3], ) return _tcl_commands tcl_filter = "" if "ptrn1" in self.flt_patterns[layer]: tcl_filter += _set_user_pattern(1, self.flt_patterns[layer]['ptrn1']) tcl_filter += "filterPallette config -matchType1 %s;" % (self.flt_patterns[layer]['mt1'], ) if "ptrn2" in self.flt_patterns[layer]: tcl_filter += _set_user_pattern(2, self.flt_patterns[layer]['ptrn2']) tcl_filter += "filterPallette config -matchType2 %s;" % (self.flt_patterns[layer]['mt2'], ) tcl_filter += "filter config -captureFilterPattern %s;" % (self.flt_patterns[layer]['cfp'], ) return tcl_filter
[docs]def _get_port_to_string(iface): """Simple helper which allows to get string representation for interface. Args: iface(list): Which IXIA interface to use for packet sending (list in format [chassis_id, card_id, port_id]) Returns: str: string in format "chassis_id/card_id/port_id" """ return "/".join(map(str, iface))
[docs]def _get_speed_ports(): """Get ports with speed from config. Notes: This function check if port has speed in config file. Returns: list: List of ports used in real config """ ports = [] ports_list = [] if 'ports' in IXNET_CONF: ports = [tuple(x) for x in IXNET_CONF["ports"]] if "port_list" in IXNET_CONF: ports = [tuple(x[0]) for x in IXNET_CONF["port_list"]] ports_list = [[tuple(x[0]), x[1]] for x in IXNET_CONF["port_list"]] return ports, ports_list
def _set_loopback(tg): iface = tg.ports[0] chassis, card, port = iface assert tg.tcl("ixClearPortStats %(chassis)s %(card)s %(port)s; \ port get %(chassis)s %(card)s %(port)s; \ port config -rxTxMode gigLoopback; \ port config -loopback portLoopback; \ port set %(chassis)s %(card)s %(port)s; \ port write %(chassis)s %(card)s %(port)s" % {'chassis': chassis, 'card': card, 'port': port}) == '0' @pytest.mark.skipif(True, reason="Temporary skip this TC on real environment")
[docs]def test_connect_disconnect(request): """ """ tg = Tg(IXNET_CONF, request.config.option) tg.create() tg.connect_hal() _set_loopback(tg) tg.disconnect_hal() tg.disconnect()
@pytest.mark.skipif(True, reason="Temporary skip this TC on real environment")
[docs]def test_stp(request): """ """ tg = Tg(IXNET_CONF, request.config.option) tg_port = _get_speed_ports()[0][0] tg.create() tg.connect_hal() _set_loopback(tg) tg.STP.configure_bridges(tg_port, auto_pick_bridge_mac="0", auto_pick_port="0", # pylint: disable=no-member bridge_mac="2234.5678.9abc", bridge_mac_step="0000.0000.0005", bridge_mode="mstp", bridge_priority="8192", cist_external_root_cost="5", cist_external_root_mac="aaaa.bbbb.cccc", cist_external_root_priority="4096", cist_reg_root_cost="10", cist_reg_root_mac="abcd.abcd.abcd", cist_reg_root_priority="16384", cist_remaining_hop="10", count="5", enable_jitter="1", forward_delay="1000", hello_interval="1500", inter_bdpu_gap="10", intf_cost="7", intf_count="3", jitter_percentage="20", link_type="shared", mac_address_bridge_step="0000.0001.0000", mac_address_init="3111.2222.1111", mac_address_intf_step="0000.1111.0000", max_age="12500", message_age="11520", mstc_name="ixia", mstc_revision="3", mtu="576", port_no="1", port_no_bridge_step="0", port_no_intf_step="1", port_priority="144", pvid="8", vlan="1", vlan_id="5,6,7", vlan_id_intf_step="0,0,1", vlan_user_priority="3") tg.STP.configure_msti(tg_port, bridge_handler_id='1', count="5", msti_id="4", # pylint: disable=no-member msti_id_step="1", msti_hops="13", msti_internal_root_path_cost="17", msti_mac="1234.1234.1234", msti_mac_step="0000.0002.0001", msti_name="msti-%", msti_port_priority="176", msti_priority="61440", msti_vlan_start="7", msti_vlan_start_step="2", msti_vlan_stop="13", msti_wildcard_percent_enable="1", msti_wildcard_percent_start="10", msti_wildcard_percent_step="2") tg.STP.configure_bridges(tg_port, bridge_handler_id='1', vlan_msti_handler_id='1', mode="modify") # pylint: disable=no-member tg.STP.configure_lans(tg_port, count="10", mac_address="000e.4cd7.c011", # pylint: disable=no-member mac_incr_enable="1", vlan_enable="1", vlan_id="2", vlan_incr_enable="1") tg.STP.control(tg_port, mode="start") # pylint: disable=no-member time.sleep(10) iface = tg.ports[0] tg.start_sniff([iface], sniffing_time=25, packets_count=100, filter_layer="STP") data = tg.stop_sniff([iface]) assert "STP" in data[iface] tg.STP.cleanup() # pylint: disable=no-member tg.disconnect_hal() tg.disconnect()