unittests

Submodules

unittests.common

common.py

Unittests common functionality

class unittests.common.FakeXMLRPCServer(port=9999)[source]

Bases: object

applications_exists(app_id, pid_id, app_name)[source]
applications_find(app_id, pid_id, app_name)[source]
applications_get_size()[source]
applications_gettable()[source]
applications_set_loglevel(app_id, loglevel)[source]
clear_config()[source]
lags_add_row(*row)[source]
lags_get_size()[source]
lags_get_table()[source]
method_help(method)[source]
platform_get_row(row)[source]
platform_get_size()[source]
platform_get_table()[source]
ports_add_row(*row)[source]
ports_del_row(row_id)[source]
ports_get_info()[source]
ports_get_info_name()[source]
ports_get_name(row_id)[source]
ports_get_size()[source]
ports_gettable()[source]
ports_lag_add_row(*row)[source]
ports_lags_get_size()[source]
ports_lags_get_table()[source]
start()[source]
stop()[source]
system_multicall(*calls)[source]
system_tables_ready()[source]

unittests.conftest

conftest.py

TAF unittests common options

unittests.conftest.env_setup(request)[source]
unittests.conftest.pytest_addoption(parser)[source]

unittests.test_ab_parser

test_ab_parser.py

class unittests.test_ab_parser.TestAbParser[source]

Bases: object

test_aggregation()[source]
test_parsing_of_single_ab_output()[source]

unittests.test_clicmd

test_clicmd.py

Unittests for CLI functions

class unittests.test_clicmd.TestCliCmd[source]

Bases: object

Unittests for testlib.clicmd_ons.

line = '---- -------- ------- ------------ ----------------- -----------'
test_get_column_names()[source]

Test get_column_names method.

test_get_column_ranges()[source]

Test get_column_ranges method.

test_get_dotted_table()[source]

Test get_dotted_table method.

test_get_table_value()[source]

Test get_table_value method.

unittests.test_clissh

test_clissh.py

Unittests for cli<X>.py modules

class unittests.test_clissh.TestCLISSH[source]

Bases: object

EMPTY_PASSWORD_KEY = '-----BEGIN RSA PRIVATE KEY-----\nMIIEpgIBAAKCAQEAybpzWXae7rYCORumvBc6f+J77fhZ/WU2fiqLgv62DojfWFqY\n92U0Bo8NtynU4NcVwQBrNCCpinMD3JdDcLSXsN70ON5z5FLm1Ms4gvpICei7TegC\nFVTEMsa9gfiMygDAOAapLlsZP6v1F/r/zQtsV9Nqm5pTlZ5gF6e/FmlQbg/sF52K\nA3sB762eBKfwq9p5/l2XfAELY4ypvGaAS+alVStuop3hhax5D6RUy1hG7IsMfT1x\ntFfwqgKqbO0AahjojakTlKZ+VBrGQYb9SUWSEOTN/EdU2wYDK9u08ilSCYW1HbN7\nrV4yX4ZZXZBuddll8DRVQIs5fZP1xKQBKfiSZQIDAQABAoIBAQCaIPwrGafbKXNP\nYOInCfRna4tWyg8vvVpCUY1gm+5L8qX7ItWHCGsUq85F6Q8+bvevC/vcyyvenXwQ\n2f3sKf9QYzjkDosro2+8nDzkTggmkgwyPRcCZ060oQaAPICNgr9azzQKOA51iJPu\nK5eweY7hF6Z3lxVP1r8Cs+cbX4HVZLbmva+98476zw9MD+XEd2qkjgldLAmVCsiR\nE3H862GU0yk+mReSs0Qz8OYjyWAGXPN9SmMPIv4qZg4qVLw6y17pN+A4aMmL1ThK\nh9jPoAsXL+5lpi3T1rHUes1ene9hQLyv46B6TKTiTRPnP2aUeJF7xXS4aZSEVy23\n0zTphoatAoGBAPMosVDw+iT8bMzmG5cFHEA2CAd/EdXKkPEvWUCf4uFcqzslImUW\nVq0asuFyOHBCHsVxsA5BmYRsCcQdn8jUuTHM3GAUwf29rmhW584W5Jznv1bByNMQ\nOg35zXNYm+CgxWOaD8ayXtAZ+3GgjKA+JbpsIh1wqwrv/q4atiha1CTvAoGBANRh\npnCBFgLt6l5TsnmqB4yUiSC+SIluuz5csoQnCSgb4DBKe6yjDzqXTStHQOBv5l7o\nwcuXzziX3rXZ5ym14aU20Dix7H+fjjTDCOT/A4r44PhZBKcC53RnndJ3a283BrAK\ns2p4gn0iGPtG24UG9UokDD56vDChED3Bc8a3ngXrAoGBAOJXIpbBeVcsUOp513zA\nGQf8Q4UW1zc2k6yt8lqhecNlS06GxnlqTcxcad5JQBfetF398W+TyJ7nIkAXg0Ci\nIrEkjI4zRFA5XDtrieLglHUpk4XiZFlzZVbVDFUuSgrSHGsWYVEHgBId3VxrofsX\nXm8lcKwO0Ggh9eOCocT2pzqpAoGBAJFfIekiQqnQpjrYuXKT0sUEKvTRqp7/v4UJ\nOFxCx/6/Te5gHVVm65akV/sGs76seZh/Y59zEzFeqt/4/kTLrV9ELLSR/RrCYTl2\nQpFUiN1IS91SOWAEGd/QyPN2MICYvqgjOvnm8RKsE0N0FfBxeda84/CkXEpBBPfw\ngcoEh1LvAoGBAN0Tv5gSXLkEaUfL6BTeOKr+PxKrdmUfLHSKTTT0MlA/oAig9FmZ\ndVcroxqKsqWhQmY4EgXH20IOyNdRX4d8oMyTEA1Xyorq78DVfPQAhE2y6wO0Z8K/\nmAvF7/+hRzNa4l25lailJFHR7VgwLPo24xNlWgyjn9T5JNnor8TIimoy\n-----END RSA PRIVATE KEY-----\n'
pytestmark = [<MarkDecorator 'unittests' {'args': (), 'kwargs': {}}>]
test_invalid_pkey_raises_SSHException(credentials)[source]
test_pkey(credentials)[source]
test_probe_port_1()[source]

Test probe_port function.

test_probe_port_2()[source]

Test probe_port function negative.

class unittests.test_clissh.TestSSH[source]

Bases: object

CLISSH unittests.

pytestmark = [<MarkDecorator 'unittests' {'args': (), 'kwargs': {}}>]
test_alter_in_command(cli_obj)[source]

Verify that a prompt present in the command doesn’t influence finding the prompt in the output data.

test_cleared_shell_buffer(cli_obj)[source]

Cleared buffer after open_shell().

test_enter_exit_mode(cli_obj, credentials)[source]

Verify enter/exit mode.

test_exec_command_timeout_ssh(cli_obj)[source]

Verify timeout for exec_command.

test_exec_command_timeout_telnet(cli_obj)[source]

Verify timeout for exec_command.

test_get_file(tmpdir, cli_obj, credentials)[source]

Copying file from remote host.

test_interactive_command_1(cli_obj)[source]

Interactive shell command with str actions.

test_interactive_command_2(cli_obj)[source]

Interactive shell command with func action.

test_login_false_username_ssh(credentials)[source]

Verify AuthenticationException in case of incorrect username for ssh object.

test_login_false_username_telnet(credentials)[source]

Verify AuthenticationException in case of incorrect username for telnet object.

test_login_false_userpass_ssh(credentials)[source]

Verify AuthenticationException in case of incorrect password for ssh object.

test_login_false_userpass_telnet(credentials)[source]

Verify AuthenticationException in case of incorrect password for telnet object.

test_login_true(credentials, request_object)[source]

Verify login/logout.

test_multiple_login_logout(credentials, request_object)[source]

Verify login after logout multiple times.

test_put_file(cli_obj, credentials)[source]

Copying file to remote host.

test_quiet_1(cli_obj)[source]

Verify raising an exception on return code != 0.

test_quiet_2(cli_obj)[source]

Check expected_rc parameter.

test_quiet_3(cli_obj, monkeypatch)[source]

Verify an exception isn’t raised on return code != 0 and default quiet option.

test_send_command(cli_obj)[source]

Send command without waiting exit.

test_send_command_continuous_output(cli_obj)[source]

Send command without waiting exit and read continuous output.

test_shell_command_1(cli_obj)[source]

Non interactive shell command. No prompt is defined.

test_shell_command_2(cli_obj)[source]

Non interactive shell command. Read prompt and set prompt.

test_shell_command_3(cli_obj)[source]

Non interactive shell command. Non 0 exit code.

test_shell_command_timeout(cli_obj)[source]

Verify timeout for shell_command.

test_sudo_shell_command_ssh(cli_obj, credentials)[source]

Verify sudo mode for ssh.

test_sudo_shell_command_telnet(cli_obj, credentials)[source]

Verify sudo mode for telnet.

unittests.test_clissh.cli_obj(request, credentials)[source]
unittests.test_clissh.create_nns(request, host, login)[source]
unittests.test_clissh.create_ssh(request, host, login)[source]
unittests.test_clissh.create_telnet(request, host, login)[source]
unittests.test_clissh.credentials(request)[source]
unittests.test_clissh.request_object(request, credentials)[source]

unittests.test_collectd

test_collectd.py

Collectd library unittests

class unittests.test_collectd.TestCollectd[source]

Bases: object

setup_tests()[source]
test_collect_no_plugins_config()[source]
test_collectd_restart()[source]
test_collectd_start()[source]
test_collectd_stop()[source]

unittests.test_common3

test_common3.py

Unittests for common3 functions

Notes

  • get_ports()
  • get_port_speed()
  • get_device_id()
class unittests.test_common3.FakeOpts[source]

Bases: object

build_path = ''
env = ''
get_only = False
setup = 'setup.json'
ui = 'ons_xmlrpc'
class unittests.test_common3.TestCross[source]

Bases: object

create_ui(test_cross)[source]
test_get_device_id1()[source]
test_not_found_raises_exception()[source]
unittests.test_common3.test(request, monkeypatch)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_common3.test_cross(request, monkeypatch)[source]

Fixture of environment with LXC for unittests of cross.

unittests.test_common3.test_get_device_id_1(test)[source]

Verify that method get_device_id returns the correct value if input data of acronym.

unittests.test_common3.test_get_device_id_2(test)[source]

Verify that method get_device_id returns the correct value if input data of autoname.

unittests.test_common3.test_get_device_id_3(test)[source]

Verify that method get_device_id returns Error message if input data incorrect.

unittests.test_common3.test_get_device_id_4(test)[source]

Verify that method get_device_id returns the correct value if input data of device’s ID.

unittests.test_common3.test_get_port_speed_1(test)[source]

Verify that method get_port_speed returns the correct value.

unittests.test_common3.test_get_port_speed_2(test)[source]

Verify that method get_port_speed returns the correct value.

unittests.test_common3.test_get_port_speed_3(test)[source]

Verify that method get_port_speed returns Error message if input data incorrect.

unittests.test_common3.test_get_port_speed_4(test)[source]

Verify that method get_port_speed returns Error message if input data incorrect.

unittests.test_common3.test_get_ports_1(test)[source]

Verify that method get_ports returns the correct value if input data of three links between devices.

unittests.test_common3.test_get_ports_12(test_tg)[source]

Verify that method get_ports returns the correct values for setup with TG.

unittests.test_common3.test_get_ports_13(test)[source]

Verify that method get_ports returns the correct value if input data without optional parameter ‘number_of_links’.

unittests.test_common3.test_get_ports_14(test)[source]

Verify that method get_ports returns the correct value if input data with two optional parameters.

unittests.test_common3.test_get_ports_15(test)[source]

Verify that method get_ports returns the correct value if input data with optional parameter ‘number_of_links’ - enum ‘ALL’.

unittests.test_common3.test_get_ports_2(test)[source]

Verify that method get_ports returns the correct value if input data of two links between devices.

unittests.test_common3.test_get_ports_3(test)[source]

Verify that method get_ports returns the correct value if input data of one link between devices.

unittests.test_common3.test_get_ports_4(test)[source]

Verify that method get_ports returns all links if no input data.

unittests.test_common3.test_get_ports_5(test)[source]

Verify that method get_ports returns all links if number of links is not defined.

unittests.test_common3.test_get_ports_6(test)[source]

Verify that method get_ports returns the correct value if input data of two links between devices and optional parameter port_speed.

unittests.test_common3.test_get_ports_7(test)[source]

Verify that method get_ports returns the correct value if input data of one link between devices and optional parameter port_speed.

unittests.test_common3.test_get_ports_8(test)[source]

Verify that method get_ports returns the correct value if input data of one link between devices, autoname and optional parameter port_speed.

unittests.test_common3.test_get_ports_error_1(test)[source]

Verify that method get_ports returns Error message if input data of zero links between devices.

unittests.test_common3.test_get_ports_error_2(test)[source]

Verify that method get_ports returns Error message if input data are incorrect.

unittests.test_common3.test_get_ports_skip_1(test, monkeypatch)[source]

Verify that behavior of method get_ports is correct if device name or type is incorrect.

unittests.test_common3.test_get_ports_skip_2(test, monkeypatch)[source]

Verify that behavior of method get_ports is correct if input data of links are incorrect between devices.

unittests.test_common3.test_get_ports_skip_3(test, monkeypatch)[source]

Verify that behavior of method get_ports is correct if there is no connection between devices.

unittests.test_common3.test_get_ports_skip_4(test, monkeypatch)[source]

Verify that behavior of method get_ports is correct if there is no connection between devices and links are not defined

unittests.test_common3.test_get_ports_skip_5(test, monkeypatch)[source]

Verify that behavior of method get_ports is correct if port speed is incorrect

unittests.test_common3.test_get_real_port_name_1(test)[source]

Verify that method get_real_port_name returns Error message if input data incorrect.

unittests.test_common3.test_get_real_port_name_2(test)[source]

Verify that method get_real_port_name returns Error message if input data incorrect.

unittests.test_common3.test_get_real_port_name_3(test)[source]

Verify that method get_real_port_name returns Error message if input data incorrect.

unittests.test_common3.test_id2instance_1(test)[source]

Verify that method id2instance returns the correct object if input data id device as str.

unittests.test_common3.test_id2instance_2(test_tg)[source]

Verify that method id2instance returns the correct object if input data id device as int.

unittests.test_common3.test_id2instance_3(test)[source]

Verify that method id2instance returns the correct object if input data device LINK_NAME.

unittests.test_common3.test_tg(request, monkeypatch)[source]

Fixture of environment with TG for unittests of methods get_ports.

unittests.test_dependencies_core

test_dependencies_core.py

Unittests for dependencies core functions

class unittests.test_dependencies_core.FakeOpts[source]

Bases: object

FakeOpts class.

__init__()[source]

Initialize FakeOpts class.

unittests.test_dependencies_core.test_import_afs_module()[source]

Verify that all modules can be imported within ‘afs’ module and ‘AFS’ object can be created.

unittests.test_dependencies_core.test_import_afscross_module()[source]

Verify that all modules can be imported within ‘afs’ module and ‘AFS’ object can be created.

unittests.test_dependencies_core.test_import_clicmd_module()[source]

Verify that all modules can be imported within ‘clicmd’ module and ‘CLICmd’ object can be created.

unittests.test_dependencies_core.test_import_clissh_module()[source]

Verify that all modules can be imported within ‘clissh’ module and ‘CLISSH’ object can be created.

unittests.test_dependencies_core.test_import_common3_module(monkeypatch)[source]

Verify that all modules can be imported within ‘common3’ module and ‘Cross’/‘Environment’ objects can be created.

unittests.test_dependencies_core.test_import_connpool_module()[source]

Verify that all modules can be imported within ‘connpool’ module and ‘ConnectionPool’ object can be created.

unittests.test_dependencies_core.test_import_custom_exception_module()[source]

Verify that all modules can be imported within ‘custom_exception’ module and object of classes can be created.

unittests.test_dependencies_core.test_import_dev_basecross_module()[source]

Verify that all modules can be imported within ‘dev_basecross’ module and classes objects can be created.

unittests.test_dependencies_core.test_import_dev_linux_host_module()[source]

Verify that all modules can be imported within ‘dev_linux_host’ module and classes objects can be created.

unittests.test_dependencies_ons

test_dependencies_ons.py

Unittests for dependencies functions

unittests.test_dependencies_ons.check_module_name_error_status(module_name)[source]

The function returns True if a name error is present in the module during importing.

unittests.test_dependencies_ons.test_import_dev_staticcross_ons_module()[source]

Verify that all modules can be imported within ‘dev_staticcross_ons’ module and classes objects can be created.

unittests.test_dependencies_ons.test_import_dev_switches_module()[source]

Verify that all modules can be imported within ‘dev_switches’ module and classes objects can be created.

unittests.test_dependencies_ons.test_import_helpers_module()[source]

Verify that all modules can be imported within ‘helpers’ module and classes objects can be created.

unittests.test_dev_linux_host

test_dev_linux_host.py

Unittests for dev_linux_host.py

class unittests.test_dev_linux_host.FakeCLISSH(*args, **kwargs)[source]

Bases: object

cmd_list = []
exec_command(command, time=0)[source]
login_status = True
shell_read = None
class unittests.test_dev_linux_host.FakeOpts[source]

Bases: object

env = 'setup.json'
get_only = False
lhost_ui = 'linux_bash'
setup = 'setup.json'
unittests.test_dev_linux_host.lh(monkeypatch, patch_clissh)[source]
unittests.test_dev_linux_host.patch_clissh(request, monkeypatch)[source]
unittests.test_dev_linux_host.test_add_mgmt_bridge_1(monkeypatch)[source]

Verify that add_mgmt_bridge function generates the correct set of commands to add mgmt bridge.

unittests.test_dev_linux_host.test_add_mgmt_bridge_2(monkeypatch)[source]

Verify that add_mgmt_bridge function returns exception when management bridge can not be created.

unittests.test_dev_linux_host.test_add_mgmt_iface_1(monkeypatch)[source]

Verify that mgmt interface return correct set of commands for adding mgmt interface.

unittests.test_dev_linux_host.test_add_mgmt_iface_2(monkeypatch)[source]

Verify that add_mgmt_iface function return exception when mgmt iface can’t be deleted.

unittests.test_dev_linux_host.test_brctl_1(monkeypatch)[source]

Verify that brctl command return correct set of commands when add parameter with stp_cfg is defined.

unittests.test_dev_linux_host.test_brctl_10()[source]

Verify brctl command return exception when incorrect mode is set.

unittests.test_dev_linux_host.test_brctl_11(monkeypatch)[source]

Verify that brctl function return proper dictionary when ‘stpstat’ parameter is set.

unittests.test_dev_linux_host.test_brctl_12(monkeypatch)[source]

Verify that brctl function return dictionary when “macs” parameter is set.

unittests.test_dev_linux_host.test_brctl_13()[source]

Verify that brctl function with stpstat parameter return exception when bridge name is not set.

unittests.test_dev_linux_host.test_brctl_14()[source]

Verify that brctl function with mac parameter return exception when bridge name is not set.

unittests.test_dev_linux_host.test_brctl_2(monkeypatch)[source]

Verify that brctl command return correct set of commands when add command is used.

unittests.test_dev_linux_host.test_brctl_3(monkeypatch, lh)[source]

Verify that brctl command return correct set of commands when cfg parameter is used.

unittests.test_dev_linux_host.test_brctl_4(monkeypatch)[source]

Verify cfg parameter in brctl return exception when bridge name is not given.

unittests.test_dev_linux_host.test_brctl_5()[source]

Verify brctl command return exception when bridge name is not given as parameter.

unittests.test_dev_linux_host.test_brctl_6()[source]

Verify brctl command with delif parameter return exception when ports are not given.

unittests.test_dev_linux_host.test_brctl_7(monkeypatch, lh)[source]

Verify that brctl command with delif parameter return correct set of commands when all needed parameters are given.

unittests.test_dev_linux_host.test_brctl_8(monkeypatch, lh)[source]

Verify that brctl command with del parameter return correct set of commands when all needed parameters are given.

unittests.test_dev_linux_host.test_brctl_9()[source]

Verify that brctl command return exception when br name is not set.

unittests.test_dev_linux_host.test_check_mgmt_bridge_1(monkeypatch)[source]

Verify that check_mgmt_bridge check that mgmt bridge is already created.

unittests.test_dev_linux_host.test_check_mgmt_bridge_2(monkeypatch)[source]

Verify that check_mgmt_bridge check that mgmt bridge is already created.

unittests.test_dev_linux_host.test_cleanup_1(monkeypatch, lh)[source]

Verify that cleanup function clear route configurations.

unittests.test_dev_linux_host.test_cleanup_2(monkeypatch, lh)[source]

Verify that cleanup function clear ifconfig configurations.

unittests.test_dev_linux_host.test_cleanup_3(monkeypatch, lh)[source]

Verify that cleanup function clear vconfig configurations.

unittests.test_dev_linux_host.test_cleanup_4(monkeypatch, lh)[source]

Verify that cleanup function form correct set of commands to delete brctl bridges.

unittests.test_dev_linux_host.test_del_mgmt_bridge_1(monkeypatch)[source]

Verify that del_mgmt_bridge function return correct set of commands to delete mgmt bridge.

unittests.test_dev_linux_host.test_del_mgmt_bridge_2(monkeypatch)[source]

Verify that del_mgmt_bridge function return exception when management bridge can not be deleted.

unittests.test_dev_linux_host.test_del_mgmt_iface_1(monkeypatch)[source]

Verify that del_mgmt_iface function return correct set of commands and exceptions when mgmt interface can not be deleted.

unittests.test_dev_linux_host.test_del_mgmt_iface_2(monkeypatch)[source]

Verify that del_mgmt_iface function return correct set of commands and exceptions when mgmt interface can not be deleted.

unittests.test_dev_linux_host.test_enable_8021q_1(monkeypatch)[source]

Verify that enable_8021q function returns exception when 802.1q is not supported by current os.

unittests.test_dev_linux_host.test_enable_8021q_2(monkeypatch)[source]

Verify that enable_8021q function returns correct set of commands when 8021q is already loaded.

unittests.test_dev_linux_host.test_enable_8021q_3(monkeypatch)[source]

Verify that enable_8021q function returns exception if 8021q can not be loaded.

unittests.test_dev_linux_host.test_ethtool_1()[source]

Verify ethtool function return exception when improper mode is set.

unittests.test_dev_linux_host.test_ethtool_2(monkeypatch, lh)[source]

Verify ethtool function return correct set of commands when correct parameters are set.

unittests.test_dev_linux_host.test_getmac_1(monkeypatch)[source]

Verify that getmac function return interface mac address.

unittests.test_dev_linux_host.test_ifconfig_1(monkeypatch, lh)[source]

Verify that ifconfig command form correct set of commands when all parameters are used.

unittests.test_dev_linux_host.test_ifconfig_2()[source]

Verify that ifconfig command return exception when lengths of ipaddr, ip6addr, mac parameters is not correct.

unittests.test_dev_linux_host.test_ifconfig_4()[source]

Verify that ifconfig command return exception when stats parameter without ports is given.

unittests.test_dev_linux_host.test_ifconfig_5()[source]

Verify that ifconfig command return exception when improper mode is set.

unittests.test_dev_linux_host.test_ifconfig_6(monkeypatch, lh)[source]

Verify that ifconfig function return proper dictionary when ‘stat’ parameter is used.

unittests.test_dev_linux_host.test_ifconfig_7(monkeypatch, lh)[source]

Verify that exec command works with root privileges.

unittests.test_dev_linux_host.test_init(monkeypatch)[source]
unittests.test_dev_linux_host.test_ipforward_2()[source]

Verify that ipforward command return exception when incorrect version value is set.

unittests.test_dev_linux_host.test_ipforward_3(monkeypatch, lh)[source]

Verify that ipforward command return correct set of commands when version is None.

unittests.test_dev_linux_host.test_ipforwrd_1(monkeypatch, lh)[source]

Verify that ip_forward function return correct set of commands when correct parameters are used.

unittests.test_dev_linux_host.test_routes_1(monkeypatch, lh)[source]

Verify that routes forms correct set of commands when all parameters are used.

unittests.test_dev_linux_host.test_routes_2()[source]

Verify that routes command return exception when incorrect mode value is set.

unittests.test_dev_linux_host.test_routes_3()[source]

Verify that routes function return exception when length of nexthop and netwrk lists is not equal.

unittests.test_dev_linux_host.test_routes_4()[source]

Verify that routes function return exception when length of nexthop and ports lists is not equal.

unittests.test_dev_linux_host.test_routes_5(monkeypatch, lh)[source]

Verify that route function return correct set of commands when ports parameter is None.

unittests.test_dev_linux_host.test_routes_6(monkeypatch, lh)[source]

Verify that route function return correct set of commands.

unittests.test_dev_linux_host.test_routes_7(monkeypatch, lh)[source]

Verify that route function return correct set of commands.

unittests.test_dev_linux_host.test_routes_8(monkeypatch, lh)[source]

Verify that route function return correct set of commands.

unittests.test_dev_linux_host.test_start_1(monkeypatch)[source]

Verify that start function return exception when namespace is already created.

unittests.test_dev_linux_host.test_start_2(monkeypatch)[source]

Verify that start function returns exception when network namespace was not created.

unittests.test_dev_linux_host.test_vconf_1(monkeypatch)[source]

Verify vconf function return correct command when rem parameter is set.

unittests.test_dev_linux_host.test_vconf_2(monkeypatch)[source]

Verify vconf function return correct command when add parameter is set.

unittests.test_dev_linux_host.test_vconf_3(monkeypatch)[source]

Verify vconf function returns exception when creating a vlan which already exists.

unittests.test_dev_linux_host.test_vconf_4(monkeypatch)[source]

Verify that vconfig function return exception when mode is incorrect.

unittests.test_fixtures

test_fixtures.py

Unittests for TAF fixtures

class unittests.test_fixtures.EnvTest(setup, env)[source]

Bases: object

start(request, monkeypatch, xmlrpcs)[source]
class unittests.test_fixtures.FakeOpts[source]

Bases: object

build_path = ''
env = ''
get_only = True
setup = 'setup.json'
ui = 'ons_xmlrpc'
class unittests.test_fixtures.TestLagIdGenerator[source]

Bases: object

test_handle_same_device_twice()[source]
test_ignore_none()[source]
test_intersection()[source]
test_none_if_no_args()[source]
test_stop_iteration_when_no_intersection()[source]
unittests.test_fixtures.env(request, monkeypatch, fake_xmlrpc)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_fixtures.env_complex(request, monkeypatch, fake_xmlrpc)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_fixtures.env_list(request, monkeypatch, fake_xmlrpc)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_fixtures.fake_xmlrpc(request)[source]
unittests.test_fixtures.test_env_lag_1(request, env)[source]

Verify env_lag fixture adds LAGs into ports list.

unittests.test_fixtures.test_env_lag_10(request, env_list)[source]

Verify env_lag fixture doesn’t add LAGs into ports list in case max_lags is less than links count.

unittests.test_fixtures.test_env_lag_11(request, env)[source]

Verify env_lag fixture doesn’t add LAGs into ports list in case port is already in LAG.

unittests.test_fixtures.test_env_lag_12(request, env_list)[source]

Verify env_lag fixture doesn’t add LAGs into ports list in case port is already in LAG.

unittests.test_fixtures.test_env_lag_13(request, env_complex)[source]

Verify env_lag fixture adds LAGs into ports list in complex setup.

unittests.test_fixtures.test_env_lag_14(request, env_complex)[source]

Verify env_lag fixture adds LAGs into LagsAdmin table in complex setup.

unittests.test_fixtures.test_env_lag_15(request, env_complex)[source]

Verify env_lag fixture adds ports to LAGs in complex setup.

unittests.test_fixtures.test_env_lag_2(request, env)[source]

Verify env_lag fixture adds LAGs into LagsAdmin table.

unittests.test_fixtures.test_env_lag_3(request, env)[source]

Verify env_lag fixture adds ports to LAGs.

unittests.test_fixtures.test_env_lag_4(request, env_list)[source]

Verify env_lag fixture adds LAGs into port_list.

unittests.test_fixtures.test_env_lag_5(request, env_list)[source]

Verify env_lag fixture adds LAGs into LagsAdmin table.

unittests.test_fixtures.test_env_lag_6(request, env_list)[source]

Verify env_lag fixture adds ports to LAGs.

unittests.test_fixtures.test_env_lag_7(request, env)[source]

Verify env_lag fixture changes links in setup file.

unittests.test_fixtures.test_env_lag_8(request, env_list)[source]

Verify env_lag fixture changes links in setup file.

unittests.test_fixtures.test_env_lag_9(request, env)[source]

Verify env_lag fixture doesn’t add LAGs into ports list in case max_lags is less than links count.

unittests.test_getports

test_getports.py

Unittests for getting ports functions

class unittests.test_getports.FakeOpts[source]

Bases: object

FakeOpts class.

build_path = ''
env = 'setup.json'
get_only = False
setup = 'setup.json'
ui = 'ons_xmlrpc'
unittests.test_getports.env_golden(request, monkeypatch)[source]

Fixture of environment for unittests of methods get_ports and get_speed.

unittests.test_getports.env_simple(request, monkeypatch)[source]

Fixture of environment for unittests of methods get_ports and get_speed.

unittests.test_getports.test_getports01(env_golden)[source]
unittests.test_getports.test_getports02(env_simple)[source]
unittests.test_getports.test_getports03(env_golden)[source]

unittests.test_helpers

test_helpers.py

Unittests for helpers functions

class unittests.test_helpers.Config(env)[source]

Bases: object

class unittests.test_helpers.Device(stype)[source]

Bases: object

class unittests.test_helpers.Env(switches, tgs)[source]

Bases: object

class unittests.test_helpers.Options[source]

Bases: object

class unittests.test_helpers.TestGetAttribute[source]

Bases: object

test_is_namedtuple()[source]
test_is_not_found()[source]
test_is_not_found_with_decorator()[source]
test_namedtuple_is_decorator()[source]
test_unnamedtuple()[source]
test_unnamedtuple_is_decorator()[source]
class unittests.test_helpers.TestGetSteppedValue[source]

Bases: object

test_get_stepped_value_int()[source]
test_get_stepped_value_invalid()[source]
test_get_stepped_value_ordereddict()[source]
class unittests.test_helpers.TestGrouper[source]

Bases: object

test_grouper_1()[source]
test_grouper_3()[source]
test_grouper_bigger()[source]
test_grouper_empty()[source]
test_grouper_negative()[source]
unittests.test_helpers.test_realswitch_only_marker()[source]
unittests.test_helpers.test_run_on_platforms_marker()[source]
unittests.test_helpers.test_run_on_tg_marker()[source]
unittests.test_helpers.test_run_on_ui_marker()[source]
unittests.test_helpers.test_simswitch_only_marker()[source]
unittests.test_helpers.test_skip_on_platforms_marker()[source]
unittests.test_helpers.test_skip_on_tg_marker()[source]
unittests.test_helpers.test_skip_on_ui_marker()[source]

unittests.test_linux_host_bash

test_linux_host_bash.py

Unittests for UI ONPSS Shell

class unittests.test_linux_host_bash.OnpssRawOutput[source]

Bases: object

unittests.test_linux_host_bash.make_multicall_ports_side_effect(iplink_out, ethtool_out, pci_out)[source]
unittests.test_linux_host_bash.make_ports_side_effect(iplink_out, ethtool_out, pci_out)[source]
unittests.test_linux_host_bash.test_get_table_ports(ui)[source]
unittests.test_linux_host_bash.test_get_table_ports_no_ethtool(ui)[source]
unittests.test_linux_host_bash.ui()[source]

unittests.test_loggers

test_loggers.py

Unittests for logging functionality in TAF

class unittests.test_loggers.TestLogger[source]

Bases: object

classmethod teardown_class()[source]

Removes all created files and directory on teardown of class.

test_debug_log_message(skip_if_no_fixture, caplog, simple_log, request)[source]

Verify that log message for level DEBUG contains correct values.

test_error_log_message(skip_if_no_fixture, caplog, simple_log, request)[source]

Verify that log message for level ERROR contains correct values.

test_info_log_message(skip_if_no_fixture, caplog, simple_log, request)[source]

Verify that log message for level INFO contains correct values.

test_log_message_for_exception(skip_if_no_fixture, caplog, exception_log, request, log_file)[source]

Verify that log message for exception from log files contains correct values.

test_log_message_for_exception_with_trace(skip_if_no_fixture, caplog, request)[source]

Verify that log messages for exception with trace contain correct values.

test_log_message_for_introspection(skip_if_no_fixture, caplog, introspection_log, request, log_file)[source]

Verify that log message for introspection from log files contains correct values.

test_log_message_from_log_file_for_exception_with_trace(skip_if_no_fixture, caplog, request, log_file, exception_log, monkeypatch)[source]

Verify that log messages for exception with trace from log files contain correct values.

test_log_message_from_log_files(skip_if_no_fixture, caplog, file_log, request, log_file)[source]

Verify that log message from log files contains correct values.

test_loggers_mkdir()[source]

Verify that method mkdir_p creates directory.

test_loggers_options_and_mkdir(request)[source]

Verify that the dictionary of logging options contains correct values and that method mkdir_p does not create the directory if it already exists.

test_module_log_message(skip_if_no_fixture, caplog, request, module_log)[source]

Verify that module level logging contains correct values.

test_warning_log_message(skip_if_no_fixture, caplog, simple_log, request)[source]

Verify that log message for level WARNING contains correct values.

unittests.test_loggers.exception_log(request)[source]
unittests.test_loggers.file_log(request)[source]
unittests.test_loggers.introspection_log(request)[source]
unittests.test_loggers.log_file(request)[source]
unittests.test_loggers.module_log(request)[source]
unittests.test_loggers.remove_logger(logger)[source]
unittests.test_loggers.simple_log(request)[source]
unittests.test_loggers.skip_if_no_fixture(request)[source]

unittests.test_multicall

test_multicall.py

Test Multicall

class unittests.test_multicall.TestMultiCall[source]

Bases: object

test_individual_element_lists_in_worst_case()[source]
test_original_list_if_small_enough()[source]
test_single_split()[source]

unittests.test_pwsw

test_pwsw.py

Unittests for powerboard functions

unittests.test_pwsw.test_pwsw1()[source]

unittests.test_pytest_helpers

test_pytest_helpers.py

Unittests for helpers functions

unittests.test_pytest_helpers.report()[source]
unittests.test_pytest_helpers.test_bad_fixture_param_value_is_handled()[source]
unittests.test_pytest_helpers.test_bad_fixture_param_value_is_handled_with_parametrize()[source]
unittests.test_pytest_helpers.test_brackets_are_removed()[source]
unittests.test_pytest_helpers.test_get_brief_callspec()[source]
unittests.test_pytest_helpers.test_get_brief_doc()[source]
unittests.test_pytest_helpers.test_get_brief_docstring()[source]
unittests.test_pytest_helpers.test_get_brief_inspect()[source]
unittests.test_pytest_helpers.test_get_failure_reason()[source]

Check that get_failure_reason(data) works correctly.

unittests.test_pytest_helpers.test_get_failure_reason_TypeError_returns_None()[source]
unittests.test_pytest_helpers.test_get_html_xml_path_non_string()[source]
unittests.test_pytest_helpers.test_get_skipped_reason_TypeError_returns_None()[source]
unittests.test_pytest_helpers.test_get_steps_callspec()[source]
unittests.test_pytest_helpers.test_get_steps_doc()[source]
unittests.test_pytest_helpers.test_get_steps_docstring()[source]
unittests.test_pytest_helpers.test_get_steps_inspect()[source]
unittests.test_pytest_helpers.test_get_suite_name()[source]

Test of pytest_helpers.get_tcname() function.

unittests.test_pytest_helpers.test_tcname_argvalue(report)[source]
unittests.test_pytest_helpers.test_tcname_basic(report)[source]

Test of pytest_helpers.get_tcname() function.

unittests.test_pytest_helpers.test_tcname_keywords(report)[source]

unittests.test_pytest_loganalyzer

test_pytest_loganalyzer.py

Unittests for loganalyzer

class unittests.test_pytest_loganalyzer.TestLogAnalyzer[source]

Bases: object

LOG_NO_DUPS = '{ "MESSAGE" : "Removed session 50.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "MESSAGE" : "Removed session 51.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", 
"_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "MESSAGE" : "Removed session 52.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "MESSAGE" : "Removed session 53.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : 
"/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }'
LOG_UNICODE_WITH_DUP = '{ "__CURSOR" : "s=da7709dd847c44dfab1720741ac0cb4d;i=987;b=614a99edf174450881448555522144d8;m=831cd2;t=5179273d9ed40;x=e2f17e57f07304e2", "__REALTIME_TIMESTAMP" : "1433292659879232", "__MONOTONIC_TIMESTAMP" : "8592594", "_BOOT_ID" : "614a99edf174450881448555522144d8", "PRIORITY" : "6", "_TRANSPORT" : "driver", "MESSAGE" : "Runtime journal is using 8.0M (max allowed 399.0M, trying to leave 598.5M free of 3.8G available \uffffffe2\uffffff86\uffffff92 current limit 399.0M).", "MESSAGE_ID" : "ec387f577b844b8fa948f33cad9a75e6", "_PID" : "338", "_UID" : "0", "_GID" : "0", "_COMM" : "systemd-journal", "_EXE" : "/usr/lib/systemd/systemd-journald", "_CMDLINE" : "/usr/lib/systemd/systemd-journald", "_CAP_EFFECTIVE" : "4402800cf", "_SYSTEMD_CGROUP" : "/system.slice/systemd-journald.service", "_SYSTEMD_UNIT" : "systemd-journald.service", "_SYSTEMD_SLICE" : "system.slice", "_SELINUX_CONTEXT" : "system_u:system_r:syslogd_t:s0", "_MACHINE_ID" : "da7c5b98bff3487ab53fcda4b679bc88", "_HOSTNAME" : "rr12or" }\n{ "__CURSOR" : "s=da7709dd847c44dfab1720741ac0cb4d;i=988;b=614a99edf174450881448555522144d8;m=831fee;t=5179273d9f05c;x=e2f17e57f07304e2", "__REALTIME_TIMESTAMP" : "1433292659880028", "__MONOTONIC_TIMESTAMP" : "8593390", "_BOOT_ID" : "614a99edf174450881448555522144d8", "PRIORITY" : "6", "_TRANSPORT" : "driver", "MESSAGE" : "Runtime journal is using 8.0M (max allowed 399.0M, trying to leave 598.5M free of 3.8G available \uffffffe2\uffffff86\uffffff92 current limit 399.0M).", "MESSAGE_ID" : "ec387f577b844b8fa948f33cad9a75e6", "_PID" : "338", "_UID" : "0", "_GID" : "0", "_COMM" : "systemd-journal", "_EXE" : "/usr/lib/systemd/systemd-journald", "_CMDLINE" : "/usr/lib/systemd/systemd-journald", "_CAP_EFFECTIVE" : "4402800cf", "_SYSTEMD_CGROUP" : "/system.slice/systemd-journald.service", "_SYSTEMD_UNIT" : "systemd-journald.service", "_SYSTEMD_SLICE" : "system.slice", "_SELINUX_CONTEXT" : "system_u:system_r:syslogd_t:s0", "_MACHINE_ID" : 
"da7c5b98bff3487ab53fcda4b679bc88", "_HOSTNAME" : "rr12or" }\n'
LOG_WITH_DIFF_CASES = '{ "MESSAGE" : "Removed session 50.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "MESSAGE" : "Removed session 51.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", 
"_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "MESSAGE" : " Removed session 52. ", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "MESSAGE" : "removed session 52.", "__REALTIME_TIMESTAMP" : "1432210094242414", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : 
"/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }'
LOG_WITH_DUPS = '{ "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__REALTIME_TIMESTAMP" : "1432210094242414", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "MESSAGE" : "Removed session 50.", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }\n{ "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10ba;b=4931b09af53f45f1b83fe32300ab7609;m=1648a3182f;t=5169675232447;x=b51015aee07e27a2", "__REALTIME_TIMESTAMP" : "1432210349696071", "__MONOTONIC_TIMESTAMP" : "95707928623", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "MESSAGE" : "Removed session 50.", "_CMDLINE" : "sshd: root@notty ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", 
"_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210349692776" }\n{ "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10bb;b=4931b09af53f45f1b83fe32300ab7609;m=1648a3459e;t=51696752351b6;x=8bbc2fb7e2219d64", "__REALTIME_TIMESTAMP" : "1432210349707702", "__MONOTONIC_TIMESTAMP" : "95707940254", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_SYSTEMD_SLICE" : "system.slice", "_TRANSPORT" : "journal", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "SYSLOG_FACILITY" : "4", "SYSLOG_IDENTIFIER" : "systemd-logind", "_COMM" : "systemd-logind", "_EXE" : "/usr/lib/systemd/systemd-logind", "_CMDLINE" : "/usr/lib/systemd/systemd-logind", "_CAP_EFFECTIVE" : "4420002f", "_SYSTEMD_CGROUP" : "/system.slice/systemd-logind.service", "_SYSTEMD_UNIT" : "systemd-logind.service", "_SELINUX_CONTEXT" : "system_u:system_r:systemd_logind_t:s0", "CODE_FILE" : "../src/login/logind-session.c", "USER_ID" : "root", "CODE_LINE" : "660", "CODE_FUNCTION" : "session_finalize", "MESSAGE_ID" : "3354939424b4456d9802ca8333ed424a", "_HOSTNAME" : "rr24or", "_PID" : "429", "SESSION_ID" : "50", "LEADER" : "15433", "MESSAGE" : "Removed session 50.", "_SOURCE_REALTIME_TIMESTAMP" : "1432210349706418" }'
LOG_WITH_ONE_LINE = '{ "MESSAGE" : "Removed session 50.", "__CURSOR" : "s=a16de58e54a64618a703efd350407e90;i=10b9;b=4931b09af53f45f1b83fe32300ab7609;m=1639692e56;t=5169665e93a6e;x=ed01d0faf5cc1d2c", "__REALTIME_TIMESTAMP" : "1432210094242414", "__MONOTONIC_TIMESTAMP" : "95452474966", "_BOOT_ID" : "4931b09af53f45f1b83fe32300ab7609", "PRIORITY" : "6", "_UID" : "0", "_GID" : "0", "_CAP_EFFECTIVE" : "3fffffffff", "_MACHINE_ID" : "e784f69e5d1747ebb7f6e829df0b0a0b", "_TRANSPORT" : "syslog", "SYSLOG_FACILITY" : "10", "SYSLOG_IDENTIFIER" : "sshd", "_COMM" : "sshd", "_EXE" : "/usr/sbin/sshd", "_SELINUX_CONTEXT" : "system_u:system_r:sshd_t:s0-s0:c0.c1023", "_SYSTEMD_OWNER_UID" : "0", "_SYSTEMD_SLICE" : "user-0.slice", "_AUDIT_LOGINUID" : "0", "_HOSTNAME" : "rr24or", "_CMDLINE" : "sshd: root [priv] ", "SYSLOG_PID" : "15433", "_PID" : "15433", "_AUDIT_SESSION" : "50", "_SYSTEMD_CGROUP" : "/user.slice/user-0.slice/session-50.scope", "_SYSTEMD_SESSION" : "50", "_SYSTEMD_UNIT" : "session-50.scope", "_SOURCE_REALTIME_TIMESTAMP" : "1432210094240765" }'
log_analyzer = <plugins.pytest_loganalyzer.LogAnalyzer object>
test_check_log_duplicate_one_line()[source]
test_check_log_duplicates_handles_unicode()[source]
test_check_log_duplicates_no_dups()[source]
test_check_log_duplicates_non_exact_duplicates_are_ignored()[source]
test_check_log_duplicates_raises_assertion_error()[source]
test_ignore_finds_regexps()[source]

unittests.test_reporting_server

test_reporting_server.py

Unittests for reporting server functions

unittests.test_reporting_server.reporting_server()[source]
unittests.test_reporting_server.reporting_server_with_config(reporting_server)[source]
unittests.test_reporting_server.test_client_config(reporting_server)[source]

Verify that client config can be created and reports can be removed.

unittests.test_reporting_server.test_cmdproc(reporting_server_with_config)[source]

Verify that operations with cmdproc work.

unittests.test_reporting_server.test_post(reporting_server_with_config)[source]

Verify that post command is True.

unittests.test_reporting_server.test_queue(reporting_server_with_config)[source]

Verify that operation with queue is working.

unittests.test_service_lib

test_service_lib.py

SystemD service library unittests

class unittests.test_service_lib.TestSpecificServiceManager(methodName='runTest')[source]

Bases: unittest.case.TestCase

setUp()[source]
test_list_is_not_present_in_SpecificServiceManager()[source]
test_start()[source]
test_stop_with_args()[source]
class unittests.test_service_lib.TestSystemd(methodName='runTest')[source]

Bases: unittest.case.TestCase

setUp()[source]
test_all_commands()[source]
class unittests.test_service_lib.TestSystemdServiceManager(methodName='runTest')[source]

Bases: unittest.case.TestCase

setUp()[source]
test_list()[source]
test_set_default_runlevel()[source]
test_start()[source]

unittests.test_staticcross

test_staticcross.py

Unittests for static cross functions

class unittests.test_staticcross.FakeDev1[source]

Bases: object

connect_port(port)[source]
disconnect_port(port)[source]
class unittests.test_staticcross.FakeDev2[source]

Bases: object

connect_port(port)[source]
disconnect_port(port)[source]
class unittests.test_staticcross.FakeOpts[source]

Bases: object

build_path = ''
env = ''
get_only = False
setup = 'setup.json'
unittests.test_staticcross.cross(request, monkeypatch)[source]
unittests.test_staticcross.test_staticcross_connect(cross)[source]
unittests.test_staticcross.test_staticcross_disconnect(cross)[source]
unittests.test_staticcross.test_staticcross_get_device(cross)[source]

unittests.test_switch_driver

test_switch_driver.py

Unittests for Switch Driver

class unittests.test_switch_driver.TestSwitchDriver[source]

Bases: object

test_autodetect_failure_raises_exception_when_name_not_found()[source]
test_autodetect_failure_raises_exception_when_no_path()[source]
test_autodetect_fm10kd()[source]

unittests.test_synapsert

test_synapsert.py

Unittests for synapsert functions

unittests.test_synapsert.synapsert(request)[source]

Return synapsert instance.

The "--synapsert_config" option sets the path to the synapsert config file; if it is not defined, the current synapsert config will be used.

unittests.test_synapsert.tc_issue(synapsert, tc_name)[source]
unittests.test_synapsert.tc_name(synapsert, request)[source]
unittests.test_synapsert.test_create_test_case(synapsert, tc_issue)[source]
unittests.test_synapsert.test_create_test_plan(synapsert, tp_issue)[source]
unittests.test_synapsert.test_get_custom_fields(synapsert)[source]

Verify that all custom fields are created.

unittests.test_synapsert.test_get_issue_type(synapsert)[source]

Verify if issue exists.

unittests.test_synapsert.test_get_suite_value(synapsert)[source]

Verify value of suites.

unittests.test_synapsert.test_get_tc_by_auto_tc_name(synapsert, tc_issue)[source]
unittests.test_synapsert.test_jira_exist(synapsert)[source]
unittests.test_synapsert.test_set_suite(synapsert, tc_issue)[source]
unittests.test_synapsert.test_transition(synapsert, tc_issue, tp_issue)[source]
unittests.test_synapsert.tp_issue(synapsert, tp_name)[source]
unittests.test_synapsert.tp_name(synapsert, request)[source]

unittests.test_ui_helpers

test_ui_helpers.py

Unittests for ui_helpers.py

class unittests.test_ui_helpers.TestLagHelpers[source]

Bases: object

create_ui(ui_onpss)[source]
test_is_lag_added()[source]
test_is_port_added_to_lag()[source]
unittests.test_ui_helpers.ui_onpss()[source]

unittests.test_ui_iss_cli

test_ui_iss_cli.py

Unittests for ISS CLI UI wrappers

unittests.test_ui_onpss_jsonrpc

test_ui_onp_jsonrpc.py

Unittests for JSONRPC UI wrappers

class unittests.test_ui_onpss_jsonrpc.TestClientRequest[source]

Bases: object

test_multicall(ui, server)[source]

Verify that request() method sends and receives the JSON-RPC strings.

test_multicall_reply_with_error(ui, server)[source]

Verify UIException in case server is responding with error for multicall.

test_request(ui, server)[source]

Verify that request() method sends and receives the JSON-RPC strings.

test_response_with_error(ui, server)[source]

Verify UIException in case server is responding with error.

unittests.test_ui_onpss_jsonrpc.server(request)[source]

Fixture of environment with Json-Rpc server.

unittests.test_ui_onpss_jsonrpc.ui(request, server)[source]

Fixture of environment for unittests JSONRPC UI wrappers.

unittests.test_ui_onpss_shell

test_ui_onpss_shell.py

Unittests for UI ONPSS Shell

class unittests.test_ui_onpss_shell.OnpssRawOutput[source]

Bases: object

IP_SHOW_STATS_OUTPUT = ' 2: em1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP mode DEFAULT group default qlen 1000\n link/ether d4:c9:ef:52:7c:4d brd ff:ff:ff:ff:ff:ff\n RX: bytes packets errors dropped overrun mcast\n 2781945429 3202213 0 0 0 30131\n RX errors: length crc frame fifo missed\n 0 0 0 0 0\n TX: bytes packets errors dropped carrier collsns\n 646221183 2145799 0 0 0 0\n TX errors: aborted fifo window heartbeat\n 0 0 0 0\n '
NETWORKCTL_SAMPLE_1 = ' ● 6: enp0s29u1u7u1\n Type: ether\n State: degraded (configured)\n Path: pci-0000:00:1d.0-usb-0:1.7.1:1.0\n Driver: asix\n Vendor: ASIX Electronics Corp.\n HW Address: 0:60:63:43:78:95\n MTU: 1500\n Address: fe80::260:63ff:fe43:7895\n '
NETWORKCTL_SAMPLE_2 = ' ● 8: p1p1\n Link File: /usr/lib/systemd/network/99-default.link\n Network File: n/a\n Type: ether\n State: n/a (n/a)\n Path: pci-0000:02:00.0\n Driver: igb\n Vendor: Intel Corporation\n Model: I210 Gigabit Network Connection (Ethernet Server Adapter I210-T1)\n HW Address: a0:36:9f:5c:ff:8e\n MTU: 1500\n Address: 192.168.1.1\n fe80::a236:9fff:fe5c:ff8e\n '
RAW_ARP_OUTPUT = '10.20.30.40 dev sw0p5 lladdr 00:AA:AA:AA:AA:12 STALE\n192.168.10.60 dev sw0p6 lladdr 00:BB:BB:BB:BB:13 STALE\n192.168.10.65 dev sw0p5 lladdr 00:BB:BB:BB:BB:AA PERMANENT\n2001::2000 dev sw0p6 lladdr 00:12:12:12:12:12 STALE\n'
RAW_FDB_OUTPUT = '33:33:00:00:00:01 dev pep8 self permanent\n01:00:5e:00:00:01 dev pep8 self permanent\n33:33:ff:60:b4:14 dev pep8 self permanent\n33:33:00:00:00:01 dev pep4 self permanent\n33:33:00:00:00:01 dev enp0s20f0 self permanent\n33:33:00:00:00:01 dev enp0s20f1 self permanent\n33:33:00:00:00:01 dev enp0s20f2 self permanent\n33:33:00:00:00:01 dev enp0s20f3 self permanent\n33:33:00:00:00:01 dev p1p1 self permanent\n01:00:5e:00:00:01 dev p1p1 self permanent\n33:33:ff:5c:ff:fe dev p1p1 self permanent\n00:11:22:33:44:55 dev sw0p5 vlan 2 self permanent\n00:11:22:33:44:55 dev sw0p5 vlan 22 self permanent\n00:11:22:33:44:55 dev sw0p5 vlan 222 self \n00:11:22:33:44:55 dev sw0p5 vlan 2222 self permanent\n55:44:33:22:11:00 dev sw0p6 vlan 3 self permanent\n55:44:33:22:11:00 dev sw0p6 vlan 33 self permanent\n55:44:33:22:11:00 dev sw0p6 vlan 333 self \n55:44:33:22:11:00 dev sw0p6 vlan 3333 self permanent\n'
RAW_PING_OUTPUT = 'PATTERN: 0xff\nPING 192.168.10.2 (192.168.10.2) from 192.168.10.1 sw0p0: 56(84) bytes of data.\n64 bytes from 192.168.10.2: icmp_seq=1 ttl=64 time=0.647 ms\nTS: 3812901 absolute\n 64484186\n 0\n -64484185\n\n64 bytes from 192.168.10.2: icmp_seq=1 ttl=64 time=0.542 ms\nTS: 3813902 absolute\n 64484185\n 0\n -64484184\n\n64 bytes from 192.168.10.2: icmp_seq=1 ttl=64 time=0.544 ms\nTS: 3814902 absolute\n 64484185\n 0\n -64484184\n\n--- 192.168.10.2 ping statistics ---\n3 packets transmitted, 3 received, 0% packet loss, time 4000ms\nrtt min/avg/max/mdev = 0.525/0.562/0.647/0.048 ms'
RAW_VLAN_OUTPUT = 'port\tvlan ids\nsw0p1\tNone\nsw0p2\tNone\nsw0p3\tNone\nsw0p4\tNone\nsw0p5\t 2\n\t 3 Egress Untagged\n\t 33\n\t 222 PVID\n\t 2222\n\t 3333\n\nsw0p6\t 3\n\t 33\n\t 333 Egress Untagged\n\t 3333 PVID\n\nsw0p7\tNone\nsw0p8\tNone\nsw0p9\tNone\nsw0p10\tNone\nsw0p11\tNone\nsw0p12\t44 PVID Egress Untagged\nsw0p13\tNone\nsw0p14\tNone\nsw0p15\tNone\nsw0p16\tNone\nsw0p19\tNone\nsw0p20\tNone\nsw0p21\tNone\nsw0p22\tNone\nsw0p23\tNone\nsw0p24\tNone\nsw0p25\tNone\nsw0p0\tNone\nteam25\t 4\n\t 44\n\t 444 Egress Untagged\n\t 4444 PVID\n\n'
SWITCH_ATTRIBUTES = 'autoneg\x00bcast_capacity\x00bcast_flooding\x00bcast_pruning\x00bcast_rate\x00def_cfi\x00'
class unittests.test_ui_onpss_shell.TestAbstractError[source]

Bases: object

test_access_error()[source]
test_boundary_error()[source]
class unittests.test_ui_onpss_shell.TestCliSendCommand[source]

Bases: object

create_ui(ui)[source]
test_cli_send_command_handles_ssh_no_exit_status()[source]
test_cli_send_command_passes_with_expected_rc_int()[source]
test_cli_send_command_passes_with_expected_rc_set()[source]
test_cli_send_command_raises_exception_when_unexpected_rc()[source]
test_cli_send_command_raises_exception_when_unexpected_rc_defaults()[source]
class unittests.test_ui_onpss_shell.TestCreateMatchApi[source]

Bases: object

create_ui(ui)[source]
test_create_match_api_rule()[source]
test_create_match_api_tcam_subtable()[source]
test_get_rules_match_api()[source]
test_get_table_match_api()[source]
class unittests.test_ui_onpss_shell.TestGenerateVlanCommand[source]

Bases: object

create_ui(ui)[source]
test_generate_bridge_vlan_commands_add_multiport_singlevlan()[source]
test_generate_bridge_vlan_commands_add_singleport_multiplevlan()[source]
test_generate_bridge_vlan_commands_del_multiport_multivlan()[source]
test_generate_bridge_vlan_commands_del_singleport_singlevlan()[source]
class unittests.test_ui_onpss_shell.TestGetTableBridgeInfo[source]

Bases: object

create_ui(ui)[source]
test_lookup_port_when_int()[source]
test_param_is_not_none()[source]
test_raises_exception_when_no_port_and_no_mgmt_ports()[source]
test_use_port_itself_when_string()[source]
test_uses_mgmt_port_when_no_port_specified()[source]
class unittests.test_ui_onpss_shell.TestImportHWModule[source]

Bases: object

test_cpu_rate_limiting_fm10k()[source]
test_cpu_rate_limiting_fm6k()[source]
class unittests.test_ui_onpss_shell.TestInvalidPort[source]

Bases: object

create_ui(ui)[source]
test_list_of_ports()[source]
test_no_clobber()[source]
test_number_of_ports()[source]
class unittests.test_ui_onpss_shell.TestNetworkCtl[source]

Bases: object

create_ui(ui)[source]
test_get_ufd_networkctl_status_1()[source]
test_get_ufd_networkctl_status_2()[source]
class unittests.test_ui_onpss_shell.TestPortConfigSnapshot[source]

Bases: object

create_ui(ui)[source]
class unittests.test_ui_onpss_shell.TestPortMapping[source]

Bases: object

create_ui(ui)[source]
static my_side_effect(*args, **kwargs)[source]
test_nametoportidmap()[source]
test_nametoportidmap_adding_new_active_lag()[source]
test_nametoportidmap_removing_active_lag()[source]
test_portmap()[source]
test_portmap_adding_new_active_lag()[source]
test_portmap_removing_active_lag()[source]
class unittests.test_ui_onpss_shell.TestStaticMacs[source]

Bases: object

test_create_static_mac_raises_uiexcepion_if_not_in_device()[source]
test_delete_static_mac_raises_uiexcepion_if_not_in_device()[source]
ui = <testlib.ui_onpss_shell.ui_onpss_shell.UiOnpssShell object>
unittests.test_ui_onpss_shell.multicall_ports_side_effect(*args, **kwargs)[source]
unittests.test_ui_onpss_shell.ports_side_effect(*args, **kwargs)[source]
unittests.test_ui_onpss_shell.test_cli_get_all(ui)[source]
unittests.test_ui_onpss_shell.test_get_icmp_ping_result(ui)[source]
unittests.test_ui_onpss_shell.test_get_table_arp(ui)[source]
unittests.test_ui_onpss_shell.test_get_table_fdb(ui)[source]
unittests.test_ui_onpss_shell.test_get_table_lags(ui)[source]
unittests.test_ui_onpss_shell.test_get_table_ports2lag(ui)[source]
unittests.test_ui_onpss_shell.test_get_table_ports2vlans(ui)[source]
unittests.test_ui_onpss_shell.test_modify_vlan_ports_pvid(ui)[source]
unittests.test_ui_onpss_shell.test_modify_vlan_ports_tagged(ui)[source]
unittests.test_ui_onpss_shell.test_parse_ip_show_stats()[source]
unittests.test_ui_onpss_shell.ui()[source]
unittests.test_ui_onpss_shell.vlan_ports_side_effect(*args, **kwargs)[source]

unittests.test_ui_xmlrpc

test_ui_xmlrpc.py

Unittests for XMLRPC UI wrappers

class unittests.test_ui_xmlrpc.TestInvalidPorts[source]

Bases: object

create_ui(ui)[source]
test_create_invalid_ports_multiple_port()[source]
test_create_invalid_ports_port_ids()[source]
unittests.test_ui_xmlrpc.ui()[source]

unittests.test_ui_xmlrpc

test_ui_xmlrpc.py

XMLRPC UI wrappers unittests

class unittests.test_ui_xmlrpc.TestInvalidPorts[source]

Bases: object

create_ui(ui)[source]
test_create_invalid_ports_multiple_port()[source]
test_create_invalid_ports_port_ids()[source]
unittests.test_ui_xmlrpc.ui()[source]

unittests.linux.test_commands

test_commands.py

Command helpers Unittests

class unittests.linux.test_commands.TestCmdHelperBasic[source]

Bases: object

test_copy()[source]
test_empty_argparse_raises_exception(*args, **kwargs)[source]
test_get()[source]
test_inits()[source]
test_iter()[source]
test_update()[source]
test_validate_dict()[source]
class unittests.linux.test_commands.TestCommandHelper[source]

Bases: object

test_cmd_helper_mkdir_kwarg_unset()[source]
test_cmd_helper_mkdir_kwargs_extend()[source]
test_cmd_helper_mkdir_kwargs_set()[source]
test_cmd_helper_mkdir_kwargs_update()[source]
test_cmd_helper_mkdir_merge()[source]
test_cmd_helper_start_empty()[source]
test_cmd_helper_start_ok()[source]
test_cmd_helper_start_unknown()[source]
unittests.linux.test_commands.pytest_raises(exc_iter)[source]

unittests.linux.test_iperf

test_iperf.py

IPerfRunner Unittests

class unittests.linux.test_iperf.TestIPerfParser[source]

Bases: object

test_multi_thread_client_output()[source]
test_multi_thread_server_output()[source]
test_single_thread_client_output()[source]
test_single_thread_client_output_gbytes_format()[source]
test_single_thread_server_output()[source]
test_single_thread_server_output_gbits_format()[source]
class unittests.linux.test_iperf.TestIperf[source]

Bases: object

classmethod _parse_linux_cmd(cmd_str, name=None)[source]

Extract the command arguments from a linux command line string.

host = '127.0.0.1'
password = 'admin'
runner()[source]
test_iperf_runner_get_results(runner)[source]
test_iperf_runner_parse(runner)[source]
test_iperf_runner_parse_gbytes_format(runner)[source]
test_iperf_runner_start_client_command(runner)[source]
test_iperf_runner_start_client_empty(runner)[source]
test_iperf_runner_start_client_kwargs(runner)[source]
test_iperf_runner_start_client_with_options(runner)[source]
test_iperf_runner_start_empty(runner)[source]
test_iperf_runner_start_server_and_client(runner)[source]
test_iperf_runner_start_server_command(runner)[source]
test_iperf_runner_start_server_command_kwargs(runner)[source]
test_iperf_runner_start_server_empty(runner)[source]
test_iperf_runner_start_server_kwargs(runner)[source]
test_iperf_runner_start_server_with_options(runner)[source]
test_iperf_runner_start_unknown(runner)[source]
user = 'admin'

unittests.linux.test_networkd

test_networkd.py

NetworkD Unittests

class unittests.linux.test_networkd.TestNetworkD[source]

Bases: object

test_empty_list()[source]
test_extra_excludes_are_appended()[source]
test_just_extra_excludes()[source]
test_multiple_mgmt_port()[source]
test_single_mgmt_port()[source]

unittests.linux.test_parser

test_parser.py

Parse tools Unittests

class unittests.linux.test_parser.TestParser[source]

Bases: object

HOST_LEXER = <pygments.lexers.HostLexer>
HOST_PARSER = <testlib.linux.suricata.parser.HostParser object>
test_lexer_lexem_error()[source]
test_lexer_lexem_ok()[source]
test_lexer_tokenizer()[source]
test_parser_semantics_fail_duplicates()[source]
test_parser_semantics_ok()[source]
test_parser_synax_error_unexpected_token()[source]
test_parser_syntax_error_tokens_overflow()[source]
test_parser_syntax_error_tokens_underflow()[source]
test_parser_syntax_ok()[source]

unittests.linux.test_tool_general

test_tool_general.py

ToolGeneral Unittests

class unittests.linux.test_tool_general.CmdExecSimul(cmd_exec_sim, cycle=False)[source]

Bases: object

Simulate clissh (blackbox).

Specify command behavior - a (sequence of) input command(s) mapped to output(s), including side effect(s).

exception InputCommandNoMatch(command)[source]

Bases: Exception

CmdExecSimul.MAKE_ITER_MAP = [<built-in function iter>, <class 'itertools.cycle'>]
CmdExecSimul.__call__(*args, **kwargs)[source]

Any exception raised is considered an expected behavior (side_effect).

The mocked signature: exec_command(command, timeout=None)

CmdExecSimul.set_simul(cmd_exec_sim, cycle=False)[source]
class unittests.linux.test_tool_general.FakeLinuxHost(config=None, opts=None)[source]

Bases: testlib.dev_linux_host.GenericLinuxHost

FAKE_CFG = {'ssh_pass': 'fake_pass', 'instance_type': 'generic_linux_host', 'ssh_user': 'fake_user', 'ipaddr': 'localhost', 'name': 'FakeHost', 'id': 'FakeID'}
class FakeOpts[source]

Bases: object

env = 'fake.env.json'
gen_only = False
lhost_ui = 'linux_bash'
setup = 'fake.setup.json'
class unittests.linux.test_tool_general.FakeSSH(*args, **kwargs)[source]

Bases: testlib.clissh.CLISSH

exec_command(*args, **kwargs)[source]
login_status = True
shell_read(*args, **kwargs)[source]
class unittests.linux.test_tool_general.MockSSH(*args, **kwargs)[source]

Bases: unittests.linux.test_tool_general.FakeSSH

class unittests.linux.test_tool_general.SimulatedSSH(*args, **kwargs)[source]

Bases: unittests.linux.test_tool_general.MockSSH, unittests.linux.test_tool_general.CmdExecSimul

class unittests.linux.test_tool_general.TestToolGeneral[source]

Bases: object

ARG_SYSTEMCTL_STOP = 'systemctl stop generic_tool.service'
RET_VAL_INACTIVE = {'return_value': CmdStatus(stdout='stdout', stderr='stderr', rc=5)}
test_start_with_prefix(lh, gen_tool, tool)[source]
test_stop_doesnt_raises_when_ignore_true(lh, gen_tool, tool)[source]
test_stop_raises_when_ignore_false(lh, gen_tool, tool)[source]
test_stop_succeds_when_no_exception(gen_tool)[source]
unittests.linux.test_tool_general.gen_tool(request, lh)[source]
unittests.linux.test_tool_general.lh(request, patch_clissh_sim)[source]
unittests.linux.test_tool_general.patch_clissh_sim(monkeypatch)[source]
unittests.linux.test_tool_general.systemctl(gen_tool)[source]
unittests.linux.test_tool_general.tool(gen_tool, systemctl)[source]

unittests.switches.conftest

conftest.py

Test switches common options

unittests.switches.conftest.pytest_addoption(parser)[source]

TAF specific options.

unittests.switches.test_switches

test_switches.py

Switch’s unittests

unittests.switches.test_switches.switch(request)[source]
unittests.switches.test_switches.test_check_app_table_1(switch)[source]

Test check_app_table function.

unittests.switches.test_switches.test_check_app_table_2(switch)[source]

Test check_app_table function negative.

unittests.switches.test_switches.test_check_app_table_3(switch)[source]

Test check_app_table function negative.

unittests.switches.test_switches.test_clearconfig_1(switch)[source]

Test clearconfig function negative.

unittests.switches.test_switches.test_clearconfig_2(switch)[source]

Test clearconfig function.

unittests.switches.test_switches.test_delprop_row(switch)[source]

Test delprop_row function.

unittests.switches.test_switches.test_existsprop(switch)[source]

Test existsprop function.

unittests.switches.test_switches.test_findprop(switch)[source]

Test findprop function.

unittests.switches.test_switches.test_get_port_for_probe(switch)[source]

Test _get_port_for_probe function.

unittests.switches.test_switches.test_get_speed_ports_1(switch)[source]

Test _get_speed_ports function if ‘ports’ in config.

unittests.switches.test_switches.test_get_speed_ports_2(switch)[source]

Test _get_speed_ports function if ‘port_list’ in config.

unittests.switches.test_switches.test_get_speed_ports_3(switch)[source]

Test _get_speed_ports function if ‘ports’ and ‘port_list’ in config.

unittests.switches.test_switches.test_get_speed_ports_4(switch)[source]

Test _get_speed_ports function if ‘ports_map’ in config.

unittests.switches.test_switches.test_getprop(switch)[source]

Test getprop function.

unittests.switches.test_switches.test_getprop_field_info(switch)[source]

Test getprop_field_info function.

unittests.switches.test_switches.test_getprop_method_help_1(switch)[source]

Test getprop_method_help function.

unittests.switches.test_switches.test_getprop_method_help_2(switch)[source]

Test getprop_method_help function negative.

unittests.switches.test_switches.test_getprop_row(switch)[source]

Test getprop_row function.

unittests.switches.test_switches.test_getprop_size(switch)[source]

Test getprop_size function.

unittests.switches.test_switches.test_getprop_table(switch)[source]

Test getprop_table function.

unittests.switches.test_switches.test_getprop_table_info(switch)[source]

Test getprop_table_info function.

unittests.switches.test_switches.test_multicall_1(switch)[source]

Test multicall function.

unittests.switches.test_switches.test_multicall_2(switch)[source]

Test multicall function negative.

unittests.switches.test_switches.test_multicall_3(switch)[source]

Test multicall function negative.

unittests.switches.test_switches.test_probe_1(switch)[source]

Test probe function negative.

unittests.switches.test_switches.test_probe_2(switch)[source]

Test probe function.

unittests.switches.test_switches.test_set_app_log_level_1(switch)[source]

Test set_app_log_level function.

unittests.switches.test_switches.test_set_app_log_level_2(switch)[source]

Test set_app_log_level function negative.

unittests.switches.test_switches.test_setprop(switch)[source]

Test setprop function.

unittests.switches.test_switches.test_setprop_row(switch)[source]

Test setprop_row function.

unittests.switches.test_switches.test_switch_init(switch)[source]
unittests.switches.test_switches.test_waitoff_1(switch)[source]

Test waitoff function.

unittests.switches.test_switches.test_waitoff_2(switch)[source]

Test waitoff function negative.

unittests.switches.test_switches.test_waiton_1(switch)[source]

Test waiton function.

unittests.switches.test_switches.test_waiton_2(switch)[source]

Test waiton function negative.

unittests.test_plugins.test_multiple_run

test_multiple_run.py

Unittests for pytest_multiple_run plugin

class unittests.test_plugins.test_multiple_run.TestPluginMultipleRun[source]

Bases: object

test_rerun_all_test_suite(testdir)[source]

Verify that the test module can be rerun 3 times.

test_rerun_extra_keyword_1(testdir)[source]

Verify that only tests matching extra keywords (option “-k”) are rerun 3 times.

test_rerun_extra_keyword_2(testdir)[source]

Verify that only tests matching extra keywords (option “-k”) are rerun 3 times.

test_rerun_failed_test(testdir)[source]

Verify that a failed test can be rerun 7 times.

test_rerun_parametrizing_tests(testdir)[source]

Verify that a parametrized test can be rerun 2 times.

test_rerun_passed_test(testdir)[source]

Verify that a passed test can be rerun 7 times.

test_rerun_skipped_test(testdir)[source]

Verify that a skipped test can be rerun 5 times.

test_rerun_test_with_collectonly_option(testdir)[source]

Verify that tests are only collected, not executed.

test_rerun_test_with_exitfirst_option(testdir)[source]

Verify that the run exits instantly on the first error or failed test.

test_rerun_test_with_fixture(testdir)[source]

Verify that a test with a fixture can be rerun 2 times.

test_rerun_test_with_own_mark(testdir)[source]

Verify that a test with its own mark can be rerun 2 times.

test_rerun_test_with_runxfail_option(testdir)[source]

Verify that a test with the xfail mark can be rerun 5 times with the runxfail option.

test_rerun_test_with_skipif_mark(testdir)[source]

Verify that a test with the skipif mark can be rerun 5 times.

test_rerun_test_with_xfail_mark(testdir)[source]

Verify that a test with the xfail mark can be rerun 5 times.

unittests.test_plugins.test_multipletg

test_multipletg.py

Unittests for pytest_multipletg plugin

class unittests.test_plugins.test_multipletg.EnvTest(setup, env)[source]

Bases: object

start(request, monkeypatch)[source]
class unittests.test_plugins.test_multipletg.FakeOpts[source]

Bases: object

build_path = ''
env = ''
get_only = True
setup = 'setup.json'
ui = 'ons_xmlrpc'
unittests.test_plugins.test_multipletg.env(request, monkeypatch)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_plugins.test_multipletg.env_complex(request, monkeypatch)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_plugins.test_multipletg.env_list(request, monkeypatch)[source]

Fixture of environment with LXC for unittests of methods get_ports and get_speed.

unittests.test_plugins.test_multipletg.test_multipletg_1(request, env)[source]

Verify pytest_multipletg plugin creates MultipleTG instance and modifies env.tg dict.

unittests.test_plugins.test_multipletg.test_multipletg_2(request, env)[source]

Verify pytest_multipletg plugin modifies cross section.

unittests.test_plugins.test_multipletg.test_multipletg_3(request, env)[source]

Verify pytest_multipletg plugin restores env.tg dict on teardown.

unittests.test_plugins.test_multipletg.test_multipletg_4(request, env)[source]

Verify pytest_multipletg plugin restores cross section on teardown.

unittests.test_plugins.test_multipletg.test_multipletg_5(request, env_list)[source]

Verify pytest_multipletg plugin creates port_list if it is in setup.

unittests.test_plugins.test_multipletg.test_multipletg_6(request, env_complex)[source]

Verify pytest_multipletg plugin modifies cross section in complex setup.

unittests.test_plugins.test_pidchecker

test_pidchecker.py

Unittests for pytest_pidchecker.py

class unittests.test_plugins.test_pidchecker.Config(pidchecker)[source]

Bases: object

__call__(x)[source]
class unittests.test_plugins.test_pidchecker.FakeItem(monkeypatch, pidchecker)[source]

Bases: object

class unittests.test_plugins.test_pidchecker.FakeOpts[source]

Bases: object

__init__()[source]
class unittests.test_plugins.test_pidchecker.GetMarker[source]

Bases: object

__call__(x)[source]
class unittests.test_plugins.test_pidchecker.GetProc[source]

Bases: object

__call__(x, skip_prcheck=None)[source]
class unittests.test_plugins.test_pidchecker.Option(pidchecker)[source]

Bases: object

class unittests.test_plugins.test_pidchecker.Setup[source]

Bases: object

__call__(x)[source]
unittests.test_plugins.test_pidchecker.test_pidchecker_setup_false(monkeypatch)[source]
unittests.test_plugins.test_pidchecker.test_pidchecker_setup_true(monkeypatch)[source]
unittests.test_plugins.test_pidchecker.test_pidchecker_teardown_false(monkeypatch)[source]
unittests.test_plugins.test_pidchecker.test_pidchecker_teardown_true(monkeypatch)[source]

unittests.test_plugins.test_pytest_skip_filter

test_skip_filter.py

Unittests for pytest_skip_filter plugin

class unittests.test_plugins.test_pytest_skip_filter.TestPluginSkipFilter[source]

Bases: object

class Param(name, file_text, counts)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, name, file_text, counts)

Create new instance of Param(name, file_text, counts)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable, new=<built-in method __new__ of type object at 0xa395c0>, len=<built-in function len>)

Make a new Param object from a sequence or iterable

_replace(_self, **kwds)

Return a new Param object replacing specified fields with new values

counts

Alias for field number 2

file_text

Alias for field number 1

name

Alias for field number 0

TestPluginSkipFilter.argnames = ('name', 'file_text', 'counts')
TestPluginSkipFilter.argvalues = [Param(name='test_skip_test', file_text='\n import pytest\n\n def test_fail():\n assert 0\n\n def test_pass():\n assert 1\n\n @pytest.mark.skipif("True", reason="Skip")\n def test_skip():\n assert 1\n ', counts=[1, 0, 1]), Param(name='test_skip_in_class', file_text='\n import pytest\n\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n @pytest.mark.skipif("True", reason="Skip")\n def test_skip(self):\n assert 1\n ', counts=[1, 0, 1]), Param(name='test_double_skip_in_class', file_text='\n import pytest\n\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n @pytest.mark.skipif("True", reason="Skip")\n @pytest.mark.skipif("False", reason="Skip")\n def test_skip(self):\n assert 1\n ', counts=[1, 0, 1]), Param(name='test_double_skip_class_level_1', file_text='\n import pytest\n\n @pytest.mark.skipif("True", reason="Skip")\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n @pytest.mark.skipif("False", reason="Skip")\n def test_skip(self):\n assert 1\n ', counts=[0, 0, 0]), Param(name='test_double_skip_class_level_2', file_text='\n import pytest\n\n @pytest.mark.skipif("True", reason="Skip")\n @pytest.mark.skipif("False", reason="Skip")\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n def test_skip(self):\n assert 1\n ', counts=[0, 0, 0]), Param(name='test_parametrize_skip_1', file_text='\n import pytest\n\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n @pytest.mark.skipif("True", reason="Skip")\n @pytest.mark.parametrize("a", [1,2])\n def test_skip(self, a):\n assert a == 2\n ', counts=[1, 0, 1]), Param(name='test_parametrize_skip_2', file_text='\n import pytest\n\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n 
@pytest.mark.parametrize("a", [1,pytest.mark.skipif("True", reason="Skip")(2)])\n def test_skip(self, a):\n assert a == 2\n ', counts=[1, 0, 2]), Param(name='test_parametrize_skip_3', file_text='\n import pytest\n\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n @pytest.mark.skipif("True", reason="Skip")\n @pytest.mark.parametrize("a", [1,pytest.mark.skipif("False", reason="Skip")(2)])\n def test_skip(self, a):\n assert a == 2\n ', counts=[1, 0, 1]), Param(name='test_parametrize_skip_4', file_text='\n import pytest\n\n @pytest.mark.skipif("True", reason="Skip")\n class TestClass(object):\n def test_fail(self):\n assert 0\n\n def test_pass(self):\n assert 1\n\n @pytest.mark.parametrize("a", [1,pytest.mark.skipif("False", reason="Skip")(2)])\n def test_skip(self, a):\n assert a == 2\n ', counts=[0, 0, 0])]
TestPluginSkipFilter.ids = ['test_skip_test', 'test_skip_in_class', 'test_double_skip_in_class', 'test_double_skip_class_level_1', 'test_double_skip_class_level_2', 'test_parametrize_skip_1', 'test_parametrize_skip_2', 'test_parametrize_skip_3', 'test_parametrize_skip_4']
TestPluginSkipFilter.test_boolean_condition(testdir, name, file_text, counts)[source]

Test if we can use boolean conditions instead of string conditions.

TestPluginSkipFilter.test_boolean_config_getoption(testdir, name, file_text, counts)[source]

Test if we can use pytest.config.getoption in a non-string condition.

TestPluginSkipFilter.test_string_condition(testdir, name, file_text, counts)[source]

unittests.test_plugins.test_tc_duration

test_tc_duration.py

Unittests for pytest_test_duration plugin

class unittests.test_plugins.test_tc_duration.TestCountTiming[source]

Bases: object

Class verifies count and timing parameters of test duration.

test_count()[source]

Verify that test ends after value of count when count is passed.

test_count_float()[source]

Verify that test ends after integer value of count when float value of count is passed.

test_count_less_than_timing()[source]

Verify that test ends after value of count when time of count less than timing is passed.

test_count_negative()[source]

Verify that test ends after 1 count when negative value of count is passed.

test_count_when_timing_zero()[source]

Verify that test ends after value of count when zero value of timing is passed.

test_count_zero()[source]

Verify that test ends after 1 count when zero value of count is passed.

test_no_parameter()[source]

Verify that test ends after 1 count when no parameter is passed.

test_nonexistent_time()[source]

Verify that test ends after 1 count when timing by nonexistent time is passed.

test_timing_float()[source]

Verify that test ends after value of timing when float value of timing is passed.

test_timing_less_than_count()[source]

Verify that test ends after value of timing when timing less than time of count is passed.

test_timing_negative()[source]

Verify that test ends after 1 count when negative value of timing is passed.

test_timing_second()[source]

Verify that test ends after value of timing when timing is passed.

test_timing_when_count_zero()[source]

Verify that test ends after value of timing when zero value of count is passed.

test_timing_zero()[source]

Verify that test ends after 1 count when zero value of timing is passed.

class unittests.test_plugins.test_tc_duration.TestOptionCountTiming[source]

Bases: object

Class verifies count, timing and option parameters of test duration.

test_count_less_than_option()[source]

Verify that test ends after value of count when time of count less than option is passed.

test_count_less_than_option_and_timing()[source]

Verify that test ends after value of count when count less than option and timing is passed.

test_count_when_option_zero()[source]

Verify that test ends after value of count when zero value of option is passed.

test_count_when_timing_and_option_zero()[source]

Verify that test ends after value of count when zero value of option and timing is passed.

test_option_float()[source]

Verify that test ends after value of option when float value of option is passed.

test_option_less_than_count()[source]

Verify that test ends after value of option when option less than time of count is passed.

test_option_less_than_timing()[source]

Verify that test ends after value of option when option less than timing is passed.

test_option_less_timing_and_count()[source]

Verify that test ends after value of option when option less than timing and count is passed.

test_option_negative()[source]

Verify that test ends after 1 count when negative value of option is passed.

test_option_second()[source]

Verify that test ends after value of option when option is passed.

test_option_when_count_zero()[source]

Verify that test ends after value of option when zero value of count is passed.

test_option_when_timing_and_count_zero()[source]

Verify that test ends after value of option when zero value of timing and count is passed.

test_option_when_timing_zero()[source]

Verify that test ends after value of option when zero value of timing is passed.

test_option_zero()[source]

Verify that test ends after 1 count when zero value of option is passed.

test_timing_less_than_option()[source]

Verify that test ends after value of timing when timing less than option is passed.

test_timing_less_than_option_and_count()[source]

Verify that test ends after value of timing when timing less than option and count is passed.

test_timing_when_option_and_count_zero()[source]

Verify that test ends after value of timing when zero value of option and count is passed.

test_timing_when_option_zero()[source]

Verify that test ends after value of timing when zero value of option is passed.

unittests.test_plugins.test_tempest

test_tempest.py

Unittests for pytest_tempest.py.

unittests.test_plugins.test_tempest.test_init_tempest()[source]

unittests.traffic_generator.conftest

conftest.py

Configuration for traffic generator unittests

class unittests.traffic_generator.conftest.FakeOpts[source]

Bases: object

unittests.traffic_generator.conftest.pytest_addoption(parser)[source]
unittests.traffic_generator.conftest.tg(request, traffic_generator)[source]
unittests.traffic_generator.conftest.traffic_generator(request)[source]

unittests.traffic_generator.packet_constants

packet_constants.py

Packet constants

unittests.traffic_generator.test_ixnet

test_ixnet.py

Unittests for IxNetwork

class unittests.traffic_generator.test_ixnet.Tg(*args, **kwargs)[source]

Bases: testlib.Ixia.IxiaHLT.IxiaHLTMixin, testlib.packet_processor.PacketProcessor

DEFAULT_MAX_SNIFF_TIME = 3600
static _get_port_to_string(iface)[source]

Simple helper that returns the string representation of an interface.

Parameters:iface (list) – Which IXIA interface to use for packet sending (list in format [chassis_id, card_id, port_id])
Returns:string in format “chassis_id/card_id/port_id”
Return type:str
_get_speed_ports(args)[source]

Get ports with speed from config.

Notes

This function checks whether a port has a speed in the config file.

Returns:List of ports used in real config
_set_filter_params(layer)[source]

Configures filter parameters for specified layer.

Parameters:layer (str) – Layer name compatible with pypacker “layers”
Returns:None
class_logger = <logging.LoggerAdapter object>
connect_hal()[source]

Logs in to IXIA and takes ports ownership.

Returns:None
disconnect_hal(mode='fast')[source]

Logs out from IXIA and clears ports ownership.

Returns:None
start_sniff(ifaces, sniffing_time=None, packets_count=0, filter_layer=None, src_filter=None, dst_filter=None)[source]

Starts sniffing on specified interfaces.

Parameters:
  • ifaces (list) – List of TG interfaces for capturing.
  • sniffing_time (int) – Time in seconds for sniffing.
  • packets_count (int) – Count of packets to sniff (no count limitation in case 0).
  • filter_layer (str) – Name of predefined sniffing filter criteria.
  • src_filter (str) – Sniff only packet with defined source MAC.
  • dst_filter (str) – Sniff only packet with defined destination MAC.
Returns:

None

Notes

This method introduces an additional 1.5-second timeout after capture is enabled. The Ixia sniffer requires it in order to wait until capturing has started.

Examples:

env.tg[1].start_sniff(['eth0', ], filter_layer='IP', src_filter='00:00:00:01:01:01', dst_filter='00:00:00:22:22:22')
stop_sniff(ifaces, force=False, drop_packets=False, sniff_packet_count=1000)[source]

Stops sniffing on specified interfaces and returns captured data.

Parameters:
  • ifaces (list) – List of interfaces where capturing has to be stopped.
  • force (bool) – Stop capturing even if the time or packet count criteria aren’t met.
  • drop_packets (bool) – Don’t return sniffed data (used when you only need to read statistics).
  • sniff_packet_count (int) – Default number of packets to return (used to avoid the test hanging in case of a storm).
Returns:

Dictionary where key = interface name, value = list of sniffed packets.

Return type:

dict

unittests.traffic_generator.test_ixnet._get_port_to_string(iface)[source]

Simple helper that returns the string representation of an interface.

Parameters:iface (list) – Which IXIA interface to use for packet sending (list in format [chassis_id, card_id, port_id])
Returns:string in format “chassis_id/card_id/port_id”
Return type:str
unittests.traffic_generator.test_ixnet._get_speed_ports()[source]

Get ports with speed from config.

Notes

This function checks whether a port has a speed in the config file.

Returns:List of ports used in real config
Return type:list
unittests.traffic_generator.test_ixnet.test_connect_disconnect(request)[source]
unittests.traffic_generator.test_ixnet.test_stp(request)[source]

unittests.traffic_generator.test_pp

test_pp.py

Packet Processor’s unittests

class unittests.traffic_generator.test_pp.TestPacketProcessor[source]

Bases: object

test_assembling_dot1q_icmp_packet_1(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet.

test_assembling_dot1q_icmp_packet_10(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Duplicate fragment 2.

test_assembling_dot1q_icmp_packet_11(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Two packets.

test_assembling_dot1q_icmp_packet_12(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Four packets (fragmented and not).

test_assembling_dot1q_icmp_packet_2(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Overlapped fragments.

test_assembling_dot1q_icmp_packet_3(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Missed fragment 1.

test_assembling_dot1q_icmp_packet_4(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Wrong frag in Fragment 1.

test_assembling_dot1q_icmp_packet_5(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Wrong flags in Fragment 1.

test_assembling_dot1q_icmp_packet_6(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Missed last fragment.

test_assembling_dot1q_icmp_packet_7(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Wrong frag in last fragment.

test_assembling_dot1q_icmp_packet_8(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Wrong flags in last fragment.

test_assembling_dot1q_icmp_packet_9(tg)[source]

Check assembling of Dot1Q.ICMP fragmented packet. Missed fragment after 2.

test_check_packet_field_1(tg)[source]
test_get_lcount(tg)[source]

Verify that method get_lcount returns correct count of layers.

test_get_lfield(tg)[source]

Verify that method get_lfield returns correct value.

test_get_packet_field_1(tg)[source]

Verify that method get_packet_field returns correct value(1).

test_get_packet_field_2(tg)[source]

Verify that method get_packet_field returns correct value(2).

test_get_packet_field_3(tg)[source]

Verify that method get_packet_field returns correct value(3).

test_get_packet_field_4(tg)[source]

Verify that method get_packet_field returns correct value(4).

test_get_packet_field_negative_1(tg)[source]

Verify that method get_packet_field returns Error message when layer is not defined in packet(1).

test_get_packet_field_negative_2(tg)[source]

Verify that method get_packet_field returns Error message when layer is not defined in packet(2).

test_get_packet_field_negative_3(tg)[source]

Verify that method get_packet_field returns Error message when field is not defined in packet(1).

test_packet_with_empty_layer(tg)[source]

Test building packet with empty layer.

test_pproc_packet_field_setting(tg)[source]

Check packet field setting.

test_rechecksum(tg)[source]

Verify that method rechecksum returns correct value.

test_set_field(tg)[source]

Verify that method set_field sets correct field value.

unittests.traffic_generator.test_tg

test_tg.py

Traffic generator’s unittests

class unittests.traffic_generator.test_tg.TestTGs[source]

Bases: object

pytestmark = [<MarkDecorator 'unittests' {'args': (), 'kwargs': {}}>]
test_arp_incrementation_dot1q_disabled_1(tg)[source]

Check arp incrementation. Count == Increment count. Dot1Q disabled.

test_arp_incrementation_dot1q_disabled_2(tg)[source]

Check arp incrementation. Count == 2*Increment count. Dot1Q disabled.

test_arp_incrementation_dot1q_enabled(tg)[source]

Check arp incrementation. Count == Increment count. Dot1Q enabled.

test_arp_sniff_pattern(tg)[source]

Verify ARP sniff pattern.

test_build_bgp_notification_packet(tg)[source]

Check building BGPNotification packet.

test_build_bgp_packet_as_path(tg)[source]

Check building BGP packet with multiple as_path.

test_build_bgp_packet_simple(tg)[source]

Check building BGP packet.

test_check_increment_arp_hwsrc(tg)[source]

Check all fields in incremented packet. ARP.hwsrc increment.

test_check_increment_arp_psrc(tg)[source]

Check all fields in incremented packet. ARP.psrc increment.

test_check_increment_dot1q_vlan_double(tg)[source]

Check all fields in incremented packet. Dot1Q.vlan increment.

test_check_increment_dot1q_vlan_single(tg)[source]

Check all fields in incremented packet. Dot1Q.vlan increment.

test_check_increment_igmp_ip(tg)[source]

Check all fields in incremented packet. IGMP.ip increment.

test_check_increment_ip_dscp(tg)[source]

Check all fields in incremented packet. IP.tos increment.

test_check_increment_ip_dst(tg)[source]

Check all fields in incremented packet. IP.dst increment.

test_check_increment_ip_icmp(tg)[source]

Check all fields in incremented packet. IP.src increment.

test_check_increment_ip_proto(tg)[source]

Check all fields in incremented packet. IP.proto increment.

test_check_increment_ip_src(tg)[source]

Check all fields in incremented packet. IP.src increment.

test_check_increment_udp_dport(tg)[source]

Check all fields in incremented packet. UDP.dport increment.

test_check_increment_udp_sport(tg)[source]

Check all fields in incremented packet. UDP.sport increment.

test_check_statistics(tg)[source]

Send 100 packets and check statistics.

test_clear_and_check_statistics(tg)[source]

Send 100 packets, clear and check statistics.

test_da_incrementation_1(tg)[source]

Check DA incrementation. Count == Increment count.

test_da_incrementation_2(tg)[source]

Check DA incrementation. Count > Increment count.

test_da_incrementation_continuous_traffic(tg)[source]

Check DA incrementation. Continuous traffic.

test_default_ether_type(tg)[source]

Verify that default Ether type for tagged packets is equal to 0x8100.

test_dhcp_ip_incrementation(tg)[source]

Check dhcp ip incrementation. Count == Increment count.

test_dot1q_arp_custom_filter(tg)[source]

Check Dot1Q.ARP filter.

test_dot1q_arp_filter(tg)[source]

Check Dot1Q.ARP filter.

test_dot1q_custom_filter(tg)[source]

Check Dot1Q filter.

test_dot1q_filter(tg)[source]

Check Dot1Q filter.

test_dot1q_icmp_custom_filter(tg)[source]

Check Dot1Q.ICMP filter.

test_dot1q_icmp_filter(tg)[source]

Check Dot1Q.ICMP filter.

test_dot1q_ip_custom_filter(tg)[source]

Check Dot1Q.IP filter.

test_dot1q_ip_filter(tg)[source]

Check Dot1Q.IP filter.

test_dot1q_tcp_custom_filter(tg)[source]

Check Dot1Q.TCP filter.

test_dot1q_tcp_filter(tg)[source]

Check Dot1Q.TCP filter.

test_dot1q_udp_custom_filter(tg)[source]

Check Dot1Q.UDP filter.

test_dot1q_udp_filter(tg)[source]

Check Dot1Q.UDP filter.

test_double_tagged_packet_1(tg)[source]

Verify that pypacker can recognize QinQ packets type 0x9100.

test_double_tagged_packet_2(tg)[source]

Verify that pypacker can recognize QinQ packets type 0x88A8.

test_dscp_incrementation_dot1q_disabled_1(tg)[source]

Check dscp incrementation. Count == Increment count. Dot1Q disabled.

test_dscp_incrementation_dot1q_disabled_2(tg)[source]

Check dscp incrementation. Count = 2*Increment count. Dot1Q disabled.

test_dst_ip_incrementation_dot1q_disabled_1(tg)[source]

Check destination_ip incrementation. Count == Increment count. Dot1Q disabled.

test_dst_ip_incrementation_dot1q_disabled_2(tg)[source]

Check destination_ip incrementation. Count = 2*Increment count. Dot1Q disabled.

test_dst_ip_incrementation_dot1q_enabled_1(tg)[source]

Check destination_ip incrementation. Count == Increment count. Dot1Q enabled.

test_dst_ip_incrementation_dot1q_enabled_2(tg)[source]

Check destination_ip incrementation. Count = 2*Increment count. Dot1Q enabled.

test_dst_ipv6_incrementation_dot1q_disabled_1(tg)[source]

Check DST IPv6 incrementation. Count == Increment count. Dot1Q disabled.

test_dst_ipv6_incrementation_dot1q_disabled_2(tg)[source]

Check DST IPv6 incrementation. Count = 2*Increment count. Dot1Q disabled.

test_dst_ipv6_incrementation_dot1q_enabled_1(tg)[source]

Check DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_dst_ipv6_incrementation_dot1q_enabled_2(tg)[source]

Check DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_dst_udp_incrementation_dot1q_disabled_1(tg)[source]

Check destination_udp incrementation. Count == Increment count. Dot1Q disabled.

test_dst_udp_incrementation_dot1q_disabled_2(tg)[source]

Check destination_udp incrementation. Count = 2*Increment count. Dot1Q disabled.

test_dst_udp_incrementation_dot1q_enabled(tg)[source]

Check destination_udp incrementation. Count == Increment count. Dot1Q enabled.

test_dstmac_filter(tg)[source]

Check dstMac filter.

test_ether_incrementation_dot1q_disabled_1(tg)[source]

Check ether type incrementation. Count == Increment count. Dot1Q disabled.

test_ether_incrementation_dot1q_disabled_2(tg)[source]

Check ether type incrementation. Count = 2*Increment count. Dot1Q disabled.

test_exact_packets_delivery(tg)[source]

Verify that send_stream sends the exact number of packets.

test_flow_label_dst_ipv6_incrementation(tg)[source]

Check Flow Label and DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_flow_label_ipv6_incrementation_dot1q_disabled_1(tg)[source]

Check Flow Label IPv6 incrementation. Count == Increment count. Dot1Q disabled.

test_flow_label_ipv6_incrementation_dot1q_disabled_2(tg)[source]

Check Flow Label incrementation. Count = 2*Increment count. Dot1Q disabled.

test_flow_label_ipv6_incrementation_dot1q_enabled(tg)[source]

Check Flow Label IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_flow_label_src_ipv6_incrementation(tg)[source]

Check Flow Label with SRC IPv6 incrementation. Count = 2*Increment count. Dot1Q disabled.

test_get_rate_stat(tg)[source]

Check transmit rate reading.

test_icmp_custom_filter(tg)[source]

Check ICMP filter.

test_icmp_filter(tg)[source]

Check ICMP filter.

test_incrementation_negative_1(tg)[source]

Verify that method set_stream returns Error message when layer is not defined in packet(1).

test_incrementation_negative_2(tg)[source]

Verify that method set_stream returns Error message when layer is not defined in packet(2).

test_incremented_streams(tg)[source]

Send incremented streams.

test_ip_custom_filter(tg)[source]

Check IP filter.

test_ip_dip_and_sip_increment_udf_dependant(tg)[source]

Check ip dip and sip_increment incrementation. Dip increment dependent on sip increment.

test_ip_dscp_and_sip_increment_dot1q_disabled_1(tg)[source]

Check ip dscp and sip_increment incrementation. Count == Increment count. Dot1Q disabled.

test_ip_dscp_and_sip_increment_dot1q_disabled_2(tg)[source]

Check ip dscp and sip_increment incrementation. Count == Increment count. Dot1Q disabled.

test_ip_dscp_dip_sip_increment_udf_dependant(tg)[source]

Check ip dscp, dip and sip_increment incrementation. Dependent increments.

test_ip_dscp_dip_sip_increment_udf_one_dependant(tg)[source]

Check ip dscp, dip and sip_increment incrementation. Dependent increments from sip.

test_ip_dscp_incrementation_dot1q_enabled(tg)[source]

Check ip dscp incrementation. Count == Increment count. Dot1Q enabled.

test_ip_filter(tg)[source]

Check IP filter.

test_ip_protocol_and_sip_increment_dot1q_disabled(tg)[source]

Check ip protocol and sip_increment incrementation. Count = 2*Increment count. Dot1Q disabled.

test_ip_protocol_and_sip_increment_dot1q_enabled(tg)[source]

Check ip protocol and sip_increment incrementation. Count == Increment count. Dot1Q enabled.

test_ip_protocol_incrementation_dot1q_disabled(tg)[source]

Check ip protocol incrementation. Count == Increment count. Dot1Q disabled.

test_ip_protocol_incrementation_dot1q_disabled_2(tg)[source]

Check ip protocol incrementation. Count = 2*Increment count. Dot1Q disabled.

test_ip_protocol_incrementation_dot1q_enabled(tg)[source]

Check ip protocol incrementation. Count == Increment count. Dot1Q enabled.

test_lacp_layers(tg)[source]

Verify that LACP packets are built and captured correctly.

test_lldp_build_capture(tg)[source]

Verify that LLDP packets are built and sniffed correctly.

test_lldp_dcbx(tg)[source]

Verify that DCBX packets are built and captured correctly.

test_lldp_dcbx_app_prio_table(tg)[source]

Verify that DCBX packets with Application Priority Tables are built and captured correctly.

test_lldp_incrementation_continuous_traffic_1(tg)[source]

Check lldp incrementation. Continuous traffic.

test_lldp_incrementation_continuous_traffic_2(tg)[source]

Check lldp incrementation. Continuous traffic.

test_lldp_incrementation_increment_count_1(tg)[source]

Check lldp incrementation. Count == Increment count.

test_lldp_incrementation_increment_count_2(tg)[source]

Check lldp incrementation. Count == 2*Increment count.

test_lldp_sys_capabilities(tg)[source]

Verify that LLDP packets with full System capabilities list are built and captured correctly.

test_lldp_with_padding(tg)[source]

Verify that LLDP packets with padding are built and captured correctly.

test_multistreams_and_multifaces(tg)[source]

Multiple streams and multiple ifaces.

test_multistreams_and_one(tg)[source]

Multiple streams and one on same iface.

test_multistreams_on_single_iface(tg)[source]

Multiple streams and one iface.

test_next_header_ipv6_incrementation_dot1q_disabled(tg)[source]

Check next header incrementation. Count == Increment count. Dot1Q disabled.

test_next_header_ipv6_incrementation_dot1q_disabled_2(tg)[source]

Check next header incrementation. Count = 2*Increment count. Dot1Q disabled.

test_next_header_ipv6_incrementation_dot1q_enabled(tg)[source]

Check next header IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_not_arp_filter(tg)[source]

Check notARP filter.

test_not_stp_filter(tg)[source]

Check notSTP filter.

test_packet_fragmentation(tg)[source]

Check packet fragmentation.

test_packet_random_size_1(tg)[source]

Check packet random size setting. Count=1.

test_packet_random_size_2(tg)[source]

Check packet random size setting. Count=5.

test_packet_size_decrementing(tg)[source]

Check packet size decrementing. Count=9, decrement count=9.

test_packet_size_incrementing_1(tg)[source]

Check packet size incrementing. Count=1, increment count=5.

test_packet_size_incrementing_2(tg)[source]

Check packet size incrementing. Count=5, increment count=5.

test_packet_with_ipoption(tg)[source]

Test building packet with IPOption.

test_pause_frames_0001(tg)[source]

Verify that MAC Control Pause frames with opcode 0x0001 are built and sniffed correctly.

test_pause_frames_0101(tg)[source]

Verify that MAC Control Pause frames with opcode 0x0101 are built and sniffed correctly.

test_pause_frames_ffff(tg)[source]

Verify that MAC Control Pause frames with unknown opcode are built and sniffed correctly.

test_pproc_packet_dictionary(tg)[source]

Check packet dictionary. Fragsize is None.

test_pproc_packet_fragmentation_1(tg)[source]

Check packet fragmentation.

test_pproc_packet_fragmentation_2(tg)[source]

Check packet fragmentation. fragsize is None.

test_qinq_packets_sniffer(tg)[source]

Check QinQ packet send.

test_qos_iptos_stat(tg)[source]

Check Ixia QoS IP TOS stat reading.

test_qos_vlan_stat(tg)[source]

Check Ixia QoS vlan stat reading.

test_sa_incrementation_1(tg)[source]

Check SA incrementation. Count == Increment count.

test_sa_incrementation_2(tg)[source]

Check SA incrementation. Count > Increment count.

test_sa_incrementation_and_packet_fragmentation(tg)[source]

Check SA incrementation + packet fragmentation. Count == Increment count.

test_send_several_streams(tg)[source]

Send several streams.

test_send_sniff_max_min_packets(tg, padding_size)[source]

Verify sending and sniffing of packets with minimal and maximal size.

test_send_stream_several_times(tg)[source]

Send stream several times and check statistics.

test_single_packet(tg)[source]

Single packet.

test_single_stream(tg)[source]

Single stream.

test_sniffed_packets_timestamp(tg)[source]

Check sniffed packets timestamp.

test_sniffing_negative(tg)[source]

Sniff for one packet, but sniff nothing.

test_src_and_dst_ipv6_incrementation_dot1q_disabled(tg)[source]

Check SRC and DST IPv6 incrementation. Count == Increment count. Dot1Q disabled.

test_src_and_dst_ipv6_incrementation_dot1q_enabled(tg)[source]

Check SRC and DST IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_src_ip_incrementation_dot1q_disabled_1(tg)[source]

Check source_ip incrementation. Count == Increment count. Dot1Q disabled.

test_src_ip_incrementation_dot1q_disabled_2(tg)[source]

Check source_ip incrementation. Count = 2*Increment count. Dot1Q disabled.

test_src_ip_incrementation_dot1q_enabled_1(tg)[source]

Check source_ip incrementation. Count == Increment count. Dot1Q enabled.

test_src_ip_incrementation_dot1q_enabled_2(tg)[source]

Check source_ip incrementation. Count = 2*Increment count. Dot1Q enabled.

test_src_ipv6_incrementation_dot1q_disabled_1(tg)[source]

Check SRC IPv6 incrementation. Count == Increment count. Dot1Q disabled.

test_src_ipv6_incrementation_dot1q_disabled_2(tg)[source]

Check SRC IPv6 incrementation. Count = 2*Increment count. Dot1Q disabled.

test_src_ipv6_incrementation_dot1q_enabled_1(tg)[source]

Check SRC IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_src_ipv6_incrementation_dot1q_enabled_2(tg)[source]

Check SRC IPv6 incrementation. Count > Increment count. Dot1Q enabled.

test_src_tcp_and_dst_tcp_incrementation_dot1q_disabled_1(tg)[source]

Check source_tcp and destination_tcp incrementation. Count == Increment count. Dot1Q disabled.

test_src_tcp_and_dst_tcp_incrementation_dot1q_disabled_2(tg)[source]

Check source_tcp and destination_tcp incrementation. Count = 2*Increment count. Dot1Q disabled.

test_src_tcp_and_dst_tcp_incrementation_dot1q_enabled(tg)[source]

Check source_tcp and destination_tcp incrementation. Count == Increment count. Dot1Q enabled.

test_src_tcp_incrementation_dot1q_disabled_1(tg)[source]

Check source_tcp incrementation. Count == Increment count. Dot1Q disabled.

test_src_tcp_incrementation_dot1q_disabled_2(tg)[source]

Check source_tcp incrementation. Count = 2*Increment count. Dot1Q disabled.

test_src_tcp_incrementation_dot1q_enabled(tg)[source]

Check source_tcp incrementation. Count == Increment count. Dot1Q enabled.

test_src_udp_and_dst_udp_incrementation_dot1q_disabled_1(tg)[source]

Check source_udp and destination_udp incrementation. Count == Increment count. Dot1Q disabled.

test_src_udp_and_dst_udp_incrementation_dot1q_disabled_2(tg)[source]

Check source_udp and destination_udp incrementation. Count = 2*Increment count. Dot1Q disabled.

test_src_udp_and_dst_udp_incrementation_dot1q_enabled(tg)[source]

Check source_udp and destination_udp incrementation. Count == Increment count. Dot1Q enabled.

test_src_udp_incrementation_dot1q_disabled_1(tg)[source]

Check source_udp incrementation. Count == Increment count. Dot1Q disabled.

test_src_udp_incrementation_dot1q_disabled_2(tg)[source]

Check source_udp incrementation. Count = 2*Increment count. Dot1Q disabled.

test_src_udp_incrementation_dot1q_enabled(tg)[source]

Check source_udp incrementation. Count == Increment count. Dot1Q enabled.

test_srcmac_and_dstmac_filter(tg)[source]

Check srcMac and dstMac filter.

test_srcmac_and_dstmac_wrong_layer_filter(tg)[source]

Check srcMac and dstMac filter with wrong filter_layer.

test_srcmac_filter(tg)[source]

Check srcMac filter.

test_start_stop_parallel_and_independent_continuous_streams(tg)[source]

Verify that parallel and independent streams start and stop.

test_start_stop_parallel_and_independent_set_quantity_streams(tg)[source]

Verify parallel and independent set quantity of streams.

test_stop_all_streams(tg)[source]

Verify that stop_streams stops all streams by default.

test_stop_sniffing(tg)[source]

Start continuous stream and stop sniffing.

test_stp_custom_filter(tg)[source]

Check STP filter.

test_stp_filter(tg)[source]

Check STP filter.

test_stream(tg)[source]

Verify that send_stream sends the exact number of packets.

test_streams_corruption_1(tg)[source]

Verify that set_stream does not corrupt already started streams.

test_streams_corruption_2(tg)[source]

Verify that set_stream does not corrupt already started streams.

test_tcp_custom_filter(tg)[source]

Check TCP filter.

test_tcp_filter(tg)[source]

Check TCP filter.

test_traffic_class_ipv6_incrementation_dot1q_disabled(tg)[source]

Check traffic class incrementation. Count == Increment count. Dot1Q disabled.

test_traffic_class_ipv6_incrementation_dot1q_disabled_2(tg)[source]

Check traffic class incrementation. Count = 2*Increment count. Dot1Q disabled.

test_traffic_class_ipv6_incrementation_dot1q_enabled(tg)[source]

Check traffic class IPv6 incrementation. Count == Increment count. Dot1Q enabled.

test_udp_custom_filter(tg)[source]

Check UDP filter.

test_udp_filter(tg)[source]

Check UDP filter.

test_vlan_incrementation_increment_count_1(tg)[source]

Check vlan incrementation. Count == Increment count.

test_vlan_incrementation_increment_count_2(tg)[source]

Check vlan incrementation. Count == 2*Increment count.

test_xstp_build_capture(tg)[source]

Check stp/rstp/mstp build and detection.

static verify_packets_data(initial_packet_def, received_packet_def)[source]

Check 2 packet definitions.

unittests.traffic_generator.test_trextg

test_trextg.py

TRex traffic generator’s unittests

Notes

To run TRex unittests:
  1. Install TRex client package
  2. Configure and start TRex server
  3. Specify IP address and ports in the following variables: TREX_HLT_CONFIG, TREX_CONFIG
class unittests.traffic_generator.test_trextg.TestTrexHLTTg[source]

Bases: object

pytestmark = [<MarkDecorator 'skipif' {'args': (False,), 'kwargs': {'reason': 'Need to install TRex client package'}}>]
test_simple_udp(trex_hlt)[source]

Send bidirectional UDP stream.

class unittests.traffic_generator.test_trextg.TestTrexTg[source]

Bases: object

packet_definition = ({'Ether': {'src': '00:00:00:00:00:02', 'dst': 'ff:ff:ff:ff:ff:ff'}}, {'IP': {'src': '10.1.1.1', 'dst': '20.1.1.1'}}, {'UDP': {'dport': 50, 'sport': 10}})
packet_definition_tcp = ({'Ether': {'src': '00:00:00:00:00:02', 'dst': 'ff:ff:ff:ff:ff:ff'}}, {'IP': {'src': '10.1.1.1', 'dst': '20.1.1.1'}}, {'TCP': {'dport': 50, 'sport': 10}})
pytestmark = [<MarkDecorator 'skipif' {'args': (False,), 'kwargs': {'reason': 'Need to install TRex client package'}}>]
test_all_supported_increments_simultaneously(trex)[source]

Set stream with all supported increments simultaneously and send it.

Notes

Two TRex interfaces should be connected to each other

test_clear_statistics(trex)[source]

Clear interface statistics.

test_increment_dst_ip(trex)[source]

Set stream with destination ip increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_increment_required_size_1(trex)[source]

Set stream with ‘required_size’ increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_increment_required_size_in_loop(trex)[source]

Set stream with increment ‘required_size’ and verify that size is wrapped back to min value.

Notes

Two TRex interfaces should be connected to each other

test_increment_src_ip(trex)[source]

Set stream with source ip increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_increment_tcp_dst_port(trex)[source]

Set stream with TCP destination port increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_increment_tcp_src_port(trex)[source]

Set stream with TCP source port increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_increment_udp_dst_port(trex)[source]

Set stream with UDP destination port increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_increment_udp_src_port(trex)[source]

Set stream with UDP source port increment and send it.

Notes

Two TRex interfaces should be connected to each other

test_random_required_size(trex)[source]

Set stream with random ‘required_size’ and send it.

Notes

Two TRex interfaces should be connected to each other

test_send_2_streams_on_same_interface_trex(trex)[source]

Send 2 streams using send_stream method on same interface.

test_send_2_streams_trex(trex)[source]

Send 2 streams using send_stream method on different interfaces.

test_single_stream_trex(trex)[source]

Send single stream using send_stream method.

test_single_stream_with_packet_interval_trex(trex)[source]

Send single stream with packets interval.

test_start_2_continuous_on_same_interface_streams_trex(trex)[source]

Send 2 continuous streams using start_streams method on same interface.

test_start_2_continuous_streams_trex(trex)[source]

Send 2 continuous streams using start_streams method on different interfaces.

test_start_2_streams_on_same_interface_trex(trex)[source]

Send 2 streams using start_streams method on same interface.

test_start_2_streams_trex(trex)[source]

Send 2 streams using start_streams method on different interfaces.

unittests.traffic_generator.test_trextg.trex(request)[source]
unittests.traffic_generator.test_trextg.trex_hlt(request)[source]