Source code for taf.testlib.fixtures

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``fixtures.py``

`Useful fixture functions/patterns for TAF`

"""
import functools
import os
import copy
import operator
from collections import namedtuple

import pytest

from .helpers import grouper
from . import loggers
from .dev_linux_host import GenericLinuxHost


[docs]def sshlog(request, env_name=None, env_obj=None, instance_class=GenericLinuxHost): """Register additional file handler for linux_host ssh loggers per test case. Args: request(pytest.request): fixture request object. env_name(str): Name of Environment class instance [optional]. env_obj(Environment): Environment instance [optional]. instance_class(object): SSH logger has to be crated only for environment attributes which are instances of this class [optional]. Examples: Define your fixture based on this function:: # My Environment fixture @pytest.fixture def my_env(request): env = Environment(...) request.addfinalizer(env.shutdown) env.initialize() return env @pytest.fixture(autouse=True) def ssh_logger(request): fixtures.sshlog(request, "my_env") def test_something(my_env): my_env.lhost[1].ssh.exec_command("command_to_be_executed") After that you have to see additional files in defined with \--logdir option folder.\n Also you can use it like function to modify existing env fixture:: @pytest.fixture(autouse=True) def env_new(request, env): fixtures.sshlog(request, env_obj=env) return env """ # Skip fixture if logdir isn't set. if not loggers.LOG_DIR: return def add_handler(log_adapter, log_file): """Register new file handler. """ log_file_handler = loggers.logging.FileHandler(log_file) # Set the same formatter if log_adapter.logger.handlers and log_adapter.logger.handlers[0].formatter: log_file_handler.setFormatter(log_adapter.logger.handlers[0].formatter) log_adapter.logger.addHandler(log_file_handler) return log_file_handler def remove_handlers(env, log_handlers): """Remove all created and saved in log_handlers list additional file handlers. """ for obj in list(env.id_map.values()): if obj.id in log_handlers: log_handlers[obj.id].flush() log_handlers[obj.id].close() obj.ssh.class_logger.logger.removeHandler(log_handlers[obj.id]) log_handlers.pop(obj.id) # log_file_handlers to remove log_handlers = {} # Check if env is used in TC and file logging is enabled if env_obj is None and (env_name not in request.fixturenames or loggers.LOG_DIR is None): return if env_obj is None and env_name: env_obj = request.getfuncargvalue(env_name) request.addfinalizer(lambda: remove_handlers(env_obj, log_handlers)) file_prefix = os.path.join(loggers.LOG_DIR, "{0}_{1}_".format(request.function.__name__, os.getpid())) # Search for LinuxHost objects for obj in list(env_obj.id_map.values()): if isinstance(obj, instance_class): log_file = "{0}_id_{1}_type_{2}.log".format(file_prefix, obj.id, obj.type) log_handlers[obj.id] = add_handler(obj.ssh.class_logger, log_file)
[docs]def autolog(request, logger_name="suite_logger"): """Inject logger object to test class. Args: request(pytest.request): py.test request object. logger_name(str): name of logger class attribute to create Notes: This fixture has to have scope level "class". You do not need to pass this fixture to test function in case you set autouse. Examples:: @pytest.fixture(scope="class", autouse=True) def autolog(request): return fixtures.autolog(request, "wishful_logger_instance_name") """ def remove_logger(cls): """Explicit close log handlers. """ while getattr(cls, logger_name).logger.handlers: getattr(cls, logger_name).logger.handlers[0].flush() getattr(cls, logger_name).logger.handlers[0].close() getattr(cls, logger_name).logger.removeHandler(getattr(cls, logger_name).logger.handlers[0]) delattr(cls, logger_name) taf_logger = loggers.module_logger(request.cls.__module__, request.cls.__name__) request.addfinalizer(lambda: remove_logger(request.cls)) setattr(request.cls, logger_name, taf_logger) getattr(request.cls, logger_name).debug("Starting %s test suite." % (request.cls.__name__, )) del taf_logger
Device_Tuple = namedtuple('Device_Tuple', ['device_ids', 'lag_id']) Ports_Tuple = namedtuple('Port_Tuple', ['ports', 'ports_list'])
[docs]class LagIdGenerator(object): INITIAL_LAG = 3800 def __init__(self): super(LagIdGenerator, self).__init__() self.free_lags = {} @classmethod
[docs] def id_to_key(cls, lag_id): """Get LAG key by LAG ID. Args: lag_id(int): LAG ID Returns: int: LAG key """ return lag_id - cls.INITIAL_LAG
def _default_set(self, max_lags): # create set since we will be intersecting # since we allocate after intersecting, initial order doesn't matter # we have to sort anyway return set(range(self.INITIAL_LAG, self.INITIAL_LAG + max_lags - 1))
[docs] def generate_lag(self, *args): """Get lag ID for specific device. Args: args(list[SwitchGeneral]): list of devices to generate lag ids for Raises: StopIteration: if cross part contains more than 2 devices Returns: int|None: LAG ID """ non_empty = [_f for _f in args if _f] if not non_empty: # in case device is not in cross connection return None lag_sets = [self.free_lags.setdefault(dev.id, self._default_set(dev.hw.max_lags)) for dev in non_empty] # intersect all the sets using reduce # operator.and_ is & is intersection intersection = functools.reduce(operator.and_, lag_sets) # intersection of sets shouldn't have an ordering so we always have to sort # next will raise StopIteration if it can't allocate lag = next(iter(sorted(intersection))) for lag_set in lag_sets: # use discard so we don't raise an error in case we were passed # in the same device twice lag_set.discard(lag) return lag
[docs]class LagPortEnv(object): """Class for fixture that replaces ports with LAGs. """ def __init__(self, request, env): super(LagPortEnv, self).__init__() self.cross_part_copy = copy.deepcopy(env.setup['cross']) self.ports_dict = {} self.request = request self.env = env self.lag_generator = LagIdGenerator() self.lags_to_create = {} self.cleanups = {} self.creates = {}
[docs] def get_cross_part_lag(self, cross_part): """Get LAG ID for cross connection. Args: cross_part(list): connection from setup file, e.g. ["03", 1, "1", 1] Raises: StopIteration: if port is already in LAG Returns: tuple(dict{device_id: link_id}, LAG_ID): dictionary of device_id: port_id and LAG ID """ device_dict = {} # store device.id: link_id pairs get_lag_args = [] # store device objects # iterator over _id, _port_id pairs in cross_part for _id, _port_id in grouper(cross_part, 2): device = self.env.id_map[self.env.get_device_id(_id)] max_lags = getattr(getattr(device, 'hw', None), "max_lags", 0) # check if device supports LAG if max_lags: device_dict.update({device.id: _port_id}) dev = device get_lag_args.append(dev) for dev_id, port_id in device_dict.items(): device = self.env.id_map[self.env.get_device_id(dev_id)] real_port_id = self.env.get_real_port_name(dev_id, port_id) # check if port is not a LAG member ports_lags_table = device.ui.get_table_ports2lag() if any(r for r in ports_lags_table if r['portId'] == real_port_id): raise StopIteration("Port is already in LAG") lag = self.lag_generator.generate_lag(*get_lag_args) return Device_Tuple(device_dict, lag)
[docs] def setup(self): for cross in self.env.setup['cross'].values(): for cross_part in cross: for _id, _port_id in grouper(cross_part, 2): device = self.env.id_map[self.env.get_device_id(_id)] max_lags = getattr(getattr(device, 'hw', None), "max_lags", 0) # check if device supports LAG if max_lags: # Wrap original clearconfig in order to recreate LAGs if device.id not in self.cleanups: self.cleanups[device.id] = device.clearconfig device.clearconfig = self.add_lags(device, device.clearconfig) self.creates[device.id] = device.start device.start = self.add_lags(device, device.start)
[docs] def setup_lags(self): """Define LAGs that will be created. Notes: This method changes initial device's attributes (ports and port_list). Initial configuration should be restored after test execution """ for cross in self.env.setup["cross"].values(): for cross_part in cross: # get LAG ID for specific cross connection try: lag_env = self.get_cross_part_lag(cross_part) except StopIteration: # Restore initial configuration in case of error self.teardown() # Skip test execution pytest.skip("Test case could not be executed on LAGs: Port is already in LAG") if lag_env.lag_id: for dev_id, port_id in lag_env.device_ids.items(): device = self.env.id_map[dev_id] real_port_id = self.env.get_real_port_name(dev_id, port_id) # backup ports and ports_list self.ports_dict.setdefault(device.id, Ports_Tuple(copy.deepcopy(device.ports), copy.deepcopy(device.port_list))) # Add LAGs to be created on device self.lags_to_create.setdefault(device.id, []).append((lag_env.lag_id, real_port_id)) # Add changes in setup # Add LAG ID in ports or port_list # Change link ID in cross connection if getattr(device, "port_list", None): speed = self.env.get_port_speed(dev_id, port_id) device.port_list.append([lag_env.lag_id, speed]) device.ports.append(lag_env.lag_id) cross_part[cross_part.index(dev_id) + 1] = len(device.port_list) elif getattr(device, "ports", None): device.ports.append(lag_env.lag_id) cross_part[cross_part.index(dev_id) + 1] = len(device.ports)
[docs] def teardown(self): """Restore initial configuration. """ # Restore setup JSON file for cross_id in self.env.setup["cross"]: del self.env.setup['cross'][cross_id][:] self.env.setup['cross'][cross_id].extend(self.cross_part_copy[cross_id]) # Restore device's attributes ports and ports_list for dev_id in self.ports_dict: device = self.env.id_map[dev_id] device.ports = self.ports_dict[dev_id].ports device.port_list = self.ports_dict[dev_id].ports_list # Restore original clearconfig for dev_id in self.cleanups: device = self.env.id_map[dev_id] device.clearconfig = self.cleanups[dev_id] device.start = self.creates[dev_id]
[docs] def add_lags(self, device, func): """Wrap original device's method e.g. clearconfig or restart. """ def wrapper(*args, **kwargs): func(*args, **kwargs) if not self.lags_to_create: self.setup_lags() for lag_id, port_id in self.lags_to_create[device.id]: device.ui.create_lag(lag=lag_id, key=self.lag_generator.id_to_key(lag_id), lag_type="Static", hash_mode="None") device.ui.create_lag_ports(ports=[port_id, ], lag=lag_id, key=self.lag_generator.id_to_key(lag_id)) return wrapper
[docs]def env_lag(request, env): """Replace physical ports in setup file with LAGs. Args: request(pytest.Request): pytest request fixture env(Environment): env fixture Notes: For correct functioning new fixture for test cases should be created. Examples:: # Code in conftest file or test module from testlib.fixtures import env_lag @pytest.fixture(scope='module', autouse=True) def env_replace_lag(request, env_init): env_lag(request, env_init) """ lag_env = LagPortEnv(request, env) request.addfinalizer(lag_env.teardown) lag_env.setup()
[docs]def lhost_to_switch(request, env_init): """Add Linux Hosts as Switch devices into environment. Notes: For correct functioning new fixture for test cases should be created. Examples:: # Code in conftest file or test module from testlib.fixtures import lhost_to_switch @pytest.fixture(scope='module', autouse=True) def env_switch_lhost(request, env_init): lhost_to_switch(request, env_init) """ # Get initial version of Environment object added_switches = [] init_dut_map = copy.deepcopy(env_init.dut_map) def setup(): # Add lhost instances into env.switch dictionary for _lhost in getattr(env_init, 'lhost', {}).values(): switches = getattr(env_init, 'switch', {}) eid = len(switches) + 1 env_init.dut_map["sw{}".format(eid)] = _lhost.id if not hasattr(env_init, 'switch'): setattr(env_init, 'switch', {}) env_init.switch[eid] = _lhost added_switches.append(eid) def teardown(): # Rollback all changes env_init.dut_map = init_dut_map for _key in added_switches: env_init.switch.pop(_key) request.addfinalizer(teardown) setup()
[docs]def chef_prep(request, env_main, cll): """Do steps required by configuration management system tests. Notes: For correct functioning new fixture for test cases should be created. Examples:: # Code in conftest file or test module from testlib.fixtures import chef_prep @pytest.fixture(scope='class', autouse=True) def suite_prep(request, env_main): chef_prep(request, env_main) """ def _remove_chef_client(): """Uninstall chef client and do cleanup of chef dir. """ assert env_main.switch[1].ui.cli_send_command('yum erase -y chef*')[-1] == 0 assert env_main.switch[1].ui.cli_send_command('rm -rf /etc/chef/')[-1] == 0 env_main.chef[1].delete_node(cll.fqdn_hostname) def _remove_local_role_files(): for x in os.listdir(cll.configs_path): os.remove(os.path.join(cll.configs_path, x)) os.rmdir(cll.configs_path) def _teardown(): # _remove_chef_client() env_main.chef[1].remove_role() _remove_local_role_files() cll.fqdn_hostname = env_main.switch[1].ui.cli_send_command('hostname -f')[0].strip() cll.configs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chef_roles') if not os.path.exists(cll.configs_path): os.makedirs(cll.configs_path) # Perform teardown sequence request.addfinalizer(_teardown)