# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``IxLoadTclAPI.py``
`IxLoad Tcl API wrapper module`
"""
import json
import os
import time
# from ..custom_exceptions import IxiaException
from . import ixia_helpers
from ..loggers import ClassLogger
from ..read_csv import ReadCsv
# To log tcl commands without execution set to True.
SIMULATE = False
[docs]class IxLoadTclAPI(object):
"""IxLoad Tcl API base wrapper class.
"""
class_logger = ClassLogger()
[docs] def __init__(self, ipaddr, user):
"""Initializes connection to IxLoad.
Args:
ipaddr(str): IxLoad host IP address.
user(str): IxLoad windows user.
"""
import tkinter
self.tcl_interpret = tkinter.Tcl()
def tcl_puts(*args):
if len(args) >= 2:
stream = args[0]
if stream == "stdout":
self.class_logger.debug(" ".join(args[1:]))
elif stream == "stderr":
self.class_logger.error(" ".join(args[1:]))
else:
self.class_logger.debug("stream <%s>: %s" % (args[0], " ".join(args[1:])))
elif len(args) == 1:
self.class_logger.debug(args[0])
else:
self.class_logger.error("Called puts without arguments.")
return None
self.tcl_interpret.createcommand("tcl_puts", tcl_puts)
self.class_logger.debug("Insert tcl script to catch puts output.")
ixia_helpers.tcl_puts_replace(self.tcl_interpret)
self.ixload_ip = ipaddr
self.ixload_user = user
# Temp path for sending rxf files to IxLoad
self.ixload_tmppath = "C:\\Users\\{0}\\AppData\\Local\\Temp\\".format(self.ixload_user)
# Path for storing test csv reports. It could be override in child IxLoadHL class.
self.ixload_respath = self.ixload_tmppath
self.__log_name = "IxLoadTclAPI-{0}-{1}-{2}".format(os.uname()[1], os.getuid(), int(time.time()))
self.ixload_logpath = None
ixia_helpers.ixtclhal_import(self.tcl_interpret)
ixia_helpers.ixload_import(self.tcl_interpret)
self.test_controller = None
self.tst = None
[docs] def tcl(self, cmd):
"""Tcl wrapper.
"""
self.class_logger.debug("Run tcl command: %s", cmd)
if not SIMULATE:
return self.tcl_interpret.eval(cmd)
else:
return ""
[docs] def connect(self):
"""Logs in to IXIA and takes ports ownership.
Returns:
None
"""
# Set simple config
# self.tcl("namespace eval ::IxLoadPrivate {};" +
# "namespace eval ::IxLoadPrivate::SimpleSettings {};" +
# "variable ::IxLoadPrivate::SimpleSettings::remoteServer {0};".format(self.ixload_ip) +
# "::IxLoad connect $::IxLoadPrivate::SimpleSettings::remoteServer")
self.tcl("::IxLoad connect {0}".format(self.ixload_ip))
# Set up logger
self.ixload_logpath = (self.ixload_respath + "\\" + self.__log_name).replace("\\", "\\\\")
self.logger_setup()
# Define test controller.
self.test_controller = IxLoadTestController(self.tcl, self.tcl_interpret, self.ixload_respath)
# self.tcl("set testController [::IxLoad new ixTestController -outputDir 1];")
self.tcl("global ixAppPluginManager")
self.class_logger.info("IxLoad startup complete.")
[docs] def disconnect(self):
"""Logs out from IXIA and clears ports ownership.
Returns:
None
"""
self.tcl("::IxLoad disconnect")
[docs] def logger_setup(self):
self.class_logger.info("Setting up IxLoad logger...")
self.tcl("set logtag \"IxLoad-api\";" +
"set logName \"{0}\";".format(self.__log_name) +
"set logger [::IxLoad new ixLogger $logtag 1];" +
"set logEngine [$logger getEngine];" +
"$logEngine setLevels $::ixLogger(kLevelDebug) $::ixLogger(kLevelInfo);" +
"$logEngine setFile {0} 2 1024 1".format(self.ixload_logpath))
# "$logEngine setFile $logName 2 256 1")
[docs] def logger_delete(self):
self.tcl("::IxLoad delete $logger;" +
"::IxLoad delete $logEngine")
[docs] def load_repo(self, repo=None):
"""Loading rxf repo file or create new one.
"""
if repo is None:
self.tcl("set repository [::IxLoad new ixRepository]")
else:
self.class_logger.info("Loading repo: {0}".format(repo))
_repo_name = os.path.basename(repo)
ixload_repo = self.ixload_tmppath + _repo_name
self.copy_local_file(repo, ixload_repo)
self.tcl("set repository [::IxLoad new ixRepository -name \"{0}\"]".format(ixload_repo).replace("\\", "\\\\"))
self.repo_file = repo # pylint: disable=attribute-defined-outside-init
# self.tst = IxLoadTests(self.tcl)
self.tst = IxLoadTests(self.tcl, self.test_controller,
"{0}{1}".format(self.ixload_respath, self.__log_name))
self.class_logger.debug("Discovered tests list: {0}".format(self.tst.tc_list))
[docs] def copy_local_file(self, local_path, remote_path):
"""Copy local file to IxLoad host.
"""
self.tcl("::IxLoad sendFileCopy \"{0}\" \"{1}\"".format(local_path, remote_path).replace("\\", "\\\\"))
[docs] def copy_remote_file(self, remote_path, local_path):
"""Copy remote file from IxLoad host to local host.
"""
self.tcl("::IxLoad retrieveFileCopy \"{0}\" \"{1}\"".format(remote_path, local_path).replace("\\", "\\\\"))
[docs] def retrieve_results(self, dst_path):
"""Retrieve result csv files from IxLoad host to local dst_path.
"""
self.tcl("::IxLoad retrieveResults \"{0}\"".format(dst_path).replace("\\", "\\\\"))
[docs] def load_plugin(self, plugin):
self.tcl("$ixAppPluginManager load \"{0}\"".format(plugin))
[docs] def update_stats(self, stype="file", stat_name=None):
def s2i_safe(val):
try:
return int(val.replace("kInt ", "").replace("timestamp ", ""))
except Exception:
try:
return float(val.replace("kInt ", "").replace("timestamp ", ""))
except Exception:
return val
if stype == "file":
if self.test_controller.test_result_path is None:
raise Exception("Any test is started or csv result path isn't set.")
tmp_path = os.path.join("/tmp", "taf_ixload_file_stats.{0}".format(os.getpid()))
if stat_name:
stat_names = [stat_name, ]
else:
stat_names = [sn[0] for sn in self.test_controller.stats_list]
for stat_name in stat_names:
self.copy_remote_file(self.test_controller.test_result_path + "\\" + stat_name.replace(" ", "_") + ".csv", tmp_path)
csv = ReadCsv(tmp_path)
# 15 is minimal acceptable length of csv report. Take bigger number for assurance.
if len(csv.content) < 18:
self.class_logger.warning("IxLoad {0} csv file is empty yet.".format(stat_name))
return False
# Remove unnecessary lines from IxLoad csv
# -1 because last line could be not full filled.
for i in [-1, 12, 11, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]:
csv.content.pop(i)
self.test_controller.file_stats[stat_name] = IxLoadStats()
self.test_controller.file_stats[stat_name].extend([list(map(s2i_safe, x)) for x in csv.content[1:]])
self.test_controller.file_stats[stat_name].add_header(csv.content[0])
elif stype == "runtime":
for stat_item in self.test_controller.stats_list:
stat_name = stat_item[0]
_h = ["Elapsed Time", ]
_h.extend(stat_item[1])
self.test_controller.runtime_stats[stat_name] = IxLoadStats()
self.test_controller.runtime_stats[stat_name].add_header(_h)
for stat in self.test_controller.stats[:]:
time_stamp = s2i_safe(stat[1])
# Convert time_stamp to seconds
time_stamp = time_stamp / 1000 if isinstance(time_stamp, int) else time_stamp
stat_values = list(map(s2i_safe, stat[2]))
last_stat_item = 0
for stat_item in self.test_controller.stats_list:
stat_name = stat_item[0]
stat_num = len(stat_item[1])
_l = stat_values[last_stat_item:stat_num + last_stat_item]
_l.insert(0, time_stamp)
last_stat_item += stat_num
self.test_controller.runtime_stats[stat_name].append(_l)
else:
raise Exception("Unknown stats type: {0}".format(stype))
return True
[docs] def get_stats(self, stype="file"):
if stype == "file":
return self.test_controller.file_stats
elif stype == "runtime":
return self.test_controller.runtime_stats
else:
raise Exception("Incorrect stats type: {0}".format(stype))
[docs]class IxLoadTests(object):
"""Class for managing IxLoad Tests.
"""
def __init__(self, tcl, test_controller, res_path=""):
self.tcl = tcl
self.tc_list = []
self.load_tclist()
self.res_path = res_path
self.test_controller = test_controller
[docs] def load_tclist(self):
"""Loading list of IxLoad Tests.
"""
_tlist = []
num_tests = self.tcl("$repository testList.indexCount")
for _i in range(int(num_tests)):
tc_name = self.tcl("$repository testList({0}).cget -name".format(_i))
_tlist.append(tc_name)
# Test list is read. Cleanup previous one and store new list.
self.tc_list = _tlist
[docs] def start(self, t_name):
"""Start ixLoad test without waiting for result.
"""
self.tcl("puts {0}".format(t_name))
[docs] def run(self, t_name, res_path=None):
"""Run ixLoad test until completion.
"""
# Set result dir.
res_path = res_path if res_path is not None else self.res_path
res_path = "{0}\\{1}".format(res_path, t_name)
self.tcl("${0} setResultDir \"{1}\"".format(self.test_controller.name, res_path).replace("\\", "\\\\"))
# Execute the test.
self.tcl("set test [$repository testList.getItem {}];".format(t_name) +
"${0} run $test".format(self.test_controller.name))
self.tcl("vwait ::ixTestControllerMonitor; puts $::ixTestControllerMonitor")
return res_path
[docs] def cleanup(self):
self.tcl("${0} releaseConfigWaitFinish;".format(self.test_controller.name) +
"if {[lsearch [info vars] test] >= 0} {$test clearDUTList; ::IxLoad delete $test}")
[docs] def report(self, pdf=False):
"""Enable/Disable report options.
"""
self.tcl("ixNet setAttribute [ixNet getRoot]/testConfiguration -enableGenerateReportAfterRun {0}".format(pdf))
[docs]class IxLoadStats(list):
"""Custom list class to support columns header names.
"""
headers = None
def _get(self, row_num, col_names):
if col_names is None:
# Return whole row
return self[row_num]
elif isinstance(col_names, str):
return self[row_num][self.headers[col_names]]
elif isinstance(col_names, list):
indexes = [self.headers[x] for x in col_names]
return [self[row_num][x] for x in indexes]
else:
raise TypeError("Incorrect col_names type: {0}. str/list of str/None are allowed.".format(type(col_names)))
[docs] def get(self, row_num=-1, col_names=None):
if isinstance(row_num, int):
return self._get(row_num, col_names)
elif isinstance(row_num, list):
return [self._get(x, col_names) for x in row_num]
else:
raise TypeError("Incorrect row_num type: {0}. int and list are allowed.".format(type(row_num)))
[docs]class IxLoadGenericObjectMixin(object):
IXLOAD_CLASS = None
TCL_VARS = ""
def __init__(self, tcl):
self.tcl = tcl
self.name = self.get_free_name()
if self.IXLOAD_CLASS is not None:
self.tcl("set {0} [::IxLoad new {1}]".format(self.name, self.IXLOAD_CLASS))
def __del__(self):
self.tcl("::IxLoad delete ${0}".format(self.name))
self.tcl("unset {0}".format(self.name))
[docs] def get_free_name(self):
def is_int(num):
try:
return int(num)
except ValueError:
return None
if not SIMULATE:
_tcl_vars = self.tcl("info vars")
else:
self.tcl("")
_tcl_vars = IxLoadGenericObjectMixin.TCL_VARS
# Return unused number for class variable.
_id = max([is_int(_v.replace(self.NAME, "")) for _v in _tcl_vars.split(" ") if _v.startswith(self.NAME)] or [0, ]) or 0
if SIMULATE:
IxLoadGenericObjectMixin.TCL_VARS += " " + self.NAME + str(_id + 1)
return self.NAME + str(_id + 1)
def _gen_params(self, **kw):
def arg_repr(arg):
if isinstance(arg, IxLoadGenericObjectMixin):
return "${0}".format(arg.name)
elif isinstance(arg, str):
return '"{0}"'.format(arg)
else:
return arg
return " ".join(["-{0} {1}".format(k, arg_repr(kw[k])) for k in kw])
[docs] def config(self, **kwargs):
params = self._gen_params(**kwargs)
self.tcl("${0} config {1}".format(self.name, params))
[docs] def clear(self, target):
self.tcl("${0} {1}.clear".format(self.name, target))
[docs]class IxLoadTestController(IxLoadGenericObjectMixin):
NAME = "testController"
class_logger = ClassLogger()
def __init__(self, tcl, tcl_interpret, root_path="", res_path=None):
super(IxLoadTestController, self).__init__(tcl)
self.tcl("set {0} [::IxLoad new ixTestController -outputDir True]".format(self.name))
self.res_path = res_path or "TAF-{0}-{1}-{2}".format(os.uname()[1], os.getuid(), int(time.time()))
self.root_path = root_path
self.test_result_path = None
self.statcollector = None
self.testserverhandle = None
# Stats collector for runtime statistics collecting.
self.stats = []
self.runtime_stats = {}
self.file_stats = {}
self.stats_list = []
self.statmap = {}
def collect_stats(*args):
try:
# Try to pars stat args.
a1, a2 = args
time_stamp, stat_args = a2.split(" stats ")
time_stamp = time_stamp.replace("timestamp ", "")
stat_args = stat_args.lstrip("{{").rstrip("}}").split("} {")
self.stats.append((a1, time_stamp, stat_args))
except Exception as err:
# Append stat args as is.
self.class_logger.warning("Failed to parse stat args. Err: {0}".format(err))
self.stats.append(args)
tcl_interpret.createcommand("collect_stats", collect_stats)
self.set_statcollector()
self.get_testserverhandle()
self.statcollector.init_tsh(self.testserverhandle)
def __del__(self):
self.cleanup()
super(IxLoadTestController, self).__del__()
[docs] def cleanup(self):
tcs = self.status()
if tcs == "1":
self.stop(force=True)
else:
self.tcl("${0} releaseConfigWaitFinish".format(self.name))
[docs] def clear_stats(self):
self.runtime_stats = {}
self.file_stats = {}
# self.stats_list = []
self.statcollector.clear()
[docs] def set_resultdir(self, root_path=None, res_dir=None):
root_path = root_path or self.root_path
_result_path = "{0}{1}\\{2}".format(root_path, self.res_path, res_dir)
self.tcl("${0} setResultDir {1}".format(self.name, _result_path).replace("\\", "\\\\"))
return _result_path
[docs] def apply(self, test):
self.tcl("${0} applyConfig ${1}".format(self.name, test))
[docs] def run_test(self, test, test_name=None, stats=None):
self.test_result_path = self.set_resultdir(self.root_path, test_name or test.name)
statmap = {}
if stats:
self.stats_list = stats
_stat_list = ""
_stat_types_file = os.path.join(os.path.dirname(__file__), "ixload_stat_types")
_stat_types = json.loads(open(_stat_types_file).read().encode("ascii"), encoding="latin-1")
for stat in stats:
self.runtime_stats[stat[0]] = IxLoadStats()
_stat_list_name = stat[0].replace(" ", "_") + "_StatList"
_stat_items = ""
statmap[stat[0]] = {}
for stat_item in stat[1]:
_stat_items += " {\"%s\" \"%s\" \"%s\"}" % (stat[0], stat_item,
_stat_types[stat[0]][stat_item])
statmap[stat[0]][stat_item] = len(statmap[stat[0]])
self.tcl("set %s { %s }" % (_stat_list_name, _stat_items))
_stat_list += " $" + _stat_list_name
self.statmap = statmap
self.tcl("${0} clearGridStats".format(test.name))
self.tcl("set statList [concat {0}]".format(_stat_list))
self.tcl("set count 1; " +
"foreach stat $statList { " +
" set caption [format \"Watch_Stat_%s\" $count]; " +
" set statSourceType [lindex $stat 0]; " +
" set statName [lindex $stat 1]; " +
" set aggregationType [lindex $stat 2]; " +
" ${%s}::AddStat " % (self.statcollector.name, ) +
" -caption $caption " +
" -statSourceType $statSourceType " +
" -statName $statName " +
" -aggregationType $aggregationType " +
" -filterList {}; "
" incr count}")
self.statcollector.start()
self.tcl("set ::ixTestControllerMonitor \"\"; ${0} run ${1}".format(self.name, test.name))
return self.test_result_path
[docs] def stop(self, force=False):
if force:
self.tcl("${0} stopRun".format(self.name))
else:
self.tcl("${0} stopRunGraceful".format(self.name))
[docs] def status(self):
return self.tcl("${0} isBusy".format(self.name))
[docs] def wait_test(self):
self.tcl("vwait ::ixTestControllerMonitor; puts $::ixTestControllerMonitor")
if self.statcollector is not None:
self.statcollector.stop()
[docs] def check_testexecution(self):
rc = self.tcl("if { $::ixTestControllerMonitor == \"\" } {return \"RUN\"}")
if rc == "RUN":
return True
else:
return False
[docs] def get_testserverhandle(self):
self.testserverhandle = IxLoadTestServerHandle(self.tcl)
self.tcl("set {0} [${1} getTestServerHandle]".format(self.testserverhandle.name, self.name))
return self.testserverhandle
[docs] def set_statcollector(self):
self.statcollector = IxLoadstatCollectorUtils(self.tcl)
[docs]class IxLoadChassisChain(IxLoadGenericObjectMixin):
NAME = "chassisChain"
IXLOAD_CLASS = "ixChassisChain"
def __init__(self, tcl, ipaddr=None):
super(IxLoadChassisChain, self).__init__(tcl)
if ipaddr is None:
ipaddr = ["127.0.0.1", ]
self.__ipaddr = ipaddr
for _ipaddr in self.__ipaddr:
self.tcl("${0} addChassis {1}".format(self.name, _ipaddr))
[docs]class IxLoadixEventHandlerSettings(IxLoadGenericObjectMixin):
NAME = "ixEventHandlerSettings"
IXLOAD_CLASS = "ixEventHandlerSettings"
def __init__(self, tcl):
super(IxLoadixEventHandlerSettings, self).__init__(tcl)
self.tcl("${0} config".format(self.name))
[docs]class IxLoadixViewOptions(IxLoadGenericObjectMixin):
NAME = "ixViewOptions"
IXLOAD_CLASS = "ixViewOptions"
def __init__(self, tcl):
super(IxLoadixViewOptions, self).__init__(tcl)
self.tcl("${0} config".format(self.name))
[docs]class IxLoadixTest(IxLoadGenericObjectMixin):
NAME = "ixTest"
IXLOAD_CLASS = "ixTest"
def __init__(self, tcl):
super(IxLoadixTest, self).__init__(tcl)
self.clear("scenarioList")
self.scenarioelement = None
self.scenariofactory = None
self.scenario = None
self.eventhandlersettings = None
self.viewoptions = None
self.sessionspecificdata = {}
self.profiledir = None
def __del__(self):
self.tcl("${0} clearDUTList".format(self.name))
super(IxLoadixTest, self).__del__()
[docs] def get_scenarioelementfactory(self):
self.scenarioelement = IxLoadScenarioElementFactory(self.tcl)
self.tcl("set {0} [${1} getScenarioElementFactory]".format(self.scenarioelement.name, self.name))
return self.scenarioelement
[docs] def get_scenariofactory(self):
self.scenariofactory = IxLoadScenarioFactory(self.tcl)
self.tcl("set {0} [${1} getScenarioFactory]".format(self.scenariofactory.name, self.name))
return self.scenariofactory
[docs] def get_scenario(self):
if self.scenariofactory is None:
self.get_scenariofactory()
self.scenario = self.scenariofactory.create_scenario()
return self. scenario
[docs] def get_sessionspecificdata(self, _type):
ssd = IxLoadixTestSessionSpecificData(self.tcl)
self.tcl("set {0} [${1} getSessionSpecificData \"{2}\"]".format(ssd.name, self.name, _type))
self.sessionspecificdata[ssd.name] = ssd
return ssd
[docs] def get_profiledir(self):
self.profiledir = IxLoadixTestProfileDirectory(self.tcl)
self.tcl("set {0} [${1} cget -profileDirectory]".format(self.profiledir.name, self.name))
return self.profiledir
[docs]class IxLoadScenarioElementFactory(IxLoadGenericObjectMixin):
NAME = "scenarioElementFactory"
[docs] def create_nettraffic(self):
nettraffic = IxLoadSETkNetTraffic(self.tcl)
self.tcl("set {0} [${1} create $::ixScenarioElementType(kNetTraffic)]".format(nettraffic.name, self.name))
return nettraffic
[docs] def create_dut(self):
dut = IxLoadSETkDutBasic(self.tcl)
self.tcl("set {0} [${1} create $::ixScenarioElementType(kDutBasic)]".format(dut.name, self.name))
return dut
[docs]class IxLoadScenarioFactory(IxLoadGenericObjectMixin):
NAME = "scenarioFactory"
# IXLOAD_CLASS = "getScenarioFactory"
[docs] def create_scenario(self):
scenario = IxLoadScenario(self.tcl)
self.tcl("set {0} [${1} create \"Scenario\"]".format(scenario.name, self.name))
scenario.clear("columnList")
scenario.clear("links")
return scenario
[docs]class IxLoadScenario(IxLoadGenericObjectMixin):
NAME = "Scenario"
def __init__(self, tcl):
super(IxLoadScenario, self).__init__(tcl)
self.columnlist = []
[docs] def append_columnlist(self, column):
self.tcl("${0} columnList.appendItem -object ${1}".format(self.name, column.name))
self.columnlist.append(column)
[docs] def new_traffic_column(self):
return IxLoadixTrafficColumn(self.tcl)
[docs]class IxLoadixTrafficColumn(IxLoadGenericObjectMixin):
NAME = "ixTrafficColumn"
IXLOAD_CLASS = "ixTrafficColumn"
def __init__(self, tcl):
super(IxLoadixTrafficColumn, self).__init__(tcl)
self.clear("elementList")
self.elementlist = []
[docs] def append_elementlist(self, element):
self.tcl("${0} elementList.appendItem -object ${1}".format(self.name, element.name))
self.elementlist.append(element)
[docs]class IxLoadixNetIxLoadSettingsPlugin(IxLoadGenericObjectMixin):
NAME = "Settings"
IXLOAD_CLASS = "ixNetIxLoadSettingsPlugin"
[docs]class IxLoadixNetFilterPlugin(IxLoadGenericObjectMixin):
NAME = "Filter"
IXLOAD_CLASS = "ixNetFilterPlugin"
[docs]class IxLoadixNetGratArpPlugin(IxLoadGenericObjectMixin):
NAME = "GratARP"
IXLOAD_CLASS = "ixNetGratArpPlugin"
[docs]class IxLoadixNetTCPPlugin(IxLoadGenericObjectMixin):
NAME = "TCP"
IXLOAD_CLASS = "ixNetTCPPlugin"
[docs]class IxLoadixNetDnsPlugin(IxLoadGenericObjectMixin):
NAME = "DNS"
IXLOAD_CLASS = "ixNetDnsPlugin"
def __init__(self, tcl):
super(IxLoadixNetDnsPlugin, self).__init__(tcl)
self.clear("hostList")
self.clear("searchList")
self.clear("nameServerList")
ixNetIxLoadPlugins = {"Settings": IxLoadixNetIxLoadSettingsPlugin,
"Filter": IxLoadixNetFilterPlugin,
"GratARP": IxLoadixNetGratArpPlugin,
"TCP": IxLoadixNetTCPPlugin,
"DNS": IxLoadixNetDnsPlugin, }
[docs]class IxLoadixNetEthernetELMPlugin(IxLoadGenericObjectMixin):
NAME = "ixNetEthernetELMPlugin"
IXLOAD_CLASS = "ixNetEthernetELMPlugin"
[docs]class IxLoadixNetDualPhyPlugin(IxLoadGenericObjectMixin):
NAME = "ixNetDualPhyPlugin"
IXLOAD_CLASS = "ixNetDualPhyPlugin"
[docs]class IxLoadIPRMacRange(IxLoadGenericObjectMixin):
NAME = "MAC_R"
[docs]class IxLoadIPRVlanIdRange(IxLoadGenericObjectMixin):
NAME = "VLAN_R"
[docs]class IxLoadixNetIpV4V6Range(IxLoadGenericObjectMixin):
NAME = "IP_R"
IXLOAD_CLASS = "ixNetIpV4V6Range"
def __init__(self, tcl):
super(IxLoadixNetIpV4V6Range, self).__init__(tcl)
self.macrange = None
self.vlanidrange = None
[docs] def get_macrange(self):
self.macrange = IxLoadIPRMacRange(self.tcl)
self.tcl("set {0} [${1} getLowerRelatedRange \"MacRange\"]".format(self.macrange.name, self.name))
return self.macrange
[docs] def get_vlanidrange(self):
self.vlanidrange = IxLoadIPRVlanIdRange(self.tcl)
self.tcl("set {0} [${1} getLowerRelatedRange \"VlanIdRange\"]".format(self.vlanidrange.name, self.name))
return self.vlanidrange
[docs]class IxLoadixNetRangeGroup(IxLoadGenericObjectMixin):
NAME = "DistGroup"
IXLOAD_CLASS = "ixNetRangeGroup"
def __init__(self, tcl):
super(IxLoadixNetRangeGroup, self).__init__(tcl)
self.clear("rangeList")
self.ranges = []
[docs] def append_range(self, iprange):
self.tcl("${0} rangeList.appendItem -object ${1}".format(self.name, iprange.name))
self.ranges.append(iprange)
[docs]class IxLoadixNetIpV4V6Plugin(IxLoadGenericObjectMixin):
NAME = "IP"
IXLOAD_CLASS = "ixNetIpV4V6Plugin"
def __init__(self, tcl):
super(IxLoadixNetIpV4V6Plugin, self).__init__(tcl)
self.clear("childrenList")
self.clear("extensionList")
self.clear("rangeList")
self.clear("rangeGroups")
self.ranges = []
self.distgroup = None
[docs] def new_range(self):
_range = IxLoadixNetIpV4V6Range(self.tcl)
self.tcl("${0} rangeList.appendItem -object ${1}".format(self.name, _range.name))
self.ranges.append(_range)
return _range
[docs] def new_distgroup(self):
self.distgroup = IxLoadixNetRangeGroup(self.tcl)
self.tcl("${0} rangeGroups.appendItem -object ${1}".format(self.name, self.distgroup.name))
return self.distgroup
[docs] def append_iprange(self, ip_range):
if self.distgroup is None:
self.new_distgroup()
self.tcl("${0} rangeList.appendItem -object ${1}".format(self.distgroup.name, ip_range.name))
self.distgroup.config(distribType=0, _Stale=False, name=self.distgroup.name)
[docs]class IxLoadixNetL2EthernetPlugin(IxLoadGenericObjectMixin):
NAME = "MAC_VLAN"
IXLOAD_CLASS = "ixNetL2EthernetPlugin"
def __init__(self, tcl):
super(IxLoadixNetL2EthernetPlugin, self).__init__(tcl)
self.clear("childrenList")
self.clear("extensionList")
self.ipplugin = None
[docs] def new_ipplugin(self):
self.ipplugin = IxLoadixNetIpV4V6Plugin(self.tcl)
self.tcl("${0} childrenList.appendItem -object ${1}".format(self.name, self.ipplugin.name))
self.ipplugin.config(_Stale=False)
return self.ipplugin
[docs]class IxLoadNetworkL1Plugin(IxLoadGenericObjectMixin):
NAME = "Ethernet"
def __init__(self, tcl):
super(IxLoadNetworkL1Plugin, self).__init__(tcl)
self.elm = None
self.phy = None
self.l2plugin = []
[docs] def init(self):
self.clear("childrenList")
self.clear("extensionList")
[docs] def new_l2plugin(self):
l2plugin = IxLoadixNetL2EthernetPlugin(self.tcl)
self.tcl("${0} childrenList.appendItem -object ${1}".format(self.name, l2plugin.name))
l2plugin.config(_Stale=False)
self.l2plugin.append(l2plugin)
return l2plugin
[docs]class IxLoadixLinearTimeSegment(IxLoadGenericObjectMixin):
NAME = "Linear_Segment"
IXLOAD_CLASS = "ixLinearTimeSegment"
[docs]class IxLoadixAdvancedIteration(IxLoadGenericObjectMixin):
NAME = "ixAdvancedIteration"
IXLOAD_CLASS = "ixAdvancedIteration"
def __init__(self, tcl):
super(IxLoadixAdvancedIteration, self).__init__(tcl)
self.clear("segmentList")
self.segmentlist = []
[docs] def append_segmentlist(self, segment):
self.tcl("${0} segmentList.appendItem -object ${1}".format(self.name, segment.name))
self.segmentlist.append(segment)
[docs] def add_segment(self, **kwargs):
segment = IxLoadixLinearTimeSegment(self.tcl)
segment.config(**kwargs)
self.append_segmentlist(segment)
[docs]class IxLoadixTimeline(IxLoadGenericObjectMixin):
NAME = "Timeline"
IXLOAD_CLASS = "ixTimeline"
def __init__(self, tcl):
super(IxLoadixTimeline, self).__init__(tcl)
self.iteration = None
[docs] def new_iteration(self):
self.iteration = IxLoadixAdvancedIteration(self.tcl)
return self.iteration
[docs]class IxLoadixMatchLongestTimeline(IxLoadGenericObjectMixin):
NAME = "Timeline_Match_Longest"
IXLOAD_CLASS = "ixMatchLongestTimeline"
[docs]class IxLoadixHttpCommand(IxLoadGenericObjectMixin):
NAME = "ixHttpCommand"
IXLOAD_CLASS = "ixHttpCommand"
[docs]class IxLoadPageObject(IxLoadGenericObjectMixin):
NAME = "PageObject"
IXLOAD_CLASS = "PageObject"
response = None
[docs]class IxLoadCookieObject(IxLoadGenericObjectMixin):
NAME = "CookieObject"
IXLOAD_CLASS = "CookieObject"
def __init__(self, tcl):
super(IxLoadCookieObject, self).__init__(tcl)
self.clear("cookieContentList")
self.cookiecontentlist = []
[docs] def append_cookiecontent(self, cookiecontent):
self.tcl("${0} cookieContentList.appendItem -object ${1}".format(self.name, cookiecontent.name))
self.cookiecontentlist.append(cookiecontent)
[docs]class IxLoadixCookieContent(IxLoadGenericObjectMixin):
NAME = "ixCookieContent"
IXLOAD_CLASS = "ixCookieContent"
[docs]class IxLoadCustomPayloadObject(IxLoadGenericObjectMixin):
NAME = "CustomPayloadObject"
IXLOAD_CLASS = "CustomPayloadObject"
[docs]class IxLoadHTTPClient(IxLoadGenericObjectMixin):
NAME = "HTTPClient"
def __init__(self, tcl):
super(IxLoadHTTPClient, self).__init__(tcl)
[docs] def init(self):
self.clear("agent.actionList")
self.clear("agent.cmdPercentagePool.percentageCommandList")
self.agent_actionlist = [] # pylint: disable=attribute-defined-outside-init
self.agent_headerlist = [] # pylint: disable=attribute-defined-outside-init
self.timeline = None # pylint: disable=attribute-defined-outside-init
[docs] def append_agent_actionlist(self, agent_action):
self.tcl("${0} agent.actionList.appendItem -object ${1}".format(self.name, agent_action.name))
self.agent_actionlist.append(agent_action)
[docs] def new_timeline(self):
self.timeline = IxLoadixTimeline(self.tcl) # pylint: disable=attribute-defined-outside-init
return self.timeline
[docs] def config_percentagecmdlist(self, **kwargs):
params = self._gen_params(**kwargs) if kwargs else ""
self.tcl("${0} agent.cmdPercentagePool.percentageCommandList.clear".format(self.name))
self.tcl("${0} agent.cmdPercentagePool.config {1}".format(self.name, params))
[docs] def config_agent(self, **kwargs):
params = self._gen_params(**kwargs) if kwargs else ""
self.tcl("${0} agent.config {1}".format(self.name, params))
[docs] def modify_objectivevalue(self, value):
return self.tcl("$%s config -objectiveValue %s; " % (self.name, value) +
"set canSetObjectiveValue [$%s canSetObjectiveValue]; " % (self.name, ) +
"puts \"Can set objective value? - $canSetObjectiveValue\"; " +
"if { $canSetObjectiveValue } { $%s applyObjectiveValues } " % (self.name, ) +
"{puts \"Failed to set objectiveValue for %s.\"}; " % (self.name, ) +
"return $canSetObjectiveValue")
[docs] def config_timeline(self, **kwargs):
self.new_timeline()
segments = kwargs.pop("segments") if "segments" in kwargs else []
if segments:
iteration = self.timeline.new_iteration()
for segment in segments:
iteration.add_segment(**segment)
kwargs.update({"advancedIteration": iteration})
self.timeline.config(**kwargs)
return self.timeline
[docs] def add_command(self, **kwargs):
httpcmd = IxLoadixHttpCommand(self.tcl)
httpcmd.config(**kwargs)
self.append_agent_actionlist(httpcmd)
[docs]class IxLoadHTTPServer(IxLoadGenericObjectMixin):
NAME = "HTTPServer"
def __init__(self, tcl):
super(IxLoadHTTPServer, self).__init__(tcl)
self.timeline = None
self.pagelist = []
self.cookielist = []
self.payloadlist = []
self.responseheaderlist = []
[docs] def init(self):
self.clear("agent.cookieList")
self.clear("agent.webPageList")
self.clear("agent.customPayloadList")
self.clear("agent.responseHeaderList")
[docs] def new_timeline(self):
self.timeline = IxLoadixMatchLongestTimeline(self.tcl)
return self.timeline
[docs] def append_pageobject(self, page):
self.tcl("${0} agent.webPageList.appendItem -object ${1}".format(self.name, page.name))
self.pagelist.append(page)
[docs] def new_response(self, code="200"):
response = IxLoadResponseHeader(self.tcl)
_t1 = time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime())
_t2 = time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time() + 2592000))
if code == "200":
name = "200_OK"
elif code == "404":
name = "404_PageNotFound"
else:
name = "Custom"
response.config(code=code, name=name, description=name,
lastModifiedDateTimeValue=_t1, dateTimeValue=_t1,
expirationDateTimeValue=_t2)
return response
[docs] def add_pageobject(self, response="200", **kwargs):
page = IxLoadPageObject(self.tcl)
page.response = self.new_response(response)
page.config(response=page.response, **kwargs)
self.append_pageobject(page)
[docs] def append_cookielist(self, cookie):
self.tcl("${0} agent.cookieList.appendItem -object ${1}".format(self.name, cookie.name))
self.cookielist.append(cookie)
[docs] def add_cookie(self, cookiecontentlist):
_cookie = IxLoadCookieObject(self.tcl)
for cookie_params in cookiecontentlist:
_cookiecontent = IxLoadixCookieContent(self.tcl)
_cookiecontent.config(**cookie_params)
_cookie.append_cookiecontent(_cookiecontent)
self.append_cookielist(_cookie)
[docs] def append_payloadlist(self, payload):
self.tcl("${0} agent.customPayloadList.appendItem -object ${1}".format(self.name, payload.name))
self.payloadlist.append(payload)
[docs] def add_payload(self, **kwargs):
payload = IxLoadCustomPayloadObject(self.tcl)
payload.config(**kwargs)
self.append_payloadlist(payload)
[docs] def config_agent(self, **kwargs):
params = self._gen_params(**kwargs) if kwargs else ""
self.tcl("${0} agent.config {1}".format(self.name, params))
[docs] def config_timeline(self, **kwargs):
self.new_timeline()
self.config(name=self.name, timeline=self.timeline)
activityListItems = {"HTTP Client": IxLoadHTTPClient,
"HTTP Server": IxLoadHTTPServer, }
[docs]class IxLoadNetTrafficNetwork(IxLoadGenericObjectMixin):
NAME = "NetTrafficNetwork"
def __init__(self, tcl):
super(IxLoadNetTrafficNetwork, self).__init__(tcl)
self.plugins = {}
self.l1plugin = None
self.activities = {}
[docs] def append_portlist(self, port):
chass, card, pid = port.split("/")
self.tcl("${0} portList.appendItem -chassisId {1} -cardId {2} -portId {3}".
format(self.name, chass, card, pid))
[docs] def clear_plugins(self):
# self.clear("childrenList")
self.clear("globalPlugins")
[docs] def new_plugin(self, plugin):
_plugin = ixNetIxLoadPlugins[plugin](self.tcl)
self.tcl("${0} globalPlugins.appendItem -object ${1}".format(self.name, _plugin.name))
if _plugin.NAME not in self.plugins:
self.plugins[_plugin.NAME] = {}
self.plugins[_plugin.NAME][_plugin.name] = _plugin
return _plugin
[docs] def new_l1plugin(self):
self.l1plugin = IxLoadNetworkL1Plugin(self.tcl)
self.tcl("set {0} [${1} getL1Plugin]".format(self.l1plugin.name, self.name))
self.l1plugin.init()
return self.l1plugin
[docs]class IxLoadSETkNetTraffic(IxLoadGenericObjectMixin):
# NAME = "NetworkTraffic"
NAME = "Traffic_Network"
PORTOPERMODE = {"ThroughputAcceleration": "kOperationModeThroughputAcceleration"}
def __init__(self, tcl):
super(IxLoadSETkNetTraffic, self).__init__(tcl)
self.activities = {}
self.activitydst = None
[docs] def new_network(self):
network = IxLoadNetTrafficNetwork(self.tcl)
self.tcl("set {0} [${1} cget -network]".format(network.name, self.name))
network.clear_plugins()
return network
[docs] def set_portopermode(self, mode):
self.tcl("${0} setPortOperationModeAllowed $::ixPort(kOperationModeThroughputAcceleration) {1}".format(self.name, mode))
[docs] def set_tcpaccel(self, mode):
self.tcl("${0} setTcpAccelerationAllowed $::ixAgent(kTcpAcceleration) {1}".format(self.name, mode))
[docs] def config_traffic(self):
self.tcl("${0} traffic.config".format(self.name))
[docs] def new_activity(self, activity):
_activity = activityListItems[activity](self.tcl)
self.tcl("set {0} [${1} activityList.appendItem -protocolAndType \"{2}\"]".
format(_activity.name, self.name, activity))
_activity.init()
if activity not in self.activities:
self.activities[activity] = []
# self.activities[activity][_activity.name] = _activity
self.activities[activity].append(_activity)
return _activity
[docs] def get_activitydst(self, *args):
self.activitydst = IxLoadDestinationForActivity(self.tcl)
params = str(args)[1:-1].replace("'", '"').replace(",", "")
self.tcl("set {0} [${1} getDestinationForActivity {2}]".format(self.activitydst.name, self.name, params))
return self.activitydst
[docs] def set_activityendpoint(self, iprange, activity, activity_type, enable):
self.tcl("${0} setActivityEndPointAvailableForSmRange ${1} \"{2}\" \"{3}\" {4}".
format(self.name, iprange.name, activity.name, activity_type, enable))
[docs]class IxLoadSETkDutBasic(IxLoadGenericObjectMixin):
NAME = "DUT"
def __init__(self, tcl):
super(IxLoadSETkDutBasic, self).__init__(tcl)
self.cfg_packetswitch = None
self.cfg_vip = None
[docs] def new_cfg_packetswitch(self):
self.cfg_packetswitch = IxLoadixDutConfigPacketSwitch(self.tcl)
return self.cfg_packetswitch
[docs] def new_cfg_vip(self):
self.cfg_vip = IxLoadixDutConfigVip(self.tcl)
return self.cfg_vip
[docs]class IxLoadixDutConfigPacketSwitch(IxLoadGenericObjectMixin):
NAME = "ixDutConfigPacketSwitch"
IXLOAD_CLASS = "ixDutConfigPacketSwitch"
def __init__(self, tcl):
super(IxLoadixDutConfigPacketSwitch, self).__init__(tcl)
self.clear("originateProtocolPortRangeList")
self.clear("terminateProtocolPortRangeList")
self.clear("terminateNetworkRangeList")
self.clear("originateNetworkRangeList")
self.originatenetworkrangelist = []
self.terminatenetworkrangelist = []
[docs] def append_networkrangelist(self, dst, networkrange):
self.tcl("${0} {1}NetworkRangeList.appendItem -object ${2}".format(self.name, dst, networkrange.name))
getattr(self, "{0}networkrangelist".format(dst)).append(networkrange)
[docs] def add_networkrange(self, dst, **kwargs):
networkrange = IxLoadixDutNetworkRange(self.tcl)
name = "DUT NetworkRange{0} {1}+{2}".format(len(getattr(self, "{0}networkrangelist".format(dst))),
kwargs['firstIp'], kwargs['ipCount'])
networkrange.config(name=name, **kwargs)
self.append_networkrangelist(dst, networkrange)
[docs]class IxLoadixDutNetworkRange(IxLoadGenericObjectMixin):
NAME = "ixDutNetworkRange"
IXLOAD_CLASS = "ixDutNetworkRange"
[docs]class IxLoadDestinationForActivity(IxLoadGenericObjectMixin):
NAME = "DestinationForActivity"
[docs]class IxLoadixTestSessionSpecificData(IxLoadGenericObjectMixin):
NAME = "SessionSpecificData"
[docs]class IxLoadixTestProfileDirectory(IxLoadGenericObjectMixin):
NAME = "profileDirectory"
[docs]class IxLoadixDutConfigVip(IxLoadGenericObjectMixin):
NAME = "ixDutConfigVip"
IXLOAD_CLASS = "ixDutConfigVip"
[docs]class IxLoadstatCollectorUtils(IxLoadGenericObjectMixin):
NAME = "NS"
def __init__(self, tcl):
super(IxLoadstatCollectorUtils, self).__init__(tcl)
self.tcl("set {0} statCollectorUtils".format(self.name))
self.status = False
# self.tcl("proc ::my_stat_collector_command {args} { " +
# "puts \"=====================================\"; " +
# "puts \"INCOMING STAT RECORD >>> $args\"; " +
# "puts \"=====================================\";" +
# "}")
[docs] def init_tsh(self, tsh):
self.tcl("${%s}::Initialize -testServerHandle $%s;" % (self.name, tsh.name) +
"${%s}::ClearStats" % (self.name, ))
[docs] def start(self):
# self.tcl("${%s}::StartCollector -command ::my_stat_collector_command -interval 2" % (self.name, ))
# self.tcl("${%s}::StartCollector -command collect_stats -interval 2" % (self.name, ))
self.tcl("${%s}::StartCollector -command collect_stats -interval 4" % (self.name, ))
self.status = True
[docs] def stop(self):
if self.status:
self.tcl("${%s}::StopCollector" % (self.name, ))
self.status = False
[docs] def clear(self):
self.tcl("${%s}::ClearStats" % (self.name, ))
[docs]class IxLoadTestServerHandle(IxLoadGenericObjectMixin):
NAME = "TestServerHandle"