Source code for taf.plugins.pytest_loganalyzer

# coding: utf-8
# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``pytest_loganalyzer.py``

`Plugin that checks the device's log after each test case is executed`

"""

import re
import os
import json
from itertools import tee
from collections import defaultdict

import pytest

from . import loggers
from .pytest_onsenv import setup_scope


def pytest_addoption(parser):
    """Register the command line switch that controls the log analyzer plugin.

    Args:
        parser: option parser object handed to the ``pytest_addoption`` hook

    """
    # Keyword settings of the single option owned by this plugin.
    option_settings = {
        "action": "store",
        "default": "True",
        "choices": ["False", "True"],
        "help": "Enable/Disable device log analyzer tool(False | True). '%default' by default.",
    }
    analyzer_group = parser.getgroup("log_analyzer", "plugin log analyzer")
    analyzer_group.addoption("--log_analyzer", **option_settings)
def pytest_configure(config):
    """Register the log analyzer plugin unless it was disabled on the command line.

    Args:
        config: pytest config object; its ``option.log_analyzer`` value is compared
            with the string ``"True"``

    """
    # The option is stored as text, so anything other than the literal "True" disables the plugin.
    if config.option.log_analyzer != "True":
        return
    config.pluginmanager.register(LogAnalyzerPlugin(), "_log_analyzer")
def pytest_unconfigure(config):
    """Unregister the log analyzer plugin if it was registered.

    The plugin is registered with the plugin manager under the name
    ``"_log_analyzer"`` (it is never stored as an attribute of ``config``), so it
    has to be looked up by that name before it can be removed.

    Args:
        config: pytest config object that owns the plugin manager

    """
    plugin_manager = config.pluginmanager
    log_analyzer = plugin_manager.get_plugin("_log_analyzer")
    # Nothing to do when the plugin was disabled with --log_analyzer=False.
    if log_analyzer is not None:
        plugin_manager.unregister(log_analyzer)
def pairwise(iterable):
    """Pair every element with its successor: s -> (s0,s1), (s1,s2), (s2, s3), ...

    Args:
        iterable: any iterable; fewer than two elements produce no pairs

    Returns:
        iterator of 2-tuples holding consecutive elements

    """
    current, following = tee(iterable, 2)
    # Move the second copy one element ahead; an empty input simply skips the loop.
    for _ in following:
        break
    return zip(current, following)
class LogAnalyzer(object):
    """Check the journalctl output of every switch for errors and duplicated records.

    ``setup`` stores the device time of each switch; ``teardown`` reads the journal
    written since that time and fails the test on adjacent duplicated messages or on
    records with priority ``0..error_priority``.

    """

    class_logger = loggers.ClassLogger()
    # NOTE: start_time and runtest_setup_flag are class-level dictionaries and are
    # therefore shared by all LogAnalyzer instances.
    # Device time (per switch id) from which the journal is inspected.
    start_time = {}
    # Shell command that prints the device time in a format accepted by journalctl.
    command = "date '+%Y-%m-%d %T'"
    # Per switch id: True if the start time was read successfully in setup().
    runtest_setup_flag = {}
    # Error records with specific error level to show. E.g.: '3' means any error messages with
    # priorities 0, 1, 2, 3.
    error_priority = 3
    # Messages matching any of these patterns are not reported as duplicates.
    IGNORE_RES = [
        re.compile('^Runtime journal is using '),
    ]

    def __init__(self, switch):
        """Initialize LogAnalyzer instance.

        Args:
            switch(dict{SwitchGeneral}): switch dictionary

        """
        self.switch = switch
        # Set by the log_analyzer fixture before teardown; a non-None value disables the checks.
        self.item_skip_log_marker = None

    def ignore(self, entry):
        """Check if duplicate can be ignored.

        Args:
            entry(dict): journalctl json decoded dict

        Returns:
            bool: if we should ignore this message when checking duplicates

        """
        # Ignore these duplicates. First in a long line of forthcoming false positives
        # Jun 03 00:50:59 rr12or systemd-journal[338]: Runtime journal is using 8.0M (max allowed 399.0M, trying to leave 598.5M free of 3.8G available → current limit 399.0M).
        # Jun 03 00:50:59 rr12or systemd-journal[338]: Runtime journal is using 8.0M (max allowed 399.0M, trying to leave 598.5M free of 3.8G available → current limit 399.0M).
        message = entry.get("MESSAGE")
        if not isinstance(message, str):
            # The patterns can only be matched against text.
            return False
        return any(regexp.search(message) for regexp in self.IGNORE_RES)

    def _check_log_duplicates(self, log_output):
        """Check if duplicated records are present in device log.

        Args:
            log_output(str): output of device log, one JSON document per line

        Raises:
            AssertionError: the same message occupies two adjacent lines
            ValueError: a non-blank line is not valid JSON

        """
        duplicates = defaultdict(list)
        for lineno, raw_line in enumerate(log_output.splitlines()):
            if not raw_line.strip():
                # Blank lines keep their index but carry no record.
                continue
            # json.loads() lost its 'encoding' keyword in Python 3.9; the input is already text.
            log_entry = json.loads(raw_line)
            message = log_entry.get("MESSAGE")
            if not isinstance(message, str):
                # journalctl may serialize binary payloads as byte arrays or null;
                # such values cannot be compared as text (or hashed), so skip them.
                continue
            # use the exact message, no need to lower or strip
            if not self.ignore(log_entry):
                duplicates[message].append(lineno)

        for duplicate, duplicated_indexes in duplicates.items():
            for a, b in pairwise(duplicated_indexes):
                # Only identical messages on neighbouring lines count as duplication.
                if abs(a - b) <= 1:
                    # Raised explicitly so the check also works under "python -O".
                    raise AssertionError(
                        "Log duplication found in journalctl:\n %s" % duplicate)

    def _check_log_errors(self, log_output):
        """Check if errors with specific priority are present in device log.

        Args:
            log_output(str): output of device log

        Raises:
            AssertionError: the output contains more than one line break

        """
        # just count the newlines; the output comes from the device, so the host's
        # os.linesep is not the right separator to look for
        lines = log_output.count("\n")
        # A single line is tolerated (presumably the journalctl header line -- confirm).
        if lines > 1:
            raise AssertionError(
                "%s error message(s) with priority in range 0..%s are present in "
                "journalctl: \n\n%s" %
                (lines - 1, self.error_priority, log_output))

    def setup(self):
        """Initialize start time of test run.

        The device time of every switch is stored in ``start_time``. If the time cannot
        be read, the switch is flagged so that ``teardown`` skips it.

        """
        for switch_id, switch in self.switch.items():
            switch.ssh.login()
            switch.ssh.open_shell()
            try:
                self.start_time[switch_id] = switch.ssh.exec_command(self.command, 5).stdout.strip()
                self.runtest_setup_flag[switch_id] = True
            except Exception as err:
                # Best effort: a device that cannot report its time is only excluded from the checks.
                self.runtest_setup_flag[switch_id] = False
                self.class_logger.debug(
                    "Command %s was not executed successfully."
                    "Therefore loganalyzer plugin checks will not be executed for switch %s. "
                    "\n%s", self.command, switch_id, err)
            finally:
                switch.ssh.close_shell()

    def teardown(self):
        """Check for errors and duplicated records in device log (journalctl).

        For every switch that passed ``setup`` the journal between the stored start time
        and the current device time is fetched and checked, unless
        ``item_skip_log_marker`` is set. The start time is then moved to the current
        device time. The first failed check fails the test via ``pytest.fail``.

        """
        journalctl_all_cmd = "journalctl -o json --since '{0}' --until '{1}'"
        journalctl_priority_cmd = "journalctl --priority={0} --since '{1}' --until '{2}'"
        # item_skip_log_marker value should be set before teardown
        skip_flag = self.item_skip_log_marker is not None
        for switch_id, switch in self.switch.items():
            # A switch without a setup flag is treated like one whose setup failed.
            if not self.runtest_setup_flag.get(switch_id):
                continue
            switch.ssh.login()
            switch.ssh.open_shell()
            try:
                try:
                    end_time = switch.ssh.exec_command(self.command, 5).stdout.strip()
                    if not skip_flag:
                        output = switch.ssh.exec_command(
                            journalctl_all_cmd.format(self.start_time[switch_id], end_time),
                            5).stdout
                        errors_output = switch.ssh.exec_command(
                            journalctl_priority_cmd.format(self.error_priority,
                                                           self.start_time[switch_id],
                                                           end_time),
                            5).stdout
                    # The next inspected window starts where this one ended.
                    self.start_time[switch_id] = end_time
                except Exception as err:
                    self.class_logger.debug(
                        "Command was not executed successfully, therefore skipping "
                        "loganalyzer plugin... \n%s ", err)
                else:
                    if not skip_flag:
                        try:
                            self._check_log_duplicates(output)
                            self._check_log_errors(errors_output)
                        except AssertionError as err:
                            # pytest.fail() expects text; encoding to bytes would render as b'...'
                            pytest.fail(str(err))
            finally:
                switch.ssh.close_shell()
class LogAnalyzerPlugin(object):
    """Plugin object that provides the autouse fixtures driving LogAnalyzer."""

    @pytest.fixture(scope=setup_scope(), autouse=True)
    def log_analyzer_setup(self, env_main):
        """Setup LogAnalyzer plugin after devices had been started.

        Args:
            env_main(common3.Env): pytest fixture

        Returns:
            LogAnalyzer: LogAnalyzer instance

        """
        log_wrapper = LogAnalyzer(env_main.switch)
        log_wrapper.setup()
        return log_wrapper

    @pytest.fixture(autouse=True)
    def log_analyzer(self, request, env, log_analyzer_setup):
        """Validate device's logs on test teardown.

        Args:
            request(pytest.request): pytest request
            env: pytest fixture; not used in the body, requested only as a dependency
            log_analyzer_setup(LogAnalyzer): LogAnalyzer instance

        """
        node = request.node
        # Node.get_marker() was removed in pytest 4.0; use get_closest_marker() when the
        # installed pytest provides it and fall back to get_marker() otherwise. Only the
        # presence of the marker matters, so either return value works.
        find_marker = getattr(node, "get_closest_marker", None) or node.get_marker
        # Set item_skip_log_marker for LogAnalyzer instance before teardown
        log_analyzer_setup.item_skip_log_marker = find_marker("skip_loganalyzer")
        request.addfinalizer(log_analyzer_setup.teardown)