# Copyright (c) 2015 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``virtual_env.py``
`Tempest API UI specific functionality`
"""
# TEMPEST_VERSION 201610100101
###############################################################################
# Requires settings environment json entry. Example:
# {
# "name": "settings",
# "entry_type": "settings",
# "instance_type": "settings",
# "id": "993",
# "images_share_path": "/mnt/berta/oses/openstack",
# "mgmt_ip_cidr": "11.212.23.11/8",
# "other_configs": {"ovs_type": "ovs",}
# }
###############################################################################
import time
import re
import os
import json
import pprint
import itertools
from functools import wraps
import traceback
import netaddr
import pytest
from . import loggers
from . import environment
from .custom_exceptions import TAFCoreException
from .dev_linux_host_vm import GenericLinuxVirtualHost
from .helpers import merge_dicts
from .common3 import Environment, custom_classes
[docs]class OpenStackNoSuchImage(Exception):
pass
[docs]def only_with_neutron_extension(extension):
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self.has_neutron_extension(extension):
return func(self, *args, **kwargs)
else:
self.class_logger.error(('Function %s called that expects neutron extension '
'%s which is not installed.'), func.__name__, extension)
return wrapper
return decorator
[docs]def only_with_service(service):
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self.has_service(service):
return func(self, *args, **kwargs)
else:
self.class_logger.error(('Function %s called that expects %s service '
' which is not available.'), func.__name__, service)
return wrapper
return decorator
[docs]class VirtualEnv(object):
"""Main class of all test virtual environment using tempest.
Notes:
This class has to be used as base fixture in all tempest test cases.
It provides number of common methods to initialize, shutdown,
cleanup environment functions which basically call appropriate methods
of particular device classes.
"""
class_logger = loggers.ClassLogger()
_DEFAULT_FLAVOR_SPEC = {
'ram': 1024,
'disk': 10,
'vcpus': 4,
}
DPDK_EXTRA_SPECS = {'hw:mem_page_size': 'large'}
OVS_FLAVOR_SPEC = merge_dicts(
_DEFAULT_FLAVOR_SPEC,
{'name': 'venv-ovs-flavor'},
)
DPDK_FLAVOR_SPEC = merge_dicts(
_DEFAULT_FLAVOR_SPEC,
DPDK_EXTRA_SPECS,
{'name': 'venv-dpdk-flavor'},
)
# Example: my_vIPS_image-fedora-bare.qcow2
IMAGE_NAME_PATTERN = r'\S*{0}\S*-(?P<user>\w+)-(?P<cont_frmt>\w+)\.(?P<disk_frmt>\w+)'
def __init__(self, opts=None, external_router=True):
super(VirtualEnv, self).__init__()
self.class_logger.info('Initializing virtual environment...')
self.opts = opts
self.env_settings = self._get_settings(self.opts.env)
self.tempest_path = self.opts.tempest_path
self.reuse_venv = self.opts.reuse_venv
self.neutron_extensions = None
self.services = None
import tempest
from tempest.scenario.manager import NetworkScenarioTest
self.tempest_lib = tempest.lib
NetworkScenarioTest.runTest = None
self.config = tempest.config.CONF
try:
self.handle = NetworkScenarioTest()
self.handle.set_network_resources()
self.handle.setUpClass()
self.handle._resultForDoCleanups = self.handle.defaultTestResult()
self.handle.setUp()
except Exception:
self.class_logger.exception(
'Could not create tempest handle object. \
Please re-check tempest config in %s', self.tempest_path)
pytest.exit("VirtualEnv::__init__")
if self.has_neutron_extension('sfc'):
self._add_sfc_client()
if self.has_service('magnum'):
self._add_magnum_clients()
self.other_config = self.env_settings.get('other_configs', {})
self.ovs_type = self.other_config.get('ovs_type')
# create initial parameters required by all instances
self.images_path = self.env_settings.get("images_share_path")
self.create_loginable_secgroup_rule()
self.ensure_keypair()
self.tenant_id = self.handle.networks_client.tenant_id
_default_spec = self.DPDK_FLAVOR_SPEC if self.is_DPDK() else self.OVS_FLAVOR_SPEC
self.DEFAULT_FLAVOR = self.get_flavor_by_spec(_default_spec)
assert self.DEFAULT_FLAVOR
public_access_kwargs = {
'try_reuse': self.reuse_venv,
'name': self.tempest_lib.common.utils.data_utils.rand_name('tempest-public-net'),
'tenant_id': self.tenant_id,
'create_external_router': external_router,
}
assert self.ensure_public_access(**public_access_kwargs)
# Prepare environment objects
self.env = Environment(self.opts)
self.get_ports = self.env.get_ports
instances = (entry['entry_type'] for entry in self.env.config if 'entry_type' in entry)
for entry_type in instances:
entry_name = custom_classes[entry_type]['NAME']
setattr(self, entry_name, getattr(self.env, entry_name))
[docs] def ensure_keypair(self):
generate = False
key = {}
try:
keyfile = self.env_settings['ssh_key']
pubkeyfile = self.env_settings['ssh_pubkey']
except KeyError:
self.class_logger.debug("ssh key not provided, a new will be generated")
generate = True
else:
self.class_logger.debug("Reading private key from %s", keyfile)
self.class_logger.debug("Reading public key from %s", pubkeyfile)
try:
with open(keyfile, 'r') as stream:
key['private_key'] = stream.read()
with open(pubkeyfile, 'r') as stream:
key['public_key'] = stream.read()
except IOError as e:
self.class_logger.info("ssh key provided but can't be used! (%s)", format(e))
generate = True
else:
client = self.handle.manager.keypairs_client
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-key')
self.key = client.create_keypair(name=name,
public_key=key['public_key'])['keypair']
self.key['private_key'] = key['private_key']
if generate:
self.key = self.handle.create_keypair()
assert 'private_key' in self.key
assert 'public_key' in self.key
def _get_neutron_extensions(self):
if self.neutron_extensions is None:
client = self.handle.admin_manager.network_extensions_client
self.neutron_extensions = client.list_extensions()['extensions']
return self.neutron_extensions
def _get_services(self):
if self.services is None:
client = self.handle.admin_manager.identity_services_client
self.services = client.list_services()['OS-KSADM:services']
return self.services
[docs] def has_service(self, service):
services = self._get_services()
return service in [s['name'] for s in services]
[docs] def has_neutron_extension(self, extension):
extensions = self._get_neutron_extensions()
return extension in [e['alias'] for e in extensions]
def _add_sfc_client(self):
from testlib.tempest_clients.sfc_client import SfcClient
self.class_logger.info('Adding SfcClient.')
try:
# FIXME: do I need admin_manager?
self.handle.sfc_client = SfcClient(
self.handle.manager.auth_provider,
self.config.network.catalog_type,
self.config.network.region or self.config.identity.region,
endpoint_type=self.config.network.endpoint_type,
build_interval=self.config.network.build_interval,
build_timeout=self.config.network.build_timeout,
**self.handle.admin_manager.default_params)
except Exception:
self.class_logger.exception('Could not create sfc client!')
def _add_magnum_clients(self):
from .magnum import Magnum
self.class_logger.info('Adding Magnum clients.')
self.magnum = Magnum(self)
[docs] def _get_settings(self, file_name=None):
"""Load environment config from file.
Args:
file_name(str): Name of a json file with a test environment configuration.
Raises:
TAFCoreException: configuration file is not found
IOError: error on reading configuration file
Returns:
dict: dict of the selected configuration.
Notes:
This method shouldn't be used outside this class.
Use "config" attribute to access environment configuration.
"""
if not file_name:
self.class_logger.info("Environment file isn't set." +
" All configurations will be taken from setup file.")
# Return empty dict
return {}
path_to_config = environment.get_conf_file(conf_name=file_name, conf_type="env")
if not path_to_config:
message = "Specified configuration file %s not found." % file_name,
raise TAFCoreException(message)
try:
config = json.loads(open(path_to_config).read())
except:
message = "Cannot read specified configuration: %s" % path_to_config
self.class_logger.error(message)
raise IOError(message)
return next(cfg for cfg in config if cfg['instance_type'] == 'settings')
[docs] def cleanup(self):
self.class_logger.info('Cleaning virtual environment...')
# Order defined by unittest: tearDown -> doCleanups -> tearDownClass
for cleaner in [self.handle.tearDown,
self.handle.doCleanups,
self.handle.tearDownClass]:
try:
cleaner()
except Exception:
self.class_logger.exception('Exception in tempest cleanup -> %s', cleaner.__name__)
continue
[docs] def is_DPDK(self, force_check=False):
if force_check or self.ovs_type is None:
return self._is_DPDK()
return 'dpdk' in self.ovs_type
def _is_DPDK(self):
# TODO assess the presence of DPDK in the OpenStack installation
return False
[docs] def create_loginable_secgroup_rule(self):
return self.handle._create_loginable_secgroup_rule() # pylint: disable=protected-access
[docs] def wait_for_server_status(self, vm_id, status):
# waiters wrapper method
from tempest.common import waiters
return waiters.wait_for_server_status(self.handle.servers_client, vm_id, status)
[docs] def ensure_public_access(self, try_reuse=False, networks_client=None, routers_client=None,
name=None, tenant_id=None, create_external_router=True):
"""Create or reuse public/external router & network.
Args:
try_reuse(bool): attempt at resusing the public router/network or delete it
networks_client:
routers_client:
name:
tenant_id:
create_external_router(bool): whether or not creation/reuse has been successful
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not networks_client:
networks_client = self.handle.os_adm.networks_client
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-public-network')
if not tenant_id:
tenant_id = self.tenant_id
_net_cfg = self.config.network
_mgmt_ip_cidr = self.env_settings.get('mgmt_ip_cidr')
assert _mgmt_ip_cidr
net_ip = netaddr.IPNetwork(_mgmt_ip_cidr)
# Try to reuse existing stuff that meets requirements, if desirable (devstack)
if try_reuse and self._reuse_public_access(net_ip, routers_client=routers_client):
self.class_logger.debug('Reused')
return True
self.class_logger.debug('No reuse')
# create new public router & network
public_network_kwargs = {
'routers_client': routers_client,
'networks_client': networks_client,
'delete_external': True,
'name': name,
'tenant_id': tenant_id,
}
public_network = self.create_public_network(**public_network_kwargs)
assert public_network
subnet_name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-public-subnet')
allocation_prefix = _mgmt_ip_cidr.rsplit('.', 1)[0]
subnet_kwargs = {
'cidr': '{}/{}'.format(net_ip.network, net_ip.prefixlen),
'routers_client': routers_client,
'name': subnet_name,
'tenant_id': tenant_id,
'ip_version': 4,
'allocation_pools': [{
'start': '{}.100'.format(allocation_prefix),
'end': '{}.254'.format(allocation_prefix)}],
'gateway_ip': net_ip.ip,
'enable_dhcp': False,
}
self._create_subnet(public_network['id'], **subnet_kwargs)
_net_cfg.public_network_id = public_network['id']
if create_external_router:
router_kwargs = {
'routers_client': routers_client,
'network_id': _net_cfg.public_network_id,
'tenant_id': tenant_id,
'enable_snat': True,
}
public_router = self.create_router(**router_kwargs)
assert public_router
_net_cfg.public_router_id = public_router['id']
return _net_cfg.public_network_id and (not create_external_router or _net_cfg.public_router_id)
def _get_external_elements(self, routers_client=None):
if not routers_client:
routers_client = self.handle.os_adm.routers_client
net_filter = {'router:external': True}
nets = [net['id'] for net in self.handle._list_networks(**net_filter)] # pylint: disable=protected-access
router_2_net_map = {}
routers_resp = routers_client.list_routers()
for router in routers_resp['routers']:
ext_gw_info = router.get('external_gateway_info')
if ext_gw_info:
net_id = ext_gw_info.get('network_id')
if net_id and net_id in nets:
router_2_net_map[router['id']] = net_id
return router_2_net_map
[docs] def _delete_external_elements(self, routers_client=None, networks_client=None,
ports_client=None):
"""Look for the external routers & networks and delete them.
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not networks_client:
networks_client = self.handle.os_adm.networks_client
router_2_net_map = self._get_external_elements(routers_client=routers_client)
for router_id, network_id in router_2_net_map.items():
self.class_logger.debug('Removing external router: (%s)',
routers_client.show_router(router_id)['router']['name'])
self.delete_router(router_id)
self.class_logger.debug('Removing external network: (%s)',
networks_client.show_network(network_id)['network']['name'])
networks_client.delete_network(network_id)
[docs] def _reuse_public_access(self, mgmt_net, routers_client=None):
"""Search for the external routers & networks.
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
_net_cfg = self.config.network
router_2_net_map = self._get_external_elements(routers_client=routers_client)
if router_2_net_map:
for router_id, net_id in router_2_net_map.items():
rt_obj = routers_client.show_router(router_id)['router']
ext_ips = rt_obj['external_gateway_info']['external_fixed_ips']
for ip in ext_ips:
if ip['ip_address'] in mgmt_net:
_net_cfg.public_router_id, _net_cfg.public_network_id = router_id, net_id
return True
return False
[docs] def create_router(self, routers_client=None, name=None, network_id=None, tenant_id=None,
enable_snat=False, **kwargs):
"""Create a router.
Args:
network_id(str): id of the network subnet of which to connect to the router
Returns:
dict: the created router
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-router')
if not tenant_id:
tenant_id = self.tenant_id
ext_gw_info = {}
if network_id:
ext_gw_info['network_id'] = network_id
if enable_snat:
ext_gw_info['enable_snat'] = True
router_kwargs = merge_dicts(
kwargs,
{
'name': name,
'tenant_id': tenant_id,
'external_gateway_info': ext_gw_info,
},
)
router_resp = routers_client.create_router(**router_kwargs)
router = router_resp['router']
if router:
self.handle.addCleanup(routers_client.delete_router, router['id'])
return router
[docs] def add_router_interface(self, router_id, routers_client=None, subnet_id=None):
"""
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
routers_client.add_router_interface(router_id, subnet_id=subnet_id)
iface_cleaner = routers_client.remove_router_interface
self.handle.addCleanup(iface_cleaner, router_id, subnet_id=subnet_id)
[docs] def delete_router(self, router_id, routers_client=None, ports_client=None):
"""
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not ports_client:
ports_client = self.handle.os_adm.ports_client
clients = {
'routers_client': routers_client,
'ports_client': ports_client,
}
self._remove_router_interfaces(router_id, **clients)
routers_client.delete_router(router_id)
[docs] def _remove_router_interfaces(self, router, routers_client=None, ports_client=None):
"""
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not ports_client:
ports_client = self.handle.os_adm.ports_client
router_id = None
try:
if isinstance(router, dict) and router['id']:
pass
else:
router = routers_client.show_router(router)['router']
router_id = router['id']
except IndexError:
raise Exception("Invalid router parameter specified: {}".format(router))
router_ports = ports_client.list_ports(device_id=router_id)['ports']
for port in router_ports:
try:
routers_client.remove_router_interface(router_id, port_id=port['id'])
ports_client.delete_port(port['id'])
except self.tempest_lib.exceptions.NotFound:
pass
routers_client.update_router(router_id, external_gateway_info={})
[docs] def create_network(self, with_router=False, with_subnet=False, name=None, tenant_id=None,
**kwargs):
"""Create standard network, subnet, router.
Args:
with_router(bool): whether or not to add the network to tenant_id router
with_subnet(bool): whether or not to create a subnet for the network
name:
tenant_id:
kwargs:
"""
if with_router and not with_subnet:
raise Exception('Cannot add network w/o subnet to router (with_router=True,\
with_subnet=False)')
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-network')
if not tenant_id:
tenant_id = self.tenant_id
network_kwargs = merge_dicts(
kwargs,
{
'name': name,
'tenant_id': tenant_id,
},
)
subnet_kwargs = {
'name': self.tempest_lib.common.utils.data_utils.rand_name('tempest-subnet'),
'tenant_id': tenant_id,
'enable_dhcp': True,
}
subnet = None
router = None
network = self._create_bare_network(**network_kwargs)
assert network
if with_subnet:
subnet = self._create_subnet(network['id'], **subnet_kwargs)
assert subnet
if with_router:
router = self.handle._get_router(tenant_id=tenant_id) # pylint: disable=protected-access
self.add_router_interface(router['id'], subnet_id=subnet['id'])
return network, subnet, router
[docs] def create_port(self, network_id, name=None, ports_client=None, tenant_id=None, **kwargs):
if not ports_client:
ports_client = self.handle.os_adm.ports_client
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-port')
if not tenant_id:
tenant_id = self.tenant_id
port_kwargs = merge_dicts(
kwargs,
{
'name': name,
'tenant_id': tenant_id,
'network_id': network_id,
},
)
port = ports_client.create_port(**port_kwargs)['port']
if port:
self.handle.addCleanup(ports_client.delete_port, port['id'])
return port
[docs] def create_public_network(self, routers_client=None, networks_client=None, ports_client=None,
name=None, tenant_id=None, delete_external=False):
"""Creates a public networks with an optional subnet.
Args:
routers_client:
networks_client:
ports_client:
name:
tenant_id:
delete_external(bool): whether or not to delete already existing networks/routers
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not networks_client:
networks_client = self.handle.os_adm.networks_client
if not ports_client:
ports_client = self.handle.os_adm.ports_client
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-public-network')
if not tenant_id:
tenant_id = self.tenant_id
if delete_external:
del_external_kwargs = {
'routers_client': routers_client,
'networks_client': networks_client,
'ports_client': ports_client,
}
self._delete_external_elements(**del_external_kwargs)
network_kwargs = {
'networks_client': networks_client,
'name': name,
'tenant_id': tenant_id,
'router:external': True,
}
return self._create_bare_network(**network_kwargs)
[docs] def _create_bare_network(self, networks_client=None, name=None, tenant_id=None, **kwargs):
"""Creates a network.
There is no router and no subnet created for this network.
Returns:
dict: the created bare network
"""
if not networks_client:
networks_client = self.handle.os_adm.networks_client
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-network-bare')
if not tenant_id:
tenant_id = self.tenant_id
network_kwargs = merge_dicts(
kwargs,
{
'name': name,
'tenant_id': tenant_id,
},
)
network_resp = networks_client.create_network(**network_kwargs)
network = network_resp['network']
if network:
self.handle.addCleanup(networks_client.delete_network, network['id'])
return network
[docs] def _create_subnet(self, network_id,
routers_client=None, networks_client=None, subnets_client=None,
cidr=None, mask_bits=None, name=None, tenant_id=None,
pool_start=None, pool_end=None, **kwargs):
"""Create subnet for the specified network.
The allocation range is defined for ONP lab environment.
Args:
network_id(str): the network to create the subnet in
cidr(str): IP/mask of the mgmt interface, None for project default
mask_bits(int): subnet mask length in bits
pool_start(int): allocation pool first host ipv4 tuple (host part)
pool_end(int): allocation pool last host ipv4 tuple (host part)
Returns:
DeletableSubnet: the created subnet
"""
if not routers_client:
routers_client = self.handle.os_adm.routers_client
if not subnets_client:
subnets_client = self.handle.os_adm.subnets_client
if not name:
name = self.tempest_lib.common.utils.data_utils.rand_name('tempest-subnet')
if not tenant_id:
tenant_id = self.tenant_id
if cidr is None:
cidr = self.config.network.project_network_cidr
tenant_cidr = netaddr.IPNetwork(cidr)
if mask_bits is None:
mask_bits = self.config.network.project_network_mask_bits
else:
tenant_cidr = netaddr.IPNetwork(cidr)
if mask_bits is None:
mask_bits = tenant_cidr.prefixlen
def cidr_in_use(cidr, tenant_id):
cidr_in_use = self.handle._list_subnets(tenant_id=tenant_id, cidr=cidr) # pylint: disable=protected-access
return len(cidr_in_use) != 0
def alloc_pools(cidr, start, end):
it = cidr.iter_hosts()
if start is not None:
it = itertools.dropwhile(lambda x: not str(x).endswith(str(start)),
cidr.iter_hosts())
if end is not None:
it = itertools.takewhile(lambda x: not str(x).endswith(str(end)), it)
return it
subnet = None
for subnet_cidr in tenant_cidr.subnet(mask_bits):
cidr_str = str(subnet_cidr)
if cidr_in_use(cidr_str, tenant_id=tenant_id):
continue
allocation_pools = {}
if pool_start or pool_end:
hosts_it = alloc_pools(subnet_cidr, pool_start, pool_end)
hosts = [h for h in hosts_it]
if hosts:
pool = {
'start': hosts[0],
'end': hosts[-1],
}
allocation_pools.update({'allocation_pools': [pool]})
subnet_kwargs = merge_dicts(
kwargs,
allocation_pools,
{
'network_id': network_id,
'cidr': cidr_str,
'name': name,
'tenant_id': tenant_id,
'ip_version': 4,
},
)
from tempest.lib import exceptions as lib_exc
try:
subnet_resp = subnets_client.create_subnet(**subnet_kwargs)
self.handle.assertIsNotNone(subnet_resp, 'Unable to allocate tenant network')
subnet = subnet_resp['subnet']
if subnet:
self.handle.addCleanup(subnets_client.delete_subnet, subnet['id'])
break
except lib_exc.Conflict as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise e
return subnet
[docs] def create_image(self, name, fmt, path, disk_format=None):
"""Create OpenStack image.
Args:
name:
fmt:
path:
disk_format:
"""
image_client = self.handle.manager.image_client_v2
self.class_logger.debug('Creating image %s', name)
with open(path, 'rb') as image_file:
params = {
'name': name,
'container_format': fmt,
'disk_format': disk_format if disk_format else fmt,
}
image = image_client.create_image(**params)
assert image['status'] == "queued"
self.class_logger.debug('Storing image ...')
image_client.store_image_file(image['id'], image_file)
return image
[docs] def get_image_by_name(self, img_name):
image_client = self.handle.manager.image_client_v2
regex = re.compile(self.IMAGE_NAME_PATTERN.format(img_name))
try:
image = next(img for img in image_client.list_images()['images']
if img['name'] == img_name or regex.search(img['name']))
except StopIteration:
pass
else:
self.class_logger.debug('Desired image (%s) for instance found', img_name)
return image
# image not in glance (yet) - find it on disk and upload it
try:
image = next(img for img in os.listdir(self.images_path)
if img == img_name or regex.search(img))
except StopIteration:
raise OpenStackNoSuchImage("Image {} not found in {}".format(img_name,
self.images_path))
else:
match = regex.search(image)
image = self.create_image(os.path.basename(image),
match.group('cont_frmt'),
os.path.join(self.images_path, image),
match.group('disk_frmt'))
return image
[docs] def create_flavor(self, name, ram=64, disk=0, vcpus=1, **kwargs):
"""Create flavor.
Args:
name(str):
ram(int):
disk(int):
vcpus(int):
kwargs:
"""
params = {
'name': name,
'ram': ram,
'disk': disk,
'vcpus': vcpus,
}
flavor_id = kwargs.pop('flavor_id', '')
if flavor_id:
params['id'] = flavor_id
flv_client = self.handle.os_adm.flavors_client
rand_prefix = params['name']
num_retry = 10
flavor = None
for _ in range(num_retry):
try:
resp = flv_client.create_flavor(**params)
except self.tempest_lib.exceptions.Conflict as exc:
self.class_logger.debug('Conflict: %s', exc)
params['id'] = None
params['name'] = self.tempest_lib.common.utils.data_utils.rand_name(rand_prefix)
else:
flavor = resp['flavor']
break
assert flavor
self.handle.addCleanup(flv_client.delete_flavor,
flavor['id'])
if kwargs:
try:
flv_client.set_flavor_extra_spec(flavor['id'], **kwargs)
except self.tempest_lib.exceptions.RestClientException:
try:
for key, val in kwargs.items():
resp = flv_client.show_flavor_extra_spec(flavor['id'], key)
if resp.get(key) != val:
flv_client.update_flavor_extra_spec(flavor['id'], key, val)
except self.tempest_lib.exceptions.RestClientException:
pass
return flavor
def _get_flavors(self, flavors_client=None, do_show=False, do_extra_specs=False):
if not flavors_client:
flavors_client = self.handle.os_adm.flavors_client
flavors_map = {}
flavors = flavors_client.list_flavors()['flavors']
for f in flavors:
flavors_map[f['id']] = f_spec = {}
if do_show:
f_show = flavors_client.show_flavor(f['id'])['flavor']
f_spec.update(f_show)
if do_extra_specs:
f_extra = flavors_client.list_flavor_extra_specs(f['id'])
f_spec.update(f_extra)
return flavors_map
[docs] def get_flavors(self, flavors_client=None):
return self._get_flavors(flavors_client=flavors_client, do_show=True, do_extra_specs=True)
[docs] def get_flavor_by_spec(self, flavor_spec, flavors_client=None):
"""Flavor retrieval helper method.
Attempts to satisfy the requirements in the flavor specification input parameter.
The 'name' property is preference only (but the combination of the other properties is
overwhelming) and serves mostly for storage purposes instead, naming the newly created
flavor should the need for one arise - in case of the spec reqs not met by the already
existing ones.
"""
if not flavors_client:
flavors_client = self.handle.os_adm.flavors_client
flavors_map = self.get_flavors(flavors_client=flavors_client)
name = None
if 'name' in flavor_spec:
name = flavor_spec.pop('name')
def cmp_spec(f):
_not_found = object()
for k, v in flavor_spec.items():
if v != f.get(k, _not_found):
return False
return True
matching_specs = list(filter(cmp_spec, flavors_map.values()))
if matching_specs:
if name:
try:
# return THE Desired flavor (with the matching name)
return next(f for f in matching_specs if f['name'] == name)
except StopIteration:
# return A Desired flavor (with a different name)
return matching_specs[0]
else:
# return A Desired flavor (with a different name)
return matching_specs[0]
else:
if name:
try:
if next(f for f in flavors_map.values() if f['name'] == name):
# except on THE Undesired flavor (with the matching name)
raise Exception('Flavor conflict: EEXIST with different specs')
except StopIteration:
pass
else:
name = 'generic-flavor'
# create and return THE Desired flavor
return self.create_flavor(name=name, **flavor_spec)
[docs] def get_server_port_map(self, server, ip_addr=None):
ports = self.handle._list_ports(device_id=server['id'], fixed_ip=ip_addr) # pylint: disable=protected-access
port_map = [
(p['id'], fxip['ip_address'])
for p in ports for fxip in p['fixed_ips']
if netaddr.valid_ipv4(fxip['ip_address']) and fxip['ip_address'] == ip_addr['addr']
]
return port_map
[docs] def allow_forwarding(self, server_id):
"""Allowing forwarding
If a virtual instance is to forward a traffic,
the security extension 'port_security' must be allowed in the
Open Stack. This allows create ports with port_security_enabled
set to False. If this is set to false, then certain iptable
rules are not generated and anti-spoofing checks are not done.
"""
ip_addresses = self.get_interfaces(server_id)
port_ids = (ip['port_id'] for ip in ip_addresses)
kwargs = {
'security_groups': [],
'port_security_enabled': False,
}
for a_port_id in port_ids:
self.handle.ports_client.update_port(a_port_id, **kwargs)
[docs] def get_aggregates(self):
return self.handle.os_adm.aggregates_client.list_aggregates()
[docs] def get_avail_zones(self):
return self.handle.os_adm.availability_zone_client.list_availability_zones()
[docs] def get_compute_nodes(self):
return self.handle.os_adm.hypervisor_client.list_hypervisors()
[docs] def get_hosts(self):
"""Returns an array of dictionaries with available hypervisors.
Example of the output::
[{u'hypervisor_hostname': u'pod4-compute2',
u'id': 1,
u'state': u'up',
u'status': u'enabled'},
{u'hypervisor_hostname': u'pod4-compute1',
u'id': 2,
u'state': u'up',
u'status': u'enabled'}]
"""
client = self.handle.os_adm.hosts_client
hosts = client.list_hosts()
return hosts.get('hosts', hosts)
[docs] def get_interfaces(self, server_id):
interface_client = self.handle.interface_client
return interface_client.list_interfaces(server_id)['interfaceAttachments']
[docs] def get_ips(self, server_id):
iface_client = self.handle.interface_client
return [ip['ip_address']
for val in iface_client.list_interfaces(server_id)['interfaceAttachments']
for ip in val['fixed_ips']]
[docs] def get_host_zone_maps(self, service_type='compute'):
hosts = self.get_hosts()
z2h_map = {}
h2z_map = {}
for h in hosts:
if service_type == h['service']:
h_zone = h['zone']
h_name = h['host_name']
z2h_map.setdefault(h_zone, set()).add(h_name)
h2z_map.setdefault(h_name, set()).add(h_zone)
def dj_rec(map_level, zone_closed, zone_open, host_closed):
"""'disjoint recursive' zone/host maps build helper.
GOAL: disjoint host aggregates to hosts mapping
TODO: Need another helper to traverse the map-tree
TODO: Use host aggregates instead of availability zones?
"""
cnt_level = 0
_cnt_level = 0
zone_not_disjoint = (z for z in zone_open if not z2h_map[z].isdisjoint(host_closed))
for z in zone_not_disjoint:
_hosts = z2h_map[z] - host_closed
if _hosts:
cnt_level = 1
_zone_open = zone_open - {z}
_zone_closed = zone_closed | {z}
_host_closed = host_closed | _hosts
map_level[z] = {}
r = dj_rec(map_level[z], _zone_closed, _zone_open, _host_closed)
_cnt_level = max(_cnt_level, r)
return cnt_level + _cnt_level
the_map = {}
dj_rec(the_map, set(), set(z2h_map.keys()), set())
return z2h_map, h2z_map, the_map
[docs] def get_single_host_zones(self, z2h=None):
if z2h is None:
z2h, h2z, disjoint = self.get_host_zone_maps()
single_host_zones = [zone for zone in z2h.keys() if len(z2h[zone]) == 1 if zone != 'nova']
return single_host_zones
[docs] def get_hosts_without_aggregate(self, z2h=None):
if z2h is None:
z2h, h2z, distinct = self.get_host_zone_maps()
# hosts out of any aggregate are grouped under
# 'nova' availability zone
return z2h.get('nova', [])
def _delete_aggregate(self, aggregate):
self.handle.os_adm.aggregates_client.delete_aggregate(aggregate['id'])
def _create_aggregate(self, **kwargs):
aggregate = self.handle.os_adm.aggregates_client.create_aggregate(**kwargs)
aggregate = aggregate['aggregate']
self.handle.addCleanup(self._delete_aggregate, aggregate)
assert kwargs['name'] == aggregate['name']
assert kwargs['availability_zone'] == aggregate['availability_zone']
return aggregate
[docs] def create_aggregate(self, name="aggr", availability_zone="zone"):
"""Creates an aggregate and availability zone.
"""
aggregate_name = "{}-{}".format(name, int(time.time()))
zone_name = "{}-{}".format(availability_zone, int(time.time()))
kwargs = {
'name': aggregate_name,
'availability_zone': zone_name,
}
self.class_logger.info("Creating new aggregate %s.", aggregate_name)
self.class_logger.info("Creating new availability zone %s.", zone_name)
return self._create_aggregate(**kwargs)
[docs] def remove_host_from_aggregate(self, aggregate_id, host):
aggregate = self.handle.os_adm.aggregates_client.remove_host(aggregate_id, host=host)
assert host not in aggregate['aggregate']['hosts']
[docs] def add_host_to_aggregate(self, aggregate_id, host):
aggregates_client = self.handle.os_adm.aggregates_client
aggregate = aggregates_client.add_host(aggregate_id, host=host)['aggregate']
self.handle.addCleanup(self.remove_host_from_aggregate,
aggregate['id'],
host)
assert host in aggregate['hosts']
return True
[docs] def create_server(self, nets=None, ports=None, zone=None, image=None, flavor=None, **kwargs):
"""Create instance in OpenStack.
Args:
nets(list[dict]representing OpenStack network(,subnet,router)s): networks for the virtual instance to be created
ports(list[dict] representing OpenStack ports): ports for the virtual instance to be createds
zone(dict representing OpenStack availability-zone): availability-zone for the virtual instance to be created
image(dict representing OpenStack image): image for the virtual instance to be created
flavor(dict representing OpenStack flavor): flavor for the virtual instance to be created
Returns:
GenericLinuxVirtualHost
"""
if ports is None:
ports = []
if nets is None:
nets = []
if ports or nets:
_ports = [{'port': port['id']} for port in ports]
_nets = [{'uuid': network['id']} for (network, _, _) in nets]
kwargs['networks'] = _ports + _nets
if zone:
kwargs['availability_zone'] = zone
if self.key['name']:
kwargs['key_name'] = self.key['name']
# If the image name is not given, the img_ref from tempest.conf will be used.
# TODO
if image:
kwargs['image_id'] = image['id']
if flavor:
kwargs['flavor'] = flavor['id']
else:
kwargs['flavor'] = self.DEFAULT_FLAVOR['id']
self.class_logger.debug('Create server with:\n%s', pprint.pformat(kwargs))
server = self.handle.create_server(**kwargs)
self.class_logger.info('Created server:\n%s', pprint.pformat(server))
server_dict = {
'name': server['name'],
'id': server['id'],
'entry_type': 'openstack',
'instance_type': 'instance',
'ipaddr': None,
'ssh_port': image.get('ssh_port', 22),
'ssh_user': image.get('ssh_user', 'root'),
}
_ssh_pass = image.get('ssh_pass')
if _ssh_pass:
server_dict['ssh_pass'] = _ssh_pass
else:
server_dict['ssh_pkey'] = self.key.get('private_key')
new_server = GenericLinuxVirtualHost(server_dict, self.opts)
new_server.tempest_ui = self.handle
new_server.os_networks = nets
new_server.os_ports = ports
return new_server
[docs] def assign_floating_ip(self, host, private_net_name=None, public_network_id=None,
management=False):
"""Assign floating IP to ACTIVE instance.
Args:
host(GenericLinuxVirtualHost): instance to wich assign the floating IP
private_net_name(str): private networks name, if not specified takes last from
host.os_networks
public_network_id: public network id, if not specified using one from tempest.conf
management: is this floating IP be management IP via which we communicate with the
instance
Returns:
generated floating IP
"""
# the management interface is the one into the last (right most network in)
# host.os_networks or the right most port in host.os_ports if host.os_networks is empty.
# That's because the json is constructed as networks = ports + nets (see the create_server)
server = self.get_nova_instance(host.id)
last_port = None
# If private_net_name not defined use the last one
if private_net_name:
list_ip_addresses = server['addresses'].get(private_net_name)
elif host.os_networks:
list_ip_addresses = server['addresses'][host.os_networks[-1][0]['name']]
# we are interested in the first IP only
ip_address_dict = list_ip_addresses[0]
ip_address = ip_address_dict['addr']
elif host.os_ports:
last_port = host.os_ports[-1]
ip_address = last_port['fixed_ips'][-1]['ip_address']
port_id = last_port['id']
if not last_port:
port_map = self.get_server_port_map(server, ip_address_dict)
self.class_logger.debug(port_map)
port_id, _ = port_map[0]
if not public_network_id:
public_network_id = self.config.network.public_network_id
floating_ip = self.handle.create_floating_ip(server, public_network_id, port_id=port_id)
self.class_logger.debug("floating ip: %s <-> fixed ip=%s",
floating_ip['floating_ip_address'],
ip_address)
if management:
host.nated_mgmt = ip_address
host._set_ssh(floating_ip['floating_ip_address']) # pylint: disable=protected-access
return floating_ip['floating_ip_address']
[docs] def get_nova_instance(self, instance_id):
return self.handle.servers_client.show_server(instance_id)['server']
@only_with_neutron_extension('sfc')
[docs] def create_port_pair(self, **kwargs):
sfc_client = self.handle.sfc_client
port_pair = sfc_client.create_port_pair(**kwargs)['port_pair']
self.handle.addCleanup(sfc_client.delete_port_pair, port_pair['id'])
return port_pair
@only_with_neutron_extension('sfc')
[docs] def create_port_pair_group(self, **kwargs):
sfc_client = self.handle.sfc_client
port_pair_group = sfc_client.create_port_pair_group(**kwargs)['port_pair_group']
self.handle.addCleanup(sfc_client.delete_port_pair_group,
port_pair_group['id'])
return port_pair_group
@only_with_neutron_extension('sfc')
[docs] def update_port_pair_group(self, **kwargs):
sfc_client = self.handle.sfc_client
return sfc_client.update_port_pair_group(**kwargs)['port_pair_group']
@only_with_neutron_extension('sfc')
[docs] def update_port_chain(self, **kwargs):
sfc_client = self.handle.sfc_client
return sfc_client.update_port_chain(**kwargs)['port_chain']
@only_with_neutron_extension('sfc')
[docs] def create_flow_classifier(self, **kwargs):
sfc_client = self.handle.sfc_client
flow_classifier = sfc_client.create_flow_classifier(**kwargs)['flow_classifier']
self.handle.addCleanup(sfc_client.delete_flow_classifier,
flow_classifier['id'])
return flow_classifier
@only_with_neutron_extension('sfc')
[docs] def create_port_chain(self, **kwargs):
sfc_client = self.handle.sfc_client
port_chain = sfc_client.create_port_chain(**kwargs)['port_chain']
self.handle.addCleanup(sfc_client.delete_port_chain,
port_chain['id'])
return port_chain
[docs] def list_security_groups(self):
return self.handle.manager.security_groups_client.list_security_groups()
[docs] def create_security_group_rule(self, **kwargs):
return self.handle.manager.security_group_rules_client.create_security_group_rule(**kwargs)
[docs] def delete_security_group_rule(self, rule_id):
return self.handle.manager.security_group_rules_client.delete_security_group_rule(rule_id)
[docs] def get_all_instances(self):
client = self.handle.manager.servers_client
instances = [self.get_nova_instance(i['id'])
for i in client.list_servers()['servers']]
return instances
[docs] def get_host_instance_dict(self, instances=None):
"""Gets a dictionary of details of all instances in the current project grouped by hostId.
"""
if instances is None:
instances = self.get_all_instances()
res = {}
for x in instances:
res.setdefault(x['hostId'], []).append(x)
return res
[docs] def get_services(self, service_filter=None):
client = self.handle.admin_manager.services_client
services = client.list_services()['services']
if service_filter is None:
return services
return filter(service_filter, services)
[docs] def enable_service(self, service):
client = self.handle.admin_manager.services_client
client.enable_service(host=service['host'], binary=service['binary'])
[docs] def disable_service(self, service):
client = self.handle.admin_manager.services_client
client.disable_service(host=service['host'], binary=service['binary'])
self.handle.addCleanup(self.enable_service, service)