# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``IxiaHAL.py``
`TAF Ixia traffic generator based on IxTclHal API`
"""
import copy
import codecs
import os
import platform
import time
import struct
from tkinter import Tcl, TclError
from . import ixia_helpers
from ..custom_exceptions import IxiaException
import pypacker
[docs]class IxiaHALMixin(object):
"""IXIA interaction base class.
"""
class_logger = None
# Constants in seconds
DEFAULT_MAX_SNIFF_TIME = 3600
[docs] def __init__(self, config, opts):
"""Initializes connection to IXIA.
Args:
config(dict): Configuration information
opts(OptionParser): py.test config.option object which contains all py.test cli options
"""
self.__opts = opts
self.__config = config
self._init_tcl()
self.id = config['id']
self.type = config['instance_type']
self.host = config['ip_host']
self.owned_ifaces = []
self.ownership_state = False
self.connection_state = False
self.stream_ids = {}
self.sniff_ids = {}
self.username = ""
if "user" in config:
self.username = config['user']
[docs] def _convert_iface(self, iface):
"""Convert representation of TG port from tuple to string.
Args:
iface(tuple(int)): TG port in format tuple(chassisID, cardId, portId)
Returns:
str: Representation of TG port in format {chassisID cardId portId}
"""
return "{" + " ".join([str(x) for x in iface]) + "}"
[docs] def _init_tcl(self):
"""Initialize Tcl interpreter.
Returns:
None
"""
self.Tcl = Tcl()
def tcl_puts(*args):
"""Enables logging for tcl output.
Returns:
None
"""
if len(args) >= 2:
stream = args[0]
if stream == "stdout":
self.class_logger.debug(" ".join(args[1:]))
elif stream == "stderr":
self.class_logger.error(" ".join(args[1:]))
else:
self.class_logger.debug("stream <%s>: %s" % (args[0], " ".join(args[1:])))
elif len(args) == 1:
self.class_logger.debug(args[0])
else:
self.class_logger.error("Called puts without arguments.")
return None
self.Tcl.createcommand("tcl_puts", tcl_puts)
self.class_logger.debug("Insert tcl script to catch puts output.")
ixia_helpers.tcl_puts_replace(self.Tcl)
ixia_helpers.ixtclhal_import(self.Tcl)
[docs] def tcl(self, cmd):
"""Log end execute tcl code.
Args:
cmd(str): Tcl command
Returns:
str: Result of execution
"""
self.class_logger.debug("Run tcl command: %s", cmd)
return self.Tcl.eval(cmd)
[docs] def _get_version(self):
"""Get Ixia version.
Returns:
str: Version of product
"""
return self.tcl('version cget -productVersion')
[docs] def connect(self):
"""Logs in to IXIA and takes ports ownership.
Raises:
AssertionError: error in executing tcl code
Exception: Connection error
RuntimeError: Error on taking/clearing port ownership
Returns:
None
"""
try:
if platform.system() == 'Linux':
self.tcl('ixConnectToTclServer %s' % (self.host, ))
if self.username == "":
try:
os_username = os.environ['SUDO_USER']
except KeyError:
os_username = os.environ['USER']
self.username = "%s__%s" % ("AutoTest", os_username.replace(".", "_"))
self.class_logger.debug(self.username)
assert self.tcl('set hostname %s' % (self.host, )) == self.host
assert self.tcl('set userName %s' % (self.username, )) == self.username
assert self.tcl('set portList %s' % (self.ifaces, )) == self.ifaces[1:-1]
assert self.tcl('ixLogin %s' % (self.username, )) == '0'
assert self.tcl('ixConnectToChassis %s' % (self.host, )) == '0'
self.class_logger.info("IxTclHAL Version: %s." % (self.tcl('version cget -ixTclHALVersion'), ))
self.class_logger.info("Product version: %s." % (self._get_version(), ))
self.class_logger.info("Installed version: %s." % (self.tcl('version cget -installVersion'), ))
self.connection_state = True
except Exception as err:
self.class_logger.debug("Error connecting to IXIA: %s" % (err, ))
raise err
errs = {}
# Clear port ownership if option 'ixia_clear_ownership' enable
if self.__opts.ixia_clear_ownership and not self.is_protocol_emulation_present:
for iface in self.ports:
iface = self._convert_iface(iface)
try:
assert self.tcl('ixClearOwnership {%s} force' % (iface, )) == '0'
except Exception as err:
self.class_logger.debug("Error clearing ownership on port %s" % (iface, ))
raise RuntimeError("Error clearing ownership: %s" % (errs, ))
self.owned_ifaces = []
for iface in self.ports:
iface = self._convert_iface(iface)
try:
assert self.tcl('ixTakeOwnership {%s}' % iface) == '0'
self.owned_ifaces.append(iface)
except Exception as err:
self.class_logger.debug("Error taking ownership on port %s" % (iface, ))
errs[iface] = err
if self.owned_ifaces:
self.ownership_state = True
if errs:
raise RuntimeError("Error taking ownership: %s" % (errs, ))
owned_ports = "{"
owned_ports = owned_ports + " ".join(self.owned_ifaces) + " }"
assert self.tcl('set ownedPortList %s' % (owned_ports, )) == owned_ports[1:-1]
try:
self.stop_all_streams()
except Exception as err:
self.class_logger.warning("Caught an exception while stopping streams on connection. Type %s. Err: %s" % (type(err), err))
self._reset_ports()
self.class_logger.info("Ixia startup complete.")
__connect = connect
[docs] def _reset_ports(self):
"""Reset TG ports configuration.
Raises:
AssertionError: error in executing tcl code
Returns:
None
"""
for iface in self.ports:
chassis, card, port = iface
self.class_logger.debug("Reseting config for port %s %s %s." % iface)
assert self.tcl('port setFactoryDefaults %s %s %s' % (chassis, card, port)) == '0'
self.tcl('port setDefault; port config -autonegotiate true; port config -duplex full; port config -numAddresses 1')
self.tcl('port config -transmitMode portTxModeAdvancedScheduler')
self.tcl('port set %s %s %s' % (chassis, card, port))
self.tcl('port write %s %s %s' % (chassis, card, port))
[docs] def disconnect(self, mode='fast'):
"""Logs out from IXIA and clears ports ownership.
Args:
mode(str): Type of mode to execute
Raises:
AssertionError: error in executing tcl code
Returns:
None
"""
if self.ownership_state:
for iface in self.owned_ifaces:
assert self.tcl('ixClearOwnership {%s}' % (iface, )) == '0'
self.ownership_state = False
if self.connection_state:
assert self.tcl('ixLogout') == '0'
assert self.tcl('ixDisconnectFromChassis') == '0'
if platform.system() == 'Linux':
self.tcl('ixDisconnectTclServer %s' % (self.host, ))
self.connection_state = False
__disconnect = disconnect
[docs] def disconnect_port(self, iface):
"""Simulate port link disconnecting (set it to admin down etc).
Args:
iface(str): Interface to disconnect.
Raises:
NotImplementedError: not implemented
Returns:
None or raise and exception.
"""
chassis, card, port = iface
self.class_logger.debug("Emulating disconnecting for port %s %s %s." % iface)
self.tcl('port config -enableSimulateCableDisconnect true')
self.tcl('port set %s %s %s' % (chassis, card, port))
self.tcl('port write %s %s %s' % (chassis, card, port))
[docs] def connect_port(self, iface):
"""Simulate port link connecting (set it to admin up etc).
Args:
iface(str): Interface to connect.
Raises:
NotImplementedError: not implemented
Returns:
None or raise and exception.
"""
chassis, card, port = iface
self.class_logger.debug("Emulating connecting for port %s %s %s." % iface)
self.tcl('port config -enableSimulateCableDisconnect false')
self.tcl('port set %s %s %s' % (chassis, card, port))
self.tcl('port write %s %s %s' % (chassis, card, port))
[docs] def check(self):
"""Check if TG object is alive and ready for processing.
Returns:
None or raise and exception.
"""
try:
self._get_version()
except TclError:
try:
self.__disconnect()
except TclError:
pass
self.__init__(self.__config, self.__opts)
[docs] def create(self):
"""Perform all necessary procedures to initialize TG device and prepare it for interaction.
Returns:
None or raise and exception.
Notes:
Method has to check --get_only option. Set of steps to configure TG device is related to particular TG type.
"""
return self.__connect()
[docs] def destroy(self):
"""Perform all necessary procedures to uninitialize TG device.
Returns:
None or raise and exception.
Notes:
Method has to check --get_only and --leave_on options.
Set of steps to unconfigure TG device is related to particular TG type.
Method has to clear all connections and stop all captures and data streams.
"""
self.cleanup(mode="fast")
self.__disconnect()
[docs] def cleanup(self, mode="complete"):
"""This method should do Ixia ports cleanup (remove streams etc).
Args:
mode(str): "fast" or "complete". If mode == "fast", method does not clear streams on the port, but stops them (str).
Returns:
None or raise and exception.
"""
# TODO: Add stop_sniff etc
# TODO: Handle errors more gracefully
try:
self.stop_streams()
except Exception as err:
self.class_logger.warning("Caught an exception while stopping streams. Type %s. Err: %s" % (type(err), err))
if mode == "complete":
self.sniff_ids = {}
self.stream_ids = {}
self._reset_ports()
[docs] def sanitize(self):
"""This method has to clear all stuff which can cause device inconsistent state after exit or unexpected exception.
Notes:
E.g. clear connections, stop threads. This method is called from pytest.softexit
"""
self.__disconnect()
[docs] def clear_streams(self):
"""Stop and clear all traffic streams.
"""
self.stream_ids = {}
self._reset_ports()
[docs] def _transmit_stream(self, chassis, card, port, ix_stream_id):
"""Transmit Ixia stream.
Args:
chassis(int): Chassis id
card(int): Card id
port(int): Port id
ix_stream_id(int): Stream id
Raises:
AssertionError: error in executing tcl code
IxiaException: Link is down
Returns:
None
"""
# Check Link State before sending stream.
if self.tcl("set linkToCheck {{%s %s %s}}; ixCheckLinkState linkToCheck" % (chassis, card, port)) != "0":
raise IxiaException("Link {%s %s %s} is Down." % (chassis, card, port))
# Get initial stream statistics
self.tcl("streamTransmitStats get {chassis} {card} {port} {stream_id} {stream_id}; "
"set startCount{stream_id} [streamTransmitStats cget -framesSent]".format(chassis=chassis,
card=card,
port=port,
stream_id=ix_stream_id))
# Enable stream.
self._enable_stream(chassis, card, port, ix_stream_id)
# Start transmission if it's disabled on the port.
assert self.tcl("if {[stat getTransmitState %(chassis)s %(card)s %(port)s] != 1} \
{ixStartPortTransmit %(chassis)s %(card)s %(port)s} \
else \
{return \"0\"}" %
{'chassis': chassis, 'card': card, 'port': port}) == "0"
[docs] def _enable_stream(self, chassis, card, port, ix_stream_id):
"""Enable Ixia stream.
Args:
chassis(int): Chassis id
card(int): Card id
port(int): Port id
ix_stream_id(int): Stream id
Returns:
None
"""
self.tcl("stream get %(chassis)s %(card)s %(port)s %(stream_id)s; \
stream config -enable true; \
stream set %(chassis)s %(card)s %(port)s %(stream_id)s; \
stream write %(chassis)s %(card)s %(port)s %(stream_id)s;" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id}) == "0"
[docs] def _disable_stream(self, chassis, card, port, ix_stream_id):
"""Disable Ixia stream.
Args:
chassis(int): Chassis id
card(int): Card id
port(int): Port id
ix_stream_id(int): Stream id
Returns:
None
"""
self.tcl("stream get %(chassis)s %(card)s %(port)s %(stream_id)s; \
stream config -enable false; \
stream set %(chassis)s %(card)s %(port)s %(stream_id)s; \
stream write %(chassis)s %(card)s %(port)s %(stream_id)s;" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id}) == "0"
[docs] def _check_increment(self, increment, name):
"""Verify that representation of increment is correct.
Args:
increment(tuple(int)): Increment in format tuple(step, count)
name(str): Name of increment
Raises:
TypeError: Incorrect type of parameters.
Returns:
tuple: Step and count.
"""
if not isinstance(increment, tuple):
raise TypeError("%s must be a tuple." % name)
step = increment[0]
count = increment[1]
if not isinstance(step, int) or not isinstance(count, int):
raise TypeError("%s must be a tuple of integers." % name)
return step, count
[docs] def _set_ixia_udf_field(self, udf_id=1, initval='1', offset=24, bit_offset=0, counter_type='c32', step=1, count=1, continuous=False):
"""Setup Ixia stream UDF.
Args:
udf_id(int): UDF id
initval(str): Initial value
offset(int): offset value
bit_offset(int): bit_offset value
counter_type(str): number of bits to increment
step(int): increment step
count(int): increment count
continuous(bool): continuous increment
Returns:
list: Tcl commands.
"""
tcl_commands = []
if udf_id > 5:
self.class_logger.warning("Increment will not be applied because udf_id is more than 5: %s" % udf_id)
else:
tcl_commands.append("udf setDefault; udf config -enable true; udf config -initval {0};".format(initval))
tcl_commands.append("udf config -offset {0};".format(offset))
tcl_commands.append("udf config -bitOffset {0};".format(bit_offset))
tcl_commands.append("udf config -countertype {0};".format(counter_type))
if continuous and count == 0:
tcl_commands.append("udf config -continuousCount true;")
else:
tcl_commands.append("udf config -continuousCount false;")
if step >= 0:
tcl_commands.append("udf config -updown uuuu;")
else:
tcl_commands.append("udf config -updown dddd;")
if count > 0:
tcl_commands.append("udf config -repeat {0};".format(count))
tcl_commands.append("udf config -step {0};".format(abs(step)))
tcl_commands.append("udf set {0};".format(udf_id))
return tcl_commands
[docs] def _set_increments(self, packet, sa_increment, da_increment, sip_increment, dip_increment, arp_sa_increment,
arp_sip_increment, igmp_ip_increment, lldp_sa_increment, sudp_increment, dudp_increment, stcp_increment, dtcp_increment,
vlan_increment, continuous, required_size, eth_type_increment, dscp_increment, protocol_increment, sipv6_increment,
dipv6_increment, fl_increment, dhcp_si_increment, in_vlan_increment, tc_increment, nh_increment, isis_lspid_increment, chassis, card, port, force_errors):
"""Set stream increments.
Args:
packet(pypacker.Packet): Packet to analyze
sa_increment(tuple): Source MAC increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
da_increment(tuple): Destination MAC increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
sip_increment(tuple): Source IPv4 increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
dip_increment(tuple): Destination IPv4 increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
arp_sa_increment(tuple): Source MAC increment parameters for ARP packet. Tuple (<step>, <count>). Has to be used in pair with arp_sip_increment.
arp_sip_increment(tuple): Source IP increment parameters for ARP packet. Tuple (<step>, <count>). Has to be used in pair with arp_sa_increment.
igmp_ip_increment(tuple): Destination IP increment parameters for IGMP packet. Tuple (<step>, <count>).
lldp_sa_increment(tuple): Source MAC increment parameters for LLDP packet. Tuple (<step>, <count>).
sudp_increment(tuple): UDP source port increment parameters.
dudp_increment(tuple): UDP destination port increment parameters.
stcp_increment(tuple): source TCP address increment
dtcp_increment(tuple): destination TCP address increment
vlan_increment(tuple): VLAN increment parameters for tagged packet. Tuple (<step>, <count>).
continuous(bool): Should stream be sent continuously or not. Continuous streams have to be started using start_streams method.
required_size (int or tuple): Integer or tuple of parameters needed to be set when packet size should be incremented .
Tuple examples: ('Increment', <step>, <min>, <max>), ('Random', <min>, <max>)
dscp_increment(tuple): DSCP increment parameters.
protocol_increment(tuple): IP protocol incrementation..
sipv6_increment(tuple): Source IPv6 increment parameters.
dipv6_increment(tuple): Destination IPv6 increment parameters.
fl_increment(tuple): Flow label increment parameters.
dhcp_si_increment(tuple): DHCP IP increment parameters.
in_vlan_increment(tuple): Inner vlan ID increment parameters for double tagged frames. Tuple (<step>, <count>).
tc_increment(tuple): IPv6 traffic class increment parameters.
nh_increment(tuple): Next header increment parameters.
eth_type_increment(tuple): Ethernet frame type increment parameters.
isis_lspid_increment(tuple): Dot3|LLC|ISIS_LSP LSP ID field increment parameters.
chassis(int): TG chassis id
card(int): TG card id
port(int): TG port id
force_errors(str): Emulate Errors for configured stream. Enum ("bad" /*streamErrorBadCRC, "none" /*streamErrorNoCRC,
"dribble" /*streamErrorDribble, "align" /*streamErrorAlignment)
Returns:
list: Tcl commands.
Notes:
For increments description see set_stream method().
"""
tcl_commands = []
# Set source address increment
if sa_increment is not None:
sa_increment_step, sa_increment_count = self._check_increment(sa_increment, "sa_increment")
if continuous and sa_increment_count == 0:
if sa_increment_step >= 0:
tcl_commands.append("stream config -saRepeatCounter contIncrement;")
else:
tcl_commands.append("stream config -saRepeatCounter contDecrement;")
else:
if sa_increment_step >= 0:
tcl_commands.append("stream config -saRepeatCounter increment; stream config -numSA %s;" % (sa_increment_count, ))
else:
tcl_commands.append("stream config -saRepeatCounter decrement; stream config -numSA %s;" % (sa_increment_count, ))
tcl_commands.append("stream config -saStep %s;" % (abs(sa_increment_step), ))
# Set destination address increment
if da_increment is not None:
da_increment_step, da_increment_count = self._check_increment(da_increment, "da_increment")
if continuous and da_increment_count == 0:
if da_increment_step >= 0:
tcl_commands.append("stream config -daRepeatCounter contIncrement;")
else:
tcl_commands.append("stream config -daRepeatCounter contDecrement;")
else:
if da_increment_step >= 0:
tcl_commands.append("stream config -daRepeatCounter increment; stream config -numDA %s;" % (da_increment_count, ))
else:
tcl_commands.append("stream config -daRepeatCounter decrement; stream config -numDA %s;" % (da_increment_count, ))
tcl_commands.append("stream config -daStep %s;" % (abs(da_increment_step), ))
tcl_commands.append("protocol setDefault;")
self.udf_dict = {}
pattern = None
if (sip_increment is not None or dip_increment is not None or arp_sa_increment is not None or arp_sip_increment is not None or
igmp_ip_increment is not None or sudp_increment is not None or dudp_increment is not None or stcp_increment is not None or
dtcp_increment is not None or vlan_increment is not None or dscp_increment is not None or protocol_increment is not None or
in_vlan_increment is not None):
tcl_commands.append("protocol config -ethernetType ethernetII;")
payl = copy.deepcopy(packet)
while payl:
class_name = payl.__class__.__name__
if class_name == "Ethernet":
if payl.vlan:
vlan_type = struct.unpack('!HH', payl.vlan)
payl, commands = self._configure_vlan(payl, vlan_type, chassis, card, port, vlan_increment, in_vlan_increment, continuous)
tcl_commands.extend(commands)
payl = payl.upper_layer
elif class_name == "IP":
payl, commands = self._configure_ip(payl, chassis, card, port)
tcl_commands.extend(commands)
# Set source ip address increment
if sip_increment is not None:
sip_increment_step, sip_increment_count = self._check_increment(sip_increment, "sip_increment")
udf_id = len(self.udf_dict) + 1
src_ip = packet[pypacker.layer3.ip.IP].src_s.split('.')
sip_initval = str(hex(int(src_ip[0])))[2:].zfill(2) + str(hex(int(src_ip[1])))[2:].zfill(2) + \
str(hex(int(src_ip[2])))[2:].zfill(2) + str(hex(int(src_ip[3])))[2:].zfill(2)
if packet.vlan:
offset = 30
else:
offset = 26
self.udf_dict["sip"] = {"udf_id": udf_id, "initval": sip_initval, "offset": offset, "counter_type": 'c32', "step": sip_increment_step,
"count": sip_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["sip"]))
# TODO
# Set dscp increment
if dscp_increment is not None:
dscp_increment_step, dscp_increment_count = self._check_increment(dscp_increment, "dscp_increment")
udf_id = len(self.udf_dict) + 1
dscp = packet[pypacker.layer3.ip.IP].tos
dscp_initval = hex(int(dscp))[2:]
if packet.vlan:
offset = 19
else:
offset = 15
self.udf_dict["dscp"] = {"udf_id": udf_id, "initval": dscp_initval, "offset": offset, "counter_type": 'c8',
"step": dscp_increment_step * 4, "count": dscp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["dscp"]))
# TODO
# Set protocol increment
if protocol_increment is not None:
protocol_increment_step, protocol_increment_count = self._check_increment(protocol_increment, "protocol_increment")
udf_id = len(self.udf_dict) + 1
protocol = packet[pypacker.layer3.ip.IP].p
protocol_initval = hex(int(protocol))[2:]
if packet.vlan:
offset = 27
else:
offset = 23
self.udf_dict["protocol"] = {"udf_id": udf_id, "initval": protocol_initval, "offset": offset, "counter_type": 'c8',
"step": protocol_increment_step, "count": protocol_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["protocol"]))
# Set destination ip address increment
if dip_increment is not None:
dip_increment_step, dip_increment_count = self._check_increment(dip_increment, "dip_increment")
udf_id = len(self.udf_dict) + 1
dst_ip = packet[pypacker.layer3.ip.IP].dst_s.split('.')
dip_initval = str(hex(int(dst_ip[0])))[2:].zfill(2) + str(hex(int(dst_ip[1])))[2:].zfill(2) + \
str(hex(int(dst_ip[2])))[2:].zfill(2) + str(hex(int(dst_ip[3])))[2:].zfill(2)
if packet.vlan:
offset = 34
else:
offset = 30
self.udf_dict["dip"] = {"udf_id": udf_id, "initval": dip_initval, "offset": offset, "counter_type": 'c32', "step": dip_increment_step,
"count": dip_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["dip"]))
elif class_name == "ARP":
payl, commands = self._configure_arp(payl, chassis, card, port)
tcl_commands.extend(commands)
# Set arp source mac address increment
if arp_sa_increment is not None:
sa_increment_step, sa_increment_count = self._check_increment(arp_sa_increment, "arp_sa_increment")
udf_id = len(self.udf_dict) + 1
arp_hwsrc = packet[pypacker.layer12.arp.ARP].sha_s
src_mac = arp_hwsrc.split(':')
sa_initval = str(hex(int(src_mac[2], 16)))[2:].zfill(2) + str(hex(int(src_mac[3], 16)))[2:].zfill(2) + \
str(hex(int(src_mac[4], 16)))[2:].zfill(2) + str(hex(int(src_mac[5], 16)))[2:].zfill(2)
# Define Ether.src increment via IXIA UDF
self.udf_dict["src"] = {"udf_id": udf_id, "initval": sa_initval, "offset": 8, "counter_type": 'c32', "step": sa_increment_step,
"count": sa_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["src"]))
# Define ARP.hwsrc increment via IXIA UDF
udf_id += 1
if packet.vlan:
offset = 28
else:
offset = 24
self.udf_dict["arp_src"] = {"udf_id": udf_id, "initval": sa_initval, "offset": offset, "counter_type": 'c32', "step": sa_increment_step,
"count": sa_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["arp_src"]))
# Set arp source ip address increment
if arp_sip_increment is not None:
sip_increment_step, sip_increment_count = self._check_increment(arp_sip_increment, "arp_sip_increment")
udf_id = len(self.udf_dict) + 1
src_ip = packet[pypacker.layer12.arp.ARP].spa_s.split('.')
sip_initval = str(hex(int(src_ip[0])))[2:].zfill(2) + str(hex(int(src_ip[1])))[2:].zfill(2) + \
str(hex(int(src_ip[2])))[2:].zfill(2) + str(hex(int(src_ip[3])))[2:].zfill(2)
if packet.vlan:
offset = 32
else:
offset = 28
self.udf_dict["arp_ip"] = {"udf_id": udf_id, "initval": sip_initval, "offset": offset, "counter_type": 'c32',
"step": sip_increment_step, "count": sip_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["arp_ip"]))
elif class_name == "TCP":
payl, commands = self._configure_tcp(payl, chassis, card, port)
tcl_commands.extend(commands)
# Set tcp source port increment
if stcp_increment is not None:
stcp_increment_step, stcp_increment_count = self._check_increment(stcp_increment, "stcp_increment")
udf_id = len(self.udf_dict) + 1
stcp = packet[pypacker.layer4.tcp.TCP].sport
stcp_initval = str(hex(stcp))[2:].zfill(4)
if packet.vlan:
offset = 38
else:
offset = 34
self.udf_dict["stcp"] = {"udf_id": udf_id, "initval": stcp_initval, "offset": offset, "counter_type": 'c16',
"step": stcp_increment_step, "count": stcp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["stcp"]))
# Set tcp destination port increment
if dtcp_increment is not None:
dtcp_increment_step, dtcp_increment_count = self._check_increment(dtcp_increment, "dtcp_increment")
udf_id = len(self.udf_dict) + 1
dtcp = packet[pypacker.layer4.tcp.TCP].dport
dtcp_initval = str(hex(dtcp))[2:].zfill(4)
if packet.vlan:
offset = 40
else:
offset = 36
self.udf_dict["dtcp"] = {"udf_id": udf_id, "initval": dtcp_initval, "offset": offset, "counter_type": 'c16',
"step": dtcp_increment_step, "count": dtcp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["dtcp"]))
elif class_name == "UDP":
payl, commands = self._configure_udp(payl, chassis, card, port)
tcl_commands.extend(commands)
# Set udp source port increment
if sudp_increment is not None:
sudp_increment_step, sudp_increment_count = self._check_increment(sudp_increment, "sudp_increment")
udf_id = len(self.udf_dict) + 1
sudp = packet[pypacker.layer4.udp.UDP].sport
sudp_initval = str(hex(sudp))[2:].zfill(4)
if packet.vlan:
offset = 38
else:
offset = 34
self.udf_dict["sudp"] = {"udf_id": udf_id, "initval": sudp_initval, "offset": offset, "counter_type": 'c16',
"step": sudp_increment_step, "count": sudp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["sudp"]))
# Set udp destination port increment
if dudp_increment is not None:
dudp_increment_step, dudp_increment_count = self._check_increment(dudp_increment, "dudp_increment")
udf_id = len(self.udf_dict) + 1
dudp = packet[pypacker.layer4.udp.UDP].dport
dudp_initval = str(hex(dudp))[2:].zfill(4)
if packet.vlan:
offset = 40
else:
offset = 36
self.udf_dict["dudp"] = {"udf_id": udf_id, "initval": dudp_initval, "offset": offset, "counter_type": 'c16',
"step": dudp_increment_step, "count": dudp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["dudp"]))
elif class_name == "IGMP":
payl, commands = self._configure_igmp(payl, chassis, card, port)
tcl_commands.extend(commands)
# Set IGMP increment
if igmp_ip_increment is not None:
igmp_ip_increment_step, igmp_ip_increment_count = self._check_increment(igmp_ip_increment, "igmp_ip_increment")
udf_id = len(self.udf_dict) + 1
dst_ip = packet[pypacker.layer3.igmp.IGMP].gaddr.split('.')
dip_initval = str(hex(int(dst_ip[0])))[2:].zfill(2) + str(hex(int(dst_ip[1])))[2:].zfill(2) + \
str(hex(int(dst_ip[2])))[2:].zfill(2) + str(hex(int(dst_ip[3])))[2:].zfill(2)
if packet.vlan:
offset = 46
else:
offset = 42
self.udf_dict["igmp_ip"] = {"udf_id": udf_id, "initval": dip_initval, "offset": offset, "counter_type": 'c32',
"step": igmp_ip_increment_step, "count": igmp_ip_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["igmp_ip"]))
elif class_name == "ICMP":
payl, commands = self._configure_icmp(payl, chassis, card, port)
tcl_commands.extend(commands)
# TODO: check this scenario
else:
break
else:
hex_data = codecs.encode(packet.bin(), "hex_codec").decode()
pattern = hex_data[24:]
self.class_logger.debug("Packet data: %s" % (pattern, ))
packet_size = len(packet) + 4
if isinstance(required_size, int):
if packet_size < 64:
self.class_logger.warning("In case of AFS cross connect usage undersized packets less 64 bytes will be droped!!!")
if packet_size >= 48:
tcl_commands.append("stream config -frameSizeMIN %s; stream config -frameSizeMAX %s;" % (packet_size, packet_size))
else:
tcl_commands.append("stream config -frameSizeMIN 48; stream config -frameSizeMAX 48;")
tcl_commands.append("stream config -frameSizeType sizeRandom;")
else:
tcl_commands.append("stream config -framesize %s;" % (packet_size, ))
size_increment_type = ''
if isinstance(required_size, tuple):
if required_size[0] == 'Increment':
size_increment_type = 'increment'
try:
size_increment_step = required_size[1]
size_increment_min_val = required_size[2]
size_increment_max_val = required_size[3]
except:
raise TypeError("'Increment' required_size must contain 3 integer values.")
if size_increment_max_val < size_increment_min_val:
raise TypeError("'Increment' max_value is less than min_value.")
elif required_size[0] == 'Random':
size_increment_type = 'random'
try:
size_increment_min_val = required_size[1]
size_increment_max_val = required_size[2]
except:
raise TypeError("'Increment' required_size must contain 3 integer values.")
if size_increment_max_val < size_increment_min_val:
raise TypeError("'Increment' max_value is less than min_value.")
else:
raise TypeError("required_size contains wrong values.")
if size_increment_type == 'random':
tcl_commands.append("stream config -frameSizeMIN %s; stream config -frameSizeMAX %s;" % (size_increment_min_val, size_increment_max_val))
tcl_commands.append("stream config -frameSizeType sizeRandom;")
elif size_increment_type == 'increment':
tcl_commands.append("stream config -frameSizeMIN %s; stream config -frameSizeMAX %s;" % (size_increment_min_val, size_increment_max_val))
tcl_commands.append("stream config -frameSizeStep %s;" % (size_increment_step,))
tcl_commands.append("stream config -frameSizeType sizeIncr;")
tcl_commands.append("stream config -enableIncrFrameBurstOverride true;")
# Set source ipv6 address increment
if sipv6_increment is not None:
sipv6_increment_step, sipv6_increment_count = self._check_increment(sipv6_increment, "sipv6_increment")
el_id = 0
udf_id = len(self.udf_dict) + 1
src_ipv6 = packet[pypacker.layer3.ip6.IP6].src_s.split(':')
for i, v in enumerate(src_ipv6):
if len(src_ipv6[i]) == 0:
el_id = src_ipv6.index(src_ipv6[i])
src_ipv6.remove(src_ipv6[i])
break
while True:
if len(src_ipv6) < 8:
src_ipv6.insert(el_id, '0000')
else:
break
sipv6_initval = str(src_ipv6[6]).zfill(4) + str(src_ipv6[7]).zfill(4)
if packet.vlan:
offset = 38
else:
offset = 34
self.udf_dict["sipv6"] = {"udf_id": udf_id, "initval": sipv6_initval, "offset": offset, "counter_type": 'c32', "step": sipv6_increment_step,
"count": sipv6_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["sipv6"]))
# Set destination ipv6 address increment
if dipv6_increment is not None:
dipv6_increment_step, dipv6_increment_count = self._check_increment(dipv6_increment, "dipv6_increment")
el_id = 0
udf_id = len(self.udf_dict) + 1
dst_ipv6 = packet[pypacker.layer3.ip6.IP6].dst_s.split(':')
for i, v in enumerate(dst_ipv6):
if len(dst_ipv6[i]) == 0:
el_id = dst_ipv6.index(dst_ipv6[i])
dst_ipv6.remove(dst_ipv6[i])
break
while True:
if len(dst_ipv6) < 8:
dst_ipv6.insert(el_id, '0000')
else:
break
dipv6_initval = str(dst_ipv6[6]).zfill(4) + str(dst_ipv6[7]).zfill(4)
if packet.vlan:
offset = 54
else:
offset = 50
self.udf_dict["dipv6"] = {"udf_id": udf_id, "initval": dipv6_initval, "offset": offset, "counter_type": 'c32', "step": dipv6_increment_step,
"count": dipv6_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["dipv6"]))
# Set Flow Label increment
if fl_increment is not None:
fl_increment_step, fl_increment_count = self._check_increment(fl_increment, "fl_increment")
if fl_increment_count > 1048575:
self.class_logger.warning("IP.fl increment count decreased to 255 for proper IP checksum")
fl_increment_count = 1048576
udf_id = len(self.udf_dict) + 1
fl = packet[pypacker.layer3.ip6.IP6].fl
fl_initval = hex(int(fl))[2:]
if packet.vlan:
offset = 19
else:
offset = 15
self.udf_dict["fl"] = {"udf_id": udf_id, "initval": fl_initval, "offset": offset, "counter_type": 'c24', "step": fl_increment_step,
"count": fl_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["fl"]))
# Set lldp ip address increment
if lldp_sa_increment is not None:
lldp_increment_step, lldp_increment_count = self._check_increment(lldp_sa_increment, "lldp_sa_increment")
udf_id = len(self.udf_dict) + 1
lldp_tlv = hex_data[28:34]
lldp_mac_initval = hex_data[38:46]
if lldp_tlv == '020704':
self.udf_dict["lldp_sa"] = {"udf_id": udf_id, "initval": lldp_mac_initval, "offset": 19, "counter_type": 'c32', "step": lldp_increment_step,
"count": lldp_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["lldp_sa"]))
else:
raise TypeError("lldp tlv is not '\\x02\\x07\\x04'.")
# Set ether type increment
if eth_type_increment is not None:
eth_type_increment_step, eth_type_increment_count = self._check_increment(eth_type_increment, "eth_type_increment")
udf_id = len(self.udf_dict) + 1
eth_type = packet[pypacker.layer12.ethernet.Ethernet].type
eth_type_initval = str(hex(eth_type))[2:].zfill(4)
self.udf_dict["eth_type"] = {"udf_id": udf_id, "initval": eth_type_initval, "offset": 12, "counter_type": 'c16', "step": eth_type_increment_step,
"count": eth_type_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["eth_type"]))
# Set dhcp siaddr increment
if dhcp_si_increment is not None:
dhcp_si_increment_step, dhcp_si_increment_count = self._check_increment(dhcp_si_increment, "dhcp_si_increment")
udf_id = len(self.udf_dict) + 1
dhcp_si = packet.getlayer(pypacker.BOOTP).siaddr.split('.') # pylint: disable=no-member
dhcp_si_initval = str(hex(int(dhcp_si[0])))[2:].zfill(2) + str(hex(int(dhcp_si[1])))[2:].zfill(2) + \
str(hex(int(dhcp_si[2])))[2:].zfill(2) + str(hex(int(dhcp_si[3])))[2:].zfill(2)
if packet.vlan:
offset = 66
else:
offset = 62
self.udf_dict["dhcp_si"] = {"udf_id": udf_id, "initval": dhcp_si_initval, "offset": offset, "counter_type": 'c32', "step": dhcp_si_increment_step,
"count": dhcp_si_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["dhcp_si"]))
# Set IPv6 TC increment
if tc_increment is not None:
tc_increment_step, tc_increment_count = self._check_increment(tc_increment, "tc_increment")
udf_id = len(self.udf_dict) + 1
tc = packet[pypacker.layer3.ip6.IP6].tc
tc_initval = tc
bit_offset = 4
if packet.vlan:
offset = 18
else:
offset = 14
self.udf_dict["ipv6_tc"] = {"udf_id": udf_id, "initval": tc_initval, "offset": offset, "counter_type": 'c8', "bit_offset": bit_offset,
"step": tc_increment_step, "count": tc_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["ipv6_tc"]))
# Set IPv6 NH increment
if nh_increment is not None:
nh_increment_step, nh_increment_count = self._check_increment(nh_increment, "nh_increment")
udf_id = len(self.udf_dict) + 1
nh = packet[pypacker.layer3.ip6.IP6].nh
nh_initval = nh
if packet.vlan:
offset = 24
else:
offset = 20
self.udf_dict["ipv6_nh"] = {"udf_id": udf_id, "initval": nh_initval, "offset": offset, "counter_type": 'c8',
"step": nh_increment_step, "count": nh_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["ipv6_nh"]))
# Set LSP ID increment
if isis_lspid_increment is not None:
lspid_increment_step, lspid_increment_count = self._check_increment(isis_lspid_increment, "isis_lspid_increment")
udf_id = len(self.udf_dict) + 1
lid_h = packet[pypacker.layer3.ip6.IP6].lspid
try:
lid_init = str(int(lid_h.split('.')[2]) + 1).zfill(4)
except IndexError:
lid_init = "0001"
if packet.vlan:
offset = 37
else:
offset = 33
self.udf_dict["lspid"] = {"udf_id": udf_id, "initval": lid_init, "offset": offset, "counter_type": 'c16',
"step": lspid_increment_step, "count": lspid_increment_count, "continuous": continuous}
tcl_commands.extend(self._set_ixia_udf_field(**self.udf_dict["lspid"]))
# self.stream_ids[stream_id]['size'] = packet_size
tcl_commands.append("stream config -dataPattern userpattern;")
tcl_commands.append("stream config -patternType nonRepeat;")
if pattern is not None:
tcl_commands.append("stream config -pattern {%s};" % (pattern,))
return tcl_commands
[docs] def _set_ixia_stream(self, packet, count, inter, rate, sa_increment, da_increment, sip_increment, dip_increment, arp_sa_increment,
arp_sip_increment, igmp_ip_increment, lldp_sa_increment, sudp_increment, dudp_increment, stcp_increment,
dtcp_increment, vlan_increment, continuous, required_size, iface, chassis, card, port, stream_id, eth_type_increment,
dscp_increment, protocol_increment, sipv6_increment, dipv6_increment, fl_increment, dhcp_si_increment,
in_vlan_increment, tc_increment, nh_increment, isis_lspid_increment, cont_burst, force_errors, udf_dependancies):
"""Set traffic stream with specified parameters on specified TG port.
Args:
packet(pypacker.Packet): Packet to analyze
count(int): How many packets to send in a stream.
inter(int): Interval between sending each packet.
rate(int): Interface rate in percents.
continuous(bool): Should stream be sent continuously or not. Continuous streams have to be started using start_streams method.
iface(str, tuple): Interface to use for packet sending (type depends on particular tg ports type).
required_size(int, tuple): Integer or tuple of parameters needed to be set when packet size should be incremented.
Tuple examples: ('Increment', <step>, <min>, <max>), ('Random', <min>, <max>)
sa_increment(tuple): Source MAC increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
da_increment(tuple): Destination MAC increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
sip_increment(tuple): Source IPv4 increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
dip_increment(tuple): Destination IPv4 increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
arp_sa_increment(tuple): Source MAC increment parameters for ARP packet. Tuple (<step>, <count>). Has to be used in pair with arp_sip_increment.
arp_sip_increment(tuple): Source IP increment parameters for ARP packet. Tuple (<step>, <count>). Has to be used in pair with arp_sa_increment.
igmp_ip_increment(tuple): Destination IP increment parameters for IGMP packet. Tuple (<step>, <count>).
lldp_sa_increment(tuple): Source MAC increment parameters for LLDP packet. Tuple (<step>, <count>).
vlan_increment(tuple): VLAN increment parameters for tagged packet. Tuple (<step>, <count>).
sudp_increment(tuple): UDP source port increment parameters.
dudp_increment(tuple): UDP destination port increment parameters.
eth_type_increment(tuple): Ethernet frame type increment parameters.
dscp_increment(tuple): DSCP increment parameters.
protocol_increment(tuple): IP protocol incrementation..
sipv6_increment(tuple): Source IPv6 increment parameters.
dipv6_increment(tuple): Destination IPv6 increment parameters.
fl_increment(tuple): Flow label increment parameters.
dhcp_si_increment(tuple): DHCP IP increment parameters.
in_vlan_increment(tuple): Inner vlan ID increment parameters for double tagged frames. Tuple (<step>, <count>).
tc_increment(tuple): IPv6 Traffic Class increment parameters.
nh_increment(tuple): IPv6 Next Header increment parameters.
cont_burst(bool): Should stream be sent as continuous burst or not. Continuous streams have to be started using start_streams method.
force_errors(str): Emulate Errors for configured stream.
Enum ("bad" /*streamErrorBadCRC, "none" /*streamErrorNoCRC, "dribble" /*streamErrorDribble, "align" /*streamErrorAlignment).
udf_dependancies(dict): Set UDF dependencies in case one incerement is dependant from another.
Dictionary {<dependant_increment> : <initial_increment>}
chassis(int): TG chassis id
card(int): TG card id
port(int): TG port id
Returns:
int: stream id
Notes:
It's not expected to configure a lot of incrementation options. Different traffic generator could have different limitations for these options.
Example::
stream_id_1 = tg.set_stream(pack_ip, count=100, iface=iface)
stream_id_2 = tg.set_stream(pack_ip, continuous=True, inter=0.1, iface=iface)
stream_id_3 = tg.set_stream(pack_ip_udp, count=5, protocol_increment=(3, 5), iface=iface)
stream_id_4 = tg.set_stream(pack_ip_udp, count=18, sip_increment=(3, 3), dip_increment=(3, 3), iface=iface,
udf_dependancies={'sip_increment': 'dip_increment'})
"""
# Defining ix_stream_id
iface_stream_ids = []
if self.stream_ids:
for stream_conf in list(self.stream_ids.values()):
if stream_conf['iface'] == iface:
# iface_stream_ids.append(stream_conf['ix_stream_id'])
# Add all elements of stream_conf['ix_stream_id'] to iface_stream_ids
list(map(iface_stream_ids.append, stream_conf['ix_stream_id']))
if iface_stream_ids:
ix_stream_id = max(iface_stream_ids) + 1
else:
ix_stream_id = 1
else:
ix_stream_id = 1
self.class_logger.debug("Stream IX_ID is: %s" % (ix_stream_id, ))
# Set stream default parameters
tcl_commands = []
tcl_commands.append("stream setDefault;")
tcl_commands.append("stream config -name \"TST%s\";" % stream_id)
if continuous:
tcl_commands.append("stream config -dma contPacket;")
elif cont_burst:
tcl_commands.append("stream config -dma contBurst;")
else:
tcl_commands.append("stream config -dma stopStream;")
if force_errors == "bad":
tcl_commands.append("stream config -fcs streamErrorBadCRC;")
elif force_errors == "none":
tcl_commands.append("stream config -fcs streamErrorNoCRC;")
elif force_errors == "dribble":
tcl_commands.append("stream config -fcs streamErrorDribble;")
elif force_errors == "align":
tcl_commands.append("stream config -fcs streamErrorAlignment;")
else:
tcl_commands.append("stream config -fcs streamErrorGood;")
# Set frame per second rate
if inter != 0:
tcl_commands.append("stream config -rateMode streamRateModeFps; stream config -fpsRate %s;" % (1.0 / inter))
else:
# By default rate = 99% to avoid the issue with dropping frames on AFS. But here we can override default values
tcl_commands.append("stream config -percentPacketRate %s;" % (rate, ))
# Set source and destination MAC
# TODO: Pypacker does not support Dot3(IEEE 802.3). Need to develop
# try:
# tcl_commands.append("stream config -sa {%s}; stream config -da {%s};" % (packet.getlayer(pypacker.Dot3).src, packet.getlayer(pypacker.Dot3).dst))
# except AttributeError:
tcl_commands.append("stream config -sa {%s}; stream config -da {%s};" % (packet.src_s, packet.dst_s))
tcl_commands.extend(
self._set_increments(packet, sa_increment, da_increment, sip_increment, dip_increment,
arp_sa_increment,
arp_sip_increment, igmp_ip_increment, lldp_sa_increment,
sudp_increment, dudp_increment, stcp_increment, dtcp_increment,
vlan_increment, continuous, required_size, eth_type_increment,
dscp_increment, protocol_increment, sipv6_increment,
dipv6_increment, fl_increment, dhcp_si_increment,
in_vlan_increment, tc_increment, nh_increment, isis_lspid_increment, chassis, card, port, force_errors))
if not continuous:
tcl_commands.append("stream config -numFrames %s;" % count)
# Applying settings
# self.class_logger.debug("tcl_commands:\n%s" % "\n".join(tcl_commands))
self.tcl(" ".join(tcl_commands))
# Setting stream (stream is disabled by default)
assert self.tcl("stream config -enable false; \
stream set %(chassis)s %(card)s %(port)s %(stream_id)s; \
stream write %(chassis)s %(card)s %(port)s %(stream_id)s" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id}) == "0"
# Set UDF dependencies
tcl_commands = []
self.tcl("stream get %(chassis)s %(card)s %(port)s %(stream_id)s;" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id})
if udf_dependancies is not None:
for _key, _value in udf_dependancies.items():
tcl_commands.extend(self._set_udf_dependence(_key, _value))
self.tcl(" ".join(tcl_commands))
self.tcl("stream set %(chassis)s %(card)s %(port)s %(stream_id)s;" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id})
return ix_stream_id
[docs] def _set_udf_dependence(self, key, value):
"""Set UDF dependencies.
Args:
key(str): increment name, e.g "vlan_increment", "sipv6_increment"
value(str): increment name, e.g "vlan_increment", "sipv6_increment"
Returns:
list: Tcl commands
"""
tcl_commands = []
udf_id = self.udf_dict[key.replace("_increment", "")]["udf_id"]
dep_udf_id = self.udf_dict[value.replace("_increment", "")]["udf_id"]
tcl_commands.append("udf get %(udf_id)s;" % {'udf_id': udf_id, })
tcl_commands.append("udf config -chainFrom udf%(dep_udf_id)s;" % {'dep_udf_id': dep_udf_id, })
tcl_commands.append("udf set %(udf_id)s;" % {'udf_id': udf_id, })
return tcl_commands
[docs] def set_stream(self, packet_def=None, count=1, inter=0, rate=99, sa_increment=None, da_increment=None, sip_increment=None, dip_increment=None,
is_valid=False, arp_sa_increment=None, arp_sip_increment=None, igmp_ip_increment=None, lldp_sa_increment=None, vlan_increment=None,
sudp_increment=None, dudp_increment=None, stcp_increment=None, dtcp_increment=None, continuous=False, iface=None, adjust_size=True,
required_size=64, fragsize=None, build_packet=True, eth_type_increment=None, dscp_increment=None, protocol_increment=None,
sipv6_increment=None, dipv6_increment=None, fl_increment=None, dhcp_si_increment=None, in_vlan_increment=None,
tc_increment=None, nh_increment=None, isis_lspid_increment=None, cont_burst=False, force_errors=None, udf_dependancies=None):
"""Set traffic stream with specified parameters on specified TG port.
Args:
packet_def(tuple(dict{dict})): Packet definition. Tuple of dictionaries of dictionaries in format:
({layerX: {field1: value, field2: value}, {layerY: {field1:value, fieldN: value}})
count(int): How many packets to send in a stream.
inter(int): Interval between sending each packet.
rate(int): Interface rate in percents.
continuous(bool): Should stream be sent continuously or not. Continuous streams have to be started using start_streams method.
iface(str, tuple): Interface to use for packet sending (type depends on particular tg ports type).
adjust_size(bool): See description for _build_pypacker_packet function.
required_size(int, tuple): Integer or tuple of parameters needed to be set when packet size should be incremented.
Tuple examples: ('Increment', <step>, <min>, <max>), ('Random', <min>, <max>)
fragsize(int): Max size of packet's single frame
is_valid(bool): Recalculate check sum and length for each packet layer
(by default pypacker do this automatically in case length and check sum aren't set).
This parameter has to be set True with all incrementation parameters.
build_packet(bool): Build packet from definition or use already built pypacker packet.
sa_increment(tuple): Source MAC increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
da_increment(tuple): Destination MAC increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
sip_increment(tuple): Source IPv4 increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
dip_increment(tuple): Destination IPv4 increment parameters. Tuple (<step>, <count>). Use count=0 for continuous increment.
arp_sa_increment(tuple): Source MAC increment parameters for ARP packet. Tuple (<step>, <count>). Has to be used in pair with arp_sip_increment.
arp_sip_increment(tuple): Source IP increment parameters for ARP packet. Tuple (<step>, <count>). Has to be used in pair with arp_sa_increment.
igmp_ip_increment(tuple): Destination IP increment parameters for IGMP packet. Tuple (<step>, <count>).
lldp_sa_increment(tuple): Source MAC increment parameters for LLDP packet. Tuple (<step>, <count>).
vlan_increment(tuple): VLAN increment parameters for tagged packet. Tuple (<step>, <count>).
sudp_increment(tuple): UDP source port increment parameters.
dudp_increment(tuple): UDP destination port increment parameters.
eth_type_increment(tuple): Ethernet frame type increment parameters.
dscp_increment(tuple): DSCP increment parameters.
protocol_increment(tuple): IP protocol incrementation..
sipv6_increment(tuple): Source IPv6 increment parameters.
dipv6_increment(tuple): Destination IPv6 increment parameters.
fl_increment(tuple): Flow label increment parameters.
dhcp_si_increment(tuple): DHCP IP increment parameters.
in_vlan_increment(tuple): Inner vlan ID increment parameters for double tagged frames. Tuple (<step>, <count>).
tc_increment(tuple): IPv6 Traffic Class increment parameters.
nh_increment(tuple): IPv6 Next Header increment parameters.
cont_burst(bool): Should stream be sent as continuous burst or not. Continuous streams have to be started using start_streams method.
force_errors(str): Emulate Errors for configured stream.
Enum ("bad" /*streamErrorBadCRC, "none" /*streamErrorNoCRC, "dribble" /*streamErrorDribble, "align" /*streamErrorAlignment).
udf_dependancies(dict): Set UDF dependencies in case one incerement is dependant from another.
Dictionary {<dependant_increment> : <initial_increment>}
stcp_increment(tuple): source TCP address increment
dtcp_increment(tuple): destination TCP address increment
Returns:
int: stream id
Notes:
It's not expected to configure a lot of incrementation options. Different traffic generator could have different limitations for these options.
Example::
stream_id_1 = tg.set_stream(pack_ip, count=100, iface=iface)
stream_id_2 = tg.set_stream(pack_ip, continuous=True, inter=0.1, iface=iface)
stream_id_3 = tg.set_stream(pack_ip_udp, count=5, protocol_increment=(3, 5), iface=iface)
stream_id_4 = tg.set_stream(pack_ip_udp, count=18, sip_increment=(3, 3), dip_increment=(3, 3), iface=iface,
udf_dependancies={'sip_increment': 'dip_increment'})
"""
stream_id = max(self.stream_ids.keys()) + 1 if self.stream_ids else 1
self.class_logger.debug("Stream ID is: %s" % (stream_id, ))
if build_packet:
if isinstance(required_size, int):
packet = self._build_pypacker_packet(packet_def, adjust_size=adjust_size, required_size=required_size - 4)
else:
packet = self._build_pypacker_packet(packet_def, adjust_size=adjust_size)
else:
packet = packet_def
if iface is not None:
# pylint can't figure out that this might be a tuple
chassis, card, port = iface # pylint: disable=W0633
rate = rate * self.rate[iface]
self.stream_ids[stream_id] = \
{'iface': iface, 'count': count, 'inter': inter, 'rate': rate, 'continuous': continuous, 'sa_increment': sa_increment,
'da_increment': da_increment, 'sip_increment': sip_increment, 'dip_increment': dip_increment, 'arp_sa_increment': arp_sa_increment,
'arp_sip_increment': arp_sip_increment, 'igmp_ip_increment': igmp_ip_increment, 'vlan_increment': vlan_increment, 'ix_stream_id': [],
'sudp_increment': sudp_increment, 'dudp_increment': dudp_increment, 'stcp_increment': stcp_increment, 'dtcp_increment': dtcp_increment,
'eth_type_increment': eth_type_increment, 'sipv6_increment': sipv6_increment, 'dipv6_increment': dipv6_increment, 'fl_increment': fl_increment,
'dhcp_si_increment': dhcp_si_increment, 'in_vlan_increment': in_vlan_increment, 'tc_increment': tc_increment, 'nh_increment': nh_increment,
'isis_lspid_increment': isis_lspid_increment, 'cont_burst': cont_burst, 'force_errors': force_errors,
'udf_dependancies': udf_dependancies}
kwargs = locals()
kwargs.pop("self")
kwargs.pop("packet_def")
kwargs.pop("fragsize")
kwargs.pop("adjust_size")
kwargs.pop("is_valid")
kwargs.pop("build_packet")
if fragsize is not None:
fragments = pypacker.fragment(packet, fragsize) # pylint: disable=no-member
for fragment in fragments:
kwargs['packet'] = fragment
self.stream_ids[stream_id]['ix_stream_id'].append(self._set_ixia_stream(**kwargs))
else:
kwargs['packet'] = packet
self.stream_ids[stream_id]['ix_stream_id'].append(self._set_ixia_stream(**kwargs))
self.class_logger.debug("Stream set done.")
self.class_logger.debug("stream_id[%s]: %s" % (stream_id, self.stream_ids[stream_id]))
self.class_logger.debug("stream_ids: %s" % ([_id for _id in self.stream_ids]))
return stream_id
[docs] def send_stream(self, stream_id=None):
"""Sends the stream created by 'set_stream' method.
Args:
stream_id(int): ID of the stream to be send.
Returns:
float: timestamp.
"""
def _send_ixia_stream(ix_stream_id, chassis, card, port, count):
self.tcl("stream get %(chassis)s %(card)s %(port)s %(stream_id)s;" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id})
_rate = float(self.tcl("stream cget -fpsRate;"))
attempts = 10 + 1000 / (50 * _rate)
self._transmit_stream(chassis, card, port, ix_stream_id)
# Wait until all packets sent
assert self.tcl("streamTransmitStats get %(chassis)s %(card)s %(port)s %(stream_id)s %(stream_id)s;\
set txCount [streamTransmitStats cget -framesSent];\
set endCount [expr {$startCount%(stream_id)s+%(count)s}];\
puts \"IXIA: Start sending %(count)s frames...\";\
set _tx_count 0;\
set attemptsCount 0;\
while {$txCount<$endCount} {\
after 50; \
streamTransmitStats get %(chassis)s %(card)s %(port)s 1 %(stream_id)s;\
streamTransmitStats getGroup %(stream_id)s;\
set txCount [streamTransmitStats cget -framesSent];\
if {$txCount > $_tx_count} {\
set _tx_count $txCount;\
set attemptsCount 0;\
} else {\
incr attemptsCount;\
}; \
if {$attemptsCount > %(attempts)s} {\
return 1;\
}; \
}; \
set sentCount [expr {$txCount-$startCount%(stream_id)s}];\
puts \"IXIA: Sent $sentCount frames.\";\
return 0;" %
{'chassis': chassis, 'card': card, 'port': port, 'stream_id': ix_stream_id, 'count': count, 'attempts': attempts}) == "0", \
"Ixia: Not all packets were sent"
# Disable stream
self._disable_stream(chassis, card, port, ix_stream_id)
# Sending IXIA stream or streams if frame fragmentation was performed
self.class_logger.debug("Sending stream %s..." % stream_id)
chassis, card, port = self.stream_ids[stream_id]['iface']
for ix_stream_id in self.stream_ids[stream_id]['ix_stream_id']:
_send_ixia_stream(ix_stream_id, chassis, card, port, self.stream_ids[stream_id]['count'])
[docs] def start_streams(self, stream_list):
"""Enable and start streams from the list simultaneously.
Args:
stream_list(list[int]): List of stream IDs.
Returns:
None
"""
self.class_logger.debug("Starting streams %s..." % stream_list)
# Enable streams and collect tx ports
ports_list = set()
for stream_id in stream_list:
chassis, card, port = self.stream_ids[stream_id]['iface']
for ix_stream_id in self.stream_ids[stream_id]['ix_stream_id']:
self._enable_stream(chassis, card, port, ix_stream_id)
ports_list.add(self.stream_ids[stream_id]['iface'])
# Convert collected tx port to tcl representation
tcl_ports_list = str(list(ports_list)).replace("(", "{").replace(")", "}").replace("[", "{").replace("]", "}").replace("'", "").replace(",", "")
self.tcl("set txPortIdList %s" % tcl_ports_list) == "0"
# Check Link State before sending stream.
if self.tcl("ixCheckLinkState txPortIdList") != "0":
raise IxiaException("Link is Down.")
# Start transmission
assert self.tcl("ixStartTransmit txPortIdList") == "0"
[docs] def stop_streams(self, stream_list=None):
""" Disable streams from the list.
Args:
stream_list(list[int]): Stream IDs to stop. In case stream_list is not set all running streams will be stopped.
Returns:
None
"""
# If stream_list not defined then stop all streams
if not stream_list:
stream_list = [key for key in list(self.stream_ids.keys())]
self.class_logger.debug("Stopping streams %s..." % stream_list)
for stream_id in stream_list:
chassis, card, port = self.stream_ids[stream_id]['iface']
for ix_stream_id in self.stream_ids[stream_id]['ix_stream_id']:
self._disable_stream(chassis, card, port, ix_stream_id)
[docs] def stop_all_streams(self):
"""Stop streams for all owned ports. ownedPortList variable creates on connect stage.
Raises:
AssertionError: error in executing tcl code
Returns:
None
"""
assert self.tcl("ixStopTransmit ownedPortList") == "0"
@staticmethod
[docs] def _get_port_info(iface):
"""Simple helper which allows to get interface info split.
Args:
iface(str): Which IXIA interface to use for packet sending (in format "{chassis_id card_id port_id}")
Returns:
list(str): [chassis_id, card_id, port_id]
"""
return iface[1:-1].split(" ")
@staticmethod
[docs] def _get_port_to_string(iface):
"""Simple helper which allows to get string representation for interface.
Args:
iface(list): Which IXIA interface to use for packet sending (in format [chassis_id, card_id, port_id])
Returns:
str: "chassis_id/card_id/port_id"
"""
return "/".join(map(str, iface))
[docs] def _set_filter_params(self, layer):
"""Configures filter parameters for specified layer.
Args:
layer(str): Layer name.
Returns:
str: Tcl command.
"""
def _set_user_pattern(ptrn_num, ptrn_cfg):
_tcl_commands = ""
_tcl_commands += "filterPallette config -pattern%s \"%s\";" % (ptrn_num, ptrn_cfg[0], )
_tcl_commands += "filterPallette config -patternMask%s \"%s\";" % (ptrn_num, ptrn_cfg[1], )
_tcl_commands += "filterPallette config -patternOffset%s \"%s\";" % (ptrn_num, ptrn_cfg[2], )
if len(ptrn_cfg) > 3:
_tcl_commands += "filterPallette config -patternOffsetType%s \"filterPalletteOffsetStartOf%s\";" % (ptrn_num, ptrn_cfg[3], )
return _tcl_commands
tcl_filter = ""
if "ptrn1" in self.flt_patterns[layer]:
tcl_filter += _set_user_pattern(1, self.flt_patterns[layer]['ptrn1'])
tcl_filter += "filterPallette config -matchType1 %s;" % (self.flt_patterns[layer]['mt1'], )
if "ptrn2" in self.flt_patterns[layer]:
tcl_filter += _set_user_pattern(2, self.flt_patterns[layer]['ptrn2'])
tcl_filter += "filterPallette config -matchType2 %s;" % (self.flt_patterns[layer]['mt2'], )
tcl_filter += "filter config -captureFilterPattern %s;" % (self.flt_patterns[layer]['cfp'], )
return tcl_filter
[docs] def start_sniff(self, ifaces, sniffing_time=None, packets_count=0, filter_layer=None, src_filter=None, dst_filter=None):
"""Starts sniffing on specified interfaces.
Args:
ifaces(list): List of TG interfaces for capturing.
sniffing_time(int): Time in seconds for sniffing.
packets_count(int): Count of packets to sniff (no count limitation in case 0).
filter_layer(str): Name of predefined sniffing filter criteria.
src_filter(str): Sniff only packet with defined source MAC.
dst_filter(str): Sniff only packet with defined destination MAC.
Returns:
None
Notes:
This method introduces additional 1.5 seconds timeout after capture enabling.
It's required by Ixia sniffer to wait until capturing is started.
Example::
env.tg[1].start_sniff(['eth0', ], filter_layer='IP', src_filter='00:00:00:01:01:01', dst_filter='00:00:00:22:22:22')
"""
self.class_logger.debug("Starting capturing on ifaces: %s" % (ifaces, ))
if filter_layer:
if isinstance(filter_layer, tuple) and len(filter_layer) == 3:
self.flt_patterns["Custom"] = {'ptrn1': [filter_layer[1], filter_layer[2], str(filter_layer[0])], 'mt1': "matchUser",
'cfp': "pattern1"}
filter_layer = "Custom"
if filter_layer not in self.flt_patterns:
raise IxiaException("Invalid filter_layer = %s. Allowed values: %s" % (filter_layer, list(self.flt_patterns.keys())))
self.class_logger.debug("Sniff params: %s seconds, %s packets, %s filter layer, %s srcMac, %s dstMac." %
(sniffing_time, packets_count if packets_count != 0 else None, filter_layer, src_filter, dst_filter))
# Store sniff settings per port to dictionary
for iface in ifaces:
_iface = self._get_port_to_string(iface)
if _iface in self.sniff_ids:
message = "Sniffer already started on iface %s %s %s." % iface
self.class_logger.error(message)
raise IxiaException(message)
self.sniff_ids[_iface] = {}
self.sniff_ids[_iface]['count'] = packets_count
self.sniff_ids[_iface]['layer'] = filter_layer
self.sniff_ids[_iface]['time'] = sniffing_time
# Define empty filter by default
filter_tcl = []
filter_tcl.append("filter setDefault;")
filter_tcl.append("capture setDefault;")
filter_tcl.append("capture config -captureMode captureContinuousMode;")
# Define realtime filter
if filter_layer or src_filter or dst_filter:
# and filtering_method == "realtime":
self.class_logger.debug("Enabling filter for data capturing...")
filter_tcl.append("filterPallette setDefault;")
filter_tcl.append("capture config -continuousFilter captureContinuousFilter;")
filter_tcl.append("filter config -captureFilterEnable true;")
else:
filter_tcl.append("filter config -captureFilterEnable false;")
if filter_layer:
# and filtering_method == "realtime":
filter_tcl.append(self._set_filter_params(filter_layer))
# Define srcMac filter
if src_filter:
if ':' in src_filter:
src_filter = src_filter.replace(':', ' ')
filter_tcl.append("filterPallette config -SA1 {%s};" % (src_filter, ))
filter_tcl.append("filter config -captureFilterSA addr1;")
# Define dstMac filter
if dst_filter:
if ':' in dst_filter:
dst_filter = dst_filter.replace(':', ' ')
filter_tcl.append("filterPallette config -DA1 {%s};" % (dst_filter, ))
filter_tcl.append("filter config -captureFilterDA addr1;")
# Send filter config to IXIA
self.tcl(" ".join(filter_tcl))
# Apply filter to ports
ports_list = []
for iface in ifaces:
tcl_cmd = ["set retCode $::TCL_OK;"]
chassis, card, port = iface
tcl_cmd.append("if {[capture set %(chassis)s %(card)s %(port)s]} {errorMsg \"<<capture set>> return error.\"; set retCode $::TCL_ERROR};")
if filter_layer or src_filter or dst_filter:
tcl_cmd.append("if {[filterPallette set %(chassis)s %(card)s %(port)s]} \
{errorMsg \"<<filterPallette set>> command return error.\"; set retCode $::TCL_ERROR};")
tcl_cmd.append("if {[filter set %(chassis)s %(card)s %(port)s]} {errorMsg \"<<filter set>> command return error.\"; set retCode $::TCL_ERROR};")
tcl_cmd.append("return $retCode;")
assert self.tcl(" ".join(tcl_cmd) % locals()) == "0"
ports_list.append(iface)
tcl_ports_list = str(ports_list).replace("(", "{").replace(")", "}").replace("[", "{").replace("]", "}").replace("'", "").replace(",", "")
assert self.tcl("set rxPortIdList %s;\
set retCode $::TCL_OK;\
if {[ixWriteConfigToHardware rxPortIdList]} {errorMsg \"ixWriteConfigToHardware return error.\"; set retCode $::TCL_ERROR};\
if {[ixClearStats rxPortIdList]} {errorMsg \"ixClearStats return error.\"; set retCode $::TCL_ERROR};\
if {[ixStartCapture rxPortIdList]} {errorMsg \"ixStartCapture return error.\"; set retCode $::TCL_ERROR};\
if {[ixClearTimeStamp rxPortIdList]} {errorMsg \"ixClearTimeStamp return error.\"; set retCode $::TCL_ERROR};\
return $retCode" %
(tcl_ports_list, )) == "0"
sniffing_start_timestamp = time.time()
for iface in ifaces:
_iface = self._get_port_to_string(iface)
self.sniff_ids[_iface]['start_time'] = sniffing_start_timestamp
# Wait for assurance that sniffing is started
time.sleep(1.5)
[docs] def stop_sniff(self, ifaces, force=False, drop_packets=False, sniff_packet_count=1000):
"""Stops sniffing on specified interfaces and returns captured data.
Args:
ifaces(list): List of interfaces where capturing has to be stopped.
force(bool): Stop capturing even if time or packet count criteria isn't achieved.
drop_packets(bool): Don't return sniffed data (used in case you need just read statistics).
sniff_packet_count(int): Default number of packets to return (used to avoid test hanging in case storm).
Returns:
dict: Dictionary where key = interface name, value = list of sniffed packets.
"""
def _stop_capture_on_port(iface):
chassis, card, port = iface.split("/")
assert self.tcl("ixStopPortCapture %s %s %s" % (chassis, card, port)) == "0"
self.class_logger.debug("Stopped capture on iface: %s" % iface)
def _get_captured_frames_count(iface):
chassis, card, port = iface.split("/")
tcl_cmd = []
message = "Cannot process tcl command: stat get statAllStats %s %s %s" % (chassis, card, port)
tcl_cmd.append("if {[stat get statAllStats %s %s %s]} {errorMsg \"%s\"; return \"ERROR\"} else" %
(chassis, card, port, message))
if self.sniff_ids[iface]['layer'] is not None:
tcl_cmd.append("{stat cget -captureFilter}")
else:
tcl_cmd.append("{stat cget -framesReceived}")
result = self.tcl(" ".join(tcl_cmd))
if result == "ERROR":
self.class_logger.error(message)
raise IxiaException(message)
else:
num_frames = int(result)
return num_frames
self.class_logger.debug("Stopping capturing on ifaces: %s ..." % (ifaces, ))
capture_state = dict((self._get_port_to_string(iface), True) for iface in ifaces)
while True in list(capture_state.values()):
# This flag will True when we have timed out sniffer without packets count. And we have to make timeout between check.
# In case sniffer with packets count we check numFrames and this procedure makes timeout.
timeout_flag = True
if force:
for iface in list(capture_state.keys()):
if capture_state[iface]:
_stop_capture_on_port(iface)
capture_state[iface] = False
for iface in list(capture_state.keys()):
if capture_state[iface]:
# Stop ifaces that do not have time or count parameters
if not self.sniff_ids[iface]['time'] and not self.sniff_ids[iface]['count']:
_stop_capture_on_port(iface)
capture_state[iface] = False
# Stop ifaces if time is elapsed
if self.sniff_ids[iface]['time'] and (time.time() - self.sniff_ids[iface]['start_time']) >= self.sniff_ids[iface]['time']:
_stop_capture_on_port(iface)
capture_state[iface] = False
for iface in list(capture_state.keys()):
if capture_state[iface]:
# Stop if frames count reached
if self.sniff_ids[iface]['count'] != 0:
num_frames = _get_captured_frames_count(iface)
if num_frames > self.sniff_ids[iface]['count']:
_stop_capture_on_port(iface)
capture_state[iface] = False
timeout_flag = False
if timeout_flag:
time.sleep(0.15)
# Loop prevention
for iface in list(capture_state.keys()):
if capture_state[iface]:
if not self.sniff_ids[iface]['time'] and (time.time() - self.sniff_ids[iface]['start_time']) >= self.DEFAULT_MAX_SNIFF_TIME:
_stop_capture_on_port(iface)
capture_state[iface] = False
packet_dict = {}
for iface in ifaces:
_iface = self._get_port_to_string(iface)
# After sniff finished we shouldn't use this method. Because statistics update is continuing
# captured_packets_count = _get_captured_frames_count(iface)
# Instead we check captureBuffer. (which stops capturing and we couldn't use it before)
chassis, card, port = iface
captured_packets_count = int(self.tcl("captureBuffer get %s %s %s; \
captureBuffer cget -numFrames;" %
(chassis, card, port)))
self.class_logger.debug("Collected %s packets on %s interface (real count)" % (captured_packets_count, iface))
if self.sniff_ids[_iface]['count'] != 0:
if captured_packets_count < self.sniff_ids[_iface]['count']:
packet_count = captured_packets_count
else:
packet_count = self.sniff_ids[_iface]['count']
else:
packet_count = captured_packets_count
self.class_logger.debug("Collected %s packets on %s interface (count for processing)" % (packet_count, iface))
# TODO: Develop user friendly format of packet list
# packet_list = pypacker.PacketList()
packet_list = []
if sniff_packet_count != 0 and sniff_packet_count < packet_count:
packet_count = sniff_packet_count
if packet_count and not drop_packets:
# chassis, card, port = self._get_port_info(iface)
assert self.tcl("capture get %(chassis)s %(card)s %(port)s; \
captureBuffer get %(chassis)s %(card)s %(port)s 1 %(packet_count)s" % locals()) == "0"
# self.tcl("captureBuffer get %s %s %s 1 %s" % (chassis, card, port, packet_count))
for packet_num in range(1, packet_count + 1):
# self.class_logger.debug("Processing packet %s... " % (packet_num, ))
assert self.tcl("captureBuffer getframe %s" % (packet_num, )) == "0"
raw_packet = self.tcl("captureBuffer cget -frame").replace(" ", "")
timestamp = float(self.tcl("captureBuffer cget -timestamp").replace(" ", ""))
# self.class_logger.debug(raw_packet)
# Add workaround for malformed packets (packet length less than 14)
# try:
pkt = pypacker.layer12.ethernet.Ethernet(codecs.decode(raw_packet, "hex_codec"))
# except StructError:
# pkt = pypacker.Padding(codecs.decode(raw_packet, "hex_codec"))
pkt.time = self.sniff_ids[_iface]['start_time'] + timestamp / 1000000000
packet_list.append(pkt)
packet_dict[iface] = packet_list
else:
packet_dict[iface] = []
self.sniff_ids.pop(_iface)
return packet_dict
[docs] def get_received_frames_count(self, iface):
"""Read statistics - number of received valid frames.
Args:
iface(str): Interface name.
Returns:
int: Number of received frames.
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat get framesReceived %s %s %s;" % (chassis, card, port))
tcl_cmd.append("stat cget -framesReceived")
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def get_filtered_frames_count(self, iface):
"""Read statistics - number of received frames which fit filter criteria.
Args:
iface(str): Interface name.
Returns:
int: Number of filtered frames.
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat get captureFilter %s %s %s;" % (chassis, card, port))
tcl_cmd.append("stat cget -captureFilter")
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def get_uds_3_frames_count(self, iface):
"""Read statistics - number of non-filtered received frames (valid and invalid).
Args:
iface(str): Interface name.
Returns:
int: Number of received frames
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat get captureTrigger %s %s %s;" % (chassis, card, port))
tcl_cmd.append("stat cget -captureTrigger")
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def get_sent_frames_count(self, iface):
"""Read statistics - number of sent frames.
Args:
iface(str): Interface name.
Returns:
int: Number of sent frames.
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat get framesSent %s %s %s;" % (chassis, card, port))
tcl_cmd.append("stat cget -framesSent")
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def clear_statistics(self, sniff_port_list):
"""Clear statistics - number of frames.
Args:
sniff_port_list(list): List of interface names.
Returns:
None
"""
tcl_port_list = str(sniff_port_list).replace("(", "{").replace(")", "}").replace("[", "{").replace("]", "}").replace("'", "").replace(",", "")
assert self.tcl("set rxPortIdList %s;\
set retCode $::TCL_OK;\
if {[ixClearStats rxPortIdList]} {errorMsg \"ixClearStats return error.\"; set retCode $::TCL_ERROR};\
return $retCode" %
(tcl_port_list, )) == "0"
[docs] def set_flow_control(self, iface, mode):
"""Enable/Disable flow control on the port.
Args:
iface(str): Interface name.
mode(bool): True/False.
Returns:
None
"""
chassis, card, port = iface
self.tcl("port config -flowControl $::%s" % (str(mode).lower(), ))
assert self.tcl("port set %s %s %s" % (chassis, card, port)) == "0"
assert self.tcl("port write %s %s %s" % (chassis, card, port)) == "0"
[docs] def set_qos_stat_type(self, iface, ptype):
"""Set the QoS counters to look for priority bits for given packets type.
Args:
iface(str): Interface name.
ptype(str): Priority type: VLAN/IP.
Returns:
None
"""
if ptype == "VLAN":
_ptype = "vlan"
elif ptype == "IP":
_ptype = "ipEthernetII"
else:
raise IxiaException("Invalid packet type for QoS setup: %s" % (ptype, ))
chassis, card, port = iface
self.tcl("stat config -mode statQos")
assert self.tcl("stat set %s %s %s" % (chassis, card, port)) == "0"
assert self.tcl("stat write %s %s %s" % (chassis, card, port)) == "0"
self.tcl("qos setup %s" % (_ptype, ))
if ptype == "IP":
self.tcl("qos config -packetType ipEthernetII;" +
"qos config -byteOffset 15;" +
"qos config -patternMatch \"08 00\"")
assert self.tcl("qos set %s %s %s" % (chassis, card, port)) == "0"
assert self.tcl("qos write %s %s %s" % (chassis, card, port)) == "0"
[docs] def get_qos_frames_count(self, iface, prio):
"""Get captured QoS frames count.
Args:
iface(str): Interface name.
prio(int): Priority.
Returns:
int: captured QoS frames count.
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat get allStats %s %s %s;" %
(chassis, card, port))
tcl_cmd.append("stat cget -qualityOfService%d" % (prio, ))
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def get_port_txrate(self, iface):
"""Return port transmission rate.
Args:
iface(str): Interface name.
Returns:
int: Frames per second.
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat getRate statAllStats %s %s %s;" %
(chassis, card, port))
tcl_cmd.append("stat cget -framesSent")
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def get_port_rxrate(self, iface):
"""Return port receiving rate.
Args:
iface(str): Interface name.
Returns:
int: Frames per second.
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat getRate statAllStats %s %s %s;" %
(chassis, card, port))
tcl_cmd.append("stat cget -framesReceived")
result = self.tcl(" ".join(tcl_cmd))
return int(result)
[docs] def get_port_qos_rxrate(self, iface, qos):
"""Return port receiving rate for specific qos.
Args:
iface(str): Interface name.
qos(int): Qos value.
Returns:
int: Frames per second (int)
"""
chassis, card, port = iface
tcl_cmd = []
tcl_cmd.append("stat getRate statAllStats %s %s %s;" %
(chassis, card, port))
tcl_cmd.append("stat cget -qualityOfService%s" % qos)
result = self.tcl(" ".join(tcl_cmd))
return int(result)