# coding: utf-8
# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``pytest_loganalyzer.py``
`Plugin is checking device's log after each test case is executed`
"""
import re
import os
import json
from itertools import tee
from collections import defaultdict
import pytest
from . import loggers
from .pytest_onsenv import setup_scope
[docs]def pytest_addoption(parser):
"""Describe plugin specified options.
"""
group = parser.getgroup("log_analyzer", "plugin log analyzer")
group.addoption(
"--log_analyzer", action="store", default="True",
choices=["False", "True"],
help="Enable/Disable device log analyzer tool(False | True). '%default' by default.")
[docs]def pairwise(iterable):
"""s -> (s0,s1), (s1,s2), (s2, s3), ...
"""
a, b = tee(iterable)
next(b, None)
return zip(a, b)
[docs]class LogAnalyzer(object):
class_logger = loggers.ClassLogger()
start_time = {}
command = "date '+%Y-%m-%d %T'"
runtest_setup_flag = {}
# Error records with specific error level to show. E.g.: '3' means any error messages with
# priorities 0, 1, 2, 3.
error_priority = 3
IGNORE_RES = [
re.compile('^Runtime journal is using '),
]
[docs] def __init__(self, switch):
"""Initialize LogAnalyzer instance.
Args:
switch(dict{SwitchGeneral}): switch dictionary
"""
self.switch = switch
self.item_skip_log_marker = None
[docs] def ignore(self, entry):
"""Check if duplicate can be ignored.
Args:
entry(dict): journalctl json decoded dict
Returns:
bool: if we should ignore this message when checking duplicates
"""
# Ignore these duplicates. First in a long line of forthcoming false positives
# Jun 03 00:50:59 rr12or systemd-journal[338]: Runtime journal is using 8.0M (max allowed 399.0M, trying to leave 598.5M free of 3.8G available → current limit 399.0M).
# Jun 03 00:50:59 rr12or systemd-journal[338]: Runtime journal is using 8.0M (max allowed 399.0M, trying to leave 598.5M free of 3.8G available → current limit 399.0M).
return any(regexp.search(entry['MESSAGE']) for regexp in self.IGNORE_RES)
[docs] def _check_log_duplicates(self, log_output):
"""Check if duplicated records are present in device log.
Args:
log_output(str): output of device log
"""
json_lines = (json.loads(x, encoding='utf-8') for x in log_output.splitlines())
duplicates = defaultdict(list)
for lineno, log_entry in enumerate(json_lines):
# use the exact message, no need to lower or strip
# message is unicode
if not self.ignore(log_entry):
duplicates[log_entry["MESSAGE"]].append(lineno)
for duplicate, duplicated_indexes in duplicates.items():
if len(duplicated_indexes) > 1:
for a, b in pairwise(duplicated_indexes):
assert abs(a - b) > 1, "Log duplication found in journalctl:\n %s" % duplicate
[docs] def _check_log_errors(self, log_output):
"""Check if errors with specific priority are present in device log.
Args:
log_output(str): output of device log
"""
# just count the newlines
lines = log_output.count(os.linesep)
assert lines <= 1, "%s error message(s) with priority in range 0..%s are present in " \
"journalctl: \n\n%s" % \
(lines - 1, self.error_priority, log_output)
[docs] def setup(self):
"""Initialize start time of test run.
"""
for switch_id, switch in self.switch.items():
switch.ssh.login()
switch.ssh.open_shell()
try:
self.start_time[switch_id] = switch.ssh.exec_command(self.command, 5).stdout.strip()
self.runtest_setup_flag[switch_id] = True
except Exception as err:
self.runtest_setup_flag[switch_id] = False
self.class_logger.debug(
"Command %s was not executed successfully."
"Therefore loganalyzer plugin checks will not be executed for switch %s. "
"\n%s", self.command, switch_id, err)
finally:
switch.ssh.close_shell()
[docs] def teardown(self):
"""Check for errors and duplicated records in device log (journalctl).
"""
journalctl_all_cmd = "journalctl -o json --since '{0}' --until '{1}'"
journalctl_priority_cmd = "journalctl --priority={0} --since '{1}' --until '{2}'"
end_time = {}
# item_skip_log_marker value should be set before teardown
skip_flag = self.item_skip_log_marker is not None
for switch_id, switch in self.switch.items():
if self.runtest_setup_flag[switch_id]:
switch.ssh.login()
switch.ssh.open_shell()
try:
try:
end_time[switch_id] = switch.ssh.exec_command(self.command,
5).stdout.strip()
if not skip_flag:
output = switch.ssh.exec_command(
journalctl_all_cmd.format(self.start_time[switch_id],
end_time[switch_id]),
5).stdout
errors_output = switch.ssh.exec_command(
journalctl_priority_cmd.format(self.error_priority,
self.start_time[switch_id],
end_time[switch_id]),
5).stdout
self.start_time[switch_id] = end_time[switch_id]
except Exception as err:
self.class_logger.debug(
"Command was not executed successfully, therefore skipping "
"loganalyzer plugin... \n%s ", err)
else:
if not skip_flag:
try:
self._check_log_duplicates(output)
self._check_log_errors(errors_output)
except AssertionError as err:
# log errors are unicode, decode to string
pytest.fail(str(err).encode('utf-8'))
finally:
switch.ssh.close_shell()
[docs]class LogAnalyzerPlugin(object):
@pytest.fixture(scope=setup_scope(), autouse=True)
[docs] def log_analyzer_setup(self, env_main):
"""Setup LogAnalyzer plugin after devices had been started.
Args:
env_main(common3.Env): pytest fixture
Returns:
LogAnalyzer: LogAnalyzer instance
"""
log_wrapper = LogAnalyzer(env_main.switch)
log_wrapper.setup()
return log_wrapper
@pytest.fixture(autouse=True)
[docs] def log_analyzer(self, request, env, log_analyzer_setup):
"""Validate device's logs on test teardown.
Args:
request(pytest.request): pytest request
log_analyzer_setup(LogAnalyzer): LogAnalyzer instance
"""
# Set item_skip_log_marker for LogAnalyzer instance before teardown
log_analyzer_setup.item_skip_log_marker = request.node.get_marker("skip_loganalyzer")
request.addfinalizer(log_analyzer_setup.teardown)