# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``caselogger.py``
`Caselogger functionality`
"""
import re
import time
from . import clissh
[docs]class CaseLogger(object):
"""Base class for switch caselogger functionality.
"""
[docs] def __init__(self, sw, logger):
"""Initialize CaseLogger instance.
Args:
sw(SwitchGeneral): Switch instance
logger(ClassLogger): Logger instance
"""
self.sw = sw
self.logger = logger
# Get related config
for conf_id in sw.config["related_conf"]:
if sw.config["related_conf"][conf_id]["name"] == "loghost":
conf = sw.config["related_conf"][conf_id]
self.rhost = conf["ip_host"]
self.rport = conf["sshtun_port"]
self.rlogin = conf["sshtun_user"]
self.rpassw = conf["sshtun_pass"]
self.rssh = clissh.CLISSH(self.rhost, port=self.rport, username=self.rlogin, password=self.rpassw)
self.close_ssh = False
[docs] def switch_connect(self):
"""Open ssh connection to the switch.
"""
if not self.sw.ssh.check_shell():
self.sw.ssh.login()
self.sw.ssh.open_shell()
self.close_ssh = True
[docs] def switch_disconnect(self):
"""Close ssh connection to the switch.
"""
if self.close_ssh:
self.sw.ssh.close()
[docs] def get_test_case_logs(self, test_name, timestamp, logtype):
"""Get test case logs.
Args:
test_name(str): Test case name
timestamp(int): Test case timestamp
logtype(str): Logs type to store (Single, All)
"""
if self.sw.status:
# Open ssh connection and use management tool for log files if exists
self.switch_connect()
out, err, rc = self.sw.ssh.exec_command("journalctl -h")
if rc != 127:
self.sw.ssh.exec_command("journalctl -b > /var/log/messages")
if logtype == "Single":
log_name = "_".join([test_name, str(timestamp).replace('.', '_'), self.sw.name])
assert re.match(r'\w+', log_name), "Log file {0} contains incorrect symbols".format(log_name)
get_log = "/tmp/{0}.gz".format(log_name)
buf_log = "/tmp/{0}.gz".format(log_name)
put_log = "/home/loguser/logs/{0}.gz".format(log_name)
logcomm = "sed -n '/[QA].*started.*{0}/,/[QA].*teardown.*{0}/p' /var/log/messages > /tmp/{1}".format(str(timestamp), log_name)
zipcomm = "gzip '/tmp/{0}'".format(log_name)
command = "{0} && {1}".format(logcomm, zipcomm)
command_timeout = 15
elif logtype == "All":
get_log = "/tmp/logs_{0}_{1}_{2}.tar.gz".format(self.sw.name, test_name, timestamp)
buf_log = "/tmp/logs_{0}_{1}_{2}.tar.gz".format(self.sw.name, test_name, timestamp)
put_log = "/home/loguser/logs/logs_{0}_{1}_{2}.tar.gz".format(self.sw.name, test_name, timestamp)
command = "tar -h -czf '/tmp/logs_{0}_{1}_{2}.tar.gz' /var/log/".format(self.sw.name, test_name, timestamp)
command_timeout = 120
self.sw.ssh.shell_command(command, timeout=command_timeout, alternatives=[
('password', self.sw._sshtun_pass, False, True), ], sudo=True, ret_code=True, quiet=True) # pylint: disable=protected-access
self.sw.ssh.get_file(get_log, buf_log, proto="scp")
self.rssh.login()
self.rssh.open_shell()
try:
self.rssh.put_file(buf_log, put_log, proto="sftp")
finally:
self.rssh.close()
self.sw.ssh.shell_command("rm -- '{0}'".format(get_log), timeout=25, alternatives=[
('password', self.sw._sshtun_pass, False, True), ], sudo=True, ret_code=True, quiet=True) # pylint: disable=protected-access
self.switch_disconnect()
[docs] def get_core_logs(self, suite_name):
"""Get test suite core logs.
Args:
suite_name(str): Test suite name
"""
if self.sw.status:
self.switch_connect()
# Get core
buf_list = []
_core_entry = self.sw.ssh.exec_command("ls /var/preserve")
if _core_entry.stdout != '':
for core_file in _core_entry[0].split('\n'):
if 'gz' in core_file:
get_core = "/var/preserve/{0}".format(core_file)
buf_core = "/tmp/{0}".format(core_file)
buf_list.append(buf_core)
self.sw.ssh.get_file(get_core, buf_core, proto="scp")
self.sw.ssh.shell_command("rm -- '{0}'".format(get_core), alternatives=[
('password', self.sw._sshtun_pass, False, True), ], timeout=15, sudo=True, ret_code=True, quiet=True) # pylint: disable=protected-access
self.switch_disconnect()
self.rssh.login()
self.rssh.open_shell()
try:
# Put cores if exist:
if buf_list:
timestamp = time.time()
self.rssh.exec_command("mkdir /home/loguser/cores/{0}".format(self.sw.name, ))
self.rssh.exec_command("mkdir /home/loguser/cores/{0}/{1}_{2}".format(self.sw.name, suite_name, timestamp))
for core in buf_list:
put_core = "/home/loguser/cores/{0}/{1}_{2}/{3}".format(self.sw.name, suite_name, timestamp, core.split('/')[-1])
self.rssh.put_file(core, put_core, proto="sftp")
finally:
self.rssh.close()