Source code for taf.testlib.syslog

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``syslog.py``

`Class for remote logging functionality`

"""

import re
import time

from testlib import loggers
from testlib import clissh
from .custom_exceptions import SysLogException


class SystemLog(object):
    """Read a log file on a remote host over SSH/SFTP.

    A "checkpoint" is the byte offset of the end of the remote log file at
    some moment; it can later be passed to :meth:`get_log` to read only the
    records appended after that moment.

    """

    class_logger = loggers.ClassLogger()

    def __init__(self, host=None, user=None, pasw=None, log_path=None):
        """Initialize SystemLog class.

        Args:
            host(str):  Host IP address
            user(str):  Host user
            pasw(str):  Host password
            log_path(str):  Path to logs

        """
        self.host = host
        self.user = user
        self.pasw = pasw
        self.log_path = log_path
        self.ssh = clissh.CLISSH(self.host)

    def __del__(self):
        """Try to close CLI connection on object destroy.

        """
        # __del__ also runs when __init__ raised before ``self.ssh`` was
        # assigned, so the attribute cannot be assumed to exist.
        ssh = getattr(self, "ssh", None)
        if ssh is not None:
            ssh.close()

    def _ensure_connected(self):
        """Log in over SSH unless the current session can still open SFTP.

        """
        try:
            probe = self.ssh.client.open_sftp()
            probe.close()
            self.class_logger.debug('SSH session is active.')
        except Exception:
            # Deliberately broad: any failure of the probe is treated as
            # "no usable session" and answered with a fresh login.
            self.class_logger.debug('ERROR: No active SSH.')
            self.class_logger.debug('Connecting...')
            self.ssh.login(self.user, self.pasw)

    def get_checkpoint(self):
        """Get syslog checkpoint.

        Notes:
            Checkpoint is an integer value (end position of the remote log
            file) which can be used for log reading from checkpoint till
            the end.

        Returns:
            int:  checkpoint

        Examples::

            syslog_1 = syslog.SystemLog("1.1.1.1", "usr", "PaSsWd", "/some/path")
            cp = syslog_1.get_checkpoint()
            # cp = 57899427

        """
        self._ensure_connected()
        sftp = self.ssh.client.open_sftp()
        try:
            remote_log_file = sftp.open(self.log_path)
            try:
                # Seek to the end of the file; the resulting position is the
                # checkpoint.
                remote_log_file.seek(0, 2)
                return remote_log_file.tell()
            finally:
                remote_log_file.close()
        finally:
            # Release the SFTP client even if opening or seeking failed.
            sftp.close()

    def get_log(self, checkpoint=None, timeout=60):
        """Read log from remote host.

        Args:
            checkpoint(int):  Log file checkpoint
            timeout(int):  Get log timeout, in seconds

        Raises:
            SysLogException:  no new records appeared after the checkpoint
                              before the timeout expired

        Returns:
            tuple:  log_list (list), new_checkpoint (integer)

        Notes:
            If checkpoint is present, wait until the file grows past it and
            read from the checkpoint till the end; if not, read the full file.
            On timeout the SSH connection is closed before the exception
            is raised.

        Examples::

            log_list, checkpoint = get_log(34134654, 30)

        """
        self._ensure_connected()
        try:
            sftp = self.ssh.client.open_sftp()
            try:
                return self._read_records(sftp, checkpoint, timeout)
            finally:
                sftp.close()
        except SysLogException:
            # The file and SFTP client are already closed at this point, so
            # dropping the SSH session cannot mask the timeout error.
            self.ssh.close()
            raise

    def _read_records(self, sftp, checkpoint, timeout):
        """Open the remote log via ``sftp`` and return (lines, end position).

        """
        remote_log_file = sftp.open(self.log_path, bufsize=50000)
        try:
            if checkpoint is None:
                self.class_logger.debug('No checkpoint value. Full log file will be download it can take a while.')
            else:
                self._wait_for_new_records(remote_log_file, checkpoint, timeout)
                # Start reading where the previous read stopped.
                remote_log_file.seek(checkpoint)
            log_list = remote_log_file.readlines()
            # Position after reading becomes the next checkpoint.
            checkpoint_new = remote_log_file.tell()
            self.class_logger.debug('log file is successfully readed.')
            return log_list, checkpoint_new
        finally:
            remote_log_file.close()

    def _wait_for_new_records(self, remote_log_file, checkpoint, timeout):
        """Poll until the remote file end moves past ``checkpoint``.

        Raises SysLogException when ``timeout`` seconds pass without growth.

        """
        end_time = time.time() + timeout
        while True:
            # Re-read the end position of the file on every poll.
            remote_log_file.seek(0, 2)
            if remote_log_file.tell() > checkpoint:
                return
            if time.time() >= end_time:
                message = "Syslog Timeout exceeded. No Syslog messages in %s second" % (timeout,)
                raise SysLogException(message)
            # Pause between polls only while there is still nothing to read.
            time.sleep(0.5)

    def search(self, log_list, paterntuple):
        r"""Find command records in log file.

        Args:
            log_list(list):  log records (strings)
            paterntuple(tuple):  three literal fragments that must occur in
                                 this order within one newline-terminated
                                 record; an empty string matches anything

        Returns:
            list:  records from log_list that match, in original order

        Examples::

            test_command = 'sshd', '', 'Accepted password'
            syslog_1.search(test_log, test_command)
            # ['Aug 10 08:54:03 <platform> sshd[13275]: Accepted password for root from 127.0.0.1 port 59212 ssh2\n']

        """
        # The fragments are escaped so they are matched literally, not as
        # regular expression syntax.
        log_regexp = re.compile(r"(%s.*?%s.*?%s.*?\n)" % (re.escape(paterntuple[0]),
                                                          re.escape(paterntuple[1]),
                                                          re.escape(paterntuple[2])))
        # Only the presence of a match matters, so search() is enough.
        return [record for record in log_list if log_regexp.search(record)]

    def find_duplicated_records(self, log_list):
        """Find duplicated records in log file.

        Args:
            log_list(list):  log records (hashable, e.g. strings)

        Returns:
            list:  None when every record is unique, otherwise a list with
                   every repeated occurrence (the first occurrence of a
                   record is not included), in original order

        """
        seen = set()
        log_duplicate = []
        for row in log_list:
            if row in seen:
                log_duplicate.append(row)
            else:
                seen.add(row)
        # Callers expect None, not an empty list, when nothing is duplicated.
        return log_duplicate or None