# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``clissh.py``
`Module contains classes for managing device using SSH connection or SSH connection emulation for Linux Network Namespaces`
"""
import socket
import time
from io import StringIO
import curses.ascii as ascii_char
from contextlib import closing
import paramiko
from . import loggers
from .custom_exceptions import CLISSHException
from .cli_template import CLIGenericMixin
from .cli_template import CmdStatus
[docs]def probe_port(ipaddr, port, logger):
"""Check if device listen on port.
Args:
ipaddr(str): IP address
port(int): SSH port
logger(loggers.ClassLogger): logger instance
Returns:
bool: True or False
Notes:
This verification is necessary before establishing ssh connection.
"""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(3)
try:
time.sleep(1)
sock.connect((ipaddr, port))
except (socket.gaierror, socket.error):
logger.debug("IP address %s port %s doesn't respond" % (ipaddr, port))
return False
else:
logger.debug("IP address %s port %s opened" % (ipaddr, port))
return True
[docs]class CLISSH(CLIGenericMixin):
"""Class for configure device using CLI over ssh with paramiko. Unused parameters added to support the same interface for other CLI classes.
Examples::
client = CLISSH("1.1.1.1", 22)
client.login("username", "paSSword")
"""
class_logger = loggers.ClassLogger()
[docs] def __init__(self, host, port=22, username=None, password=None,
page_break=None, prompt=None, pass_prompt="Password:", sudo_prompt=None, login_prompt=None, page_break_lines=None,
exit_cmd=None, timeout=60, quiet=False, pkey=None, key_filename=None):
"""Initialize CLISSH class.
Agrs:
host(str): Target host IP address.
port(int): SSH port.
username(str): SSH login user.
password(str): SSH user password.
page_break(str): Page brake marker.
prompt(str, list[str]): Shell prompt or list of shell prompts.
pass_prompt(str): Login password prompt.
sudo_prompt(str): Sudo password prompt.
timeout(int): Default timeout for commands.
login_prompt(str): Login prompt.
page_break_lines(int): Number of page brake lines.
exit_cmd(str): Command to perform telnet exit.
quiet(bool): Flag for return code verification.
"""
super(CLISSH, self).__init__()
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.host = host
self.port = port
self.username = username
self.password = password
self.page_break = page_break
self.prompt = prompt
self.passprompt = pass_prompt
self.sudoprompt = sudo_prompt
self.timeout = timeout
if isinstance(pkey, str):
pkey = paramiko.RSAKey.from_private_key(StringIO(str(pkey)))
self.pkey = pkey
self.key_filename = key_filename
self.shell = None
self.prompt_stack = []
self.timesleep = 1
self.delay = None
self.login_status = False
# Default action: raise an exception if command's exit code isn't 0 or not.
self.quiet = quiet
[docs] def login(self, username=None, password=None, timeout=None):
"""Do CLI object login procedure.
Args:
username(str): Host login (string).
password(str): Host password(string).
timeout(int): Time to execute login procedure (integer).
Returns:
None
"""
if self.login_status:
self.class_logger.debug("SSH client is already opened.")
if self.check_client():
return
else:
self.close()
self.class_logger.debug("SSH client is dead. Reconnect...")
username = username or self.username
password = password or self.password
timeout = timeout or self.timeout
self.class_logger.debug("Connecting to {0}@{1}...".format(username, self.host))
self.client.connect(self.host,
self.port,
username,
password,
timeout=timeout,
pkey=self.pkey,
key_filename=self.key_filename,)
transport = self.client.get_transport()
transport.packetizer.REKEY_PACKETS = 2**35
transport.packetizer.REKEY_BYTES = 2**35
self.username = username
self.login_status = True
[docs] def check_client(self):
"""Check if SSH client is alive.
"""
try:
transport = self.client.get_transport()
result = transport.is_active()
return result
except AttributeError:
return False
[docs] def close(self):
"""Close CLI object connection.
"""
self.class_logger.debug("Closing connection to {0}@{1}...".format(self.username, self.host))
self.client.close()
self.shell = None
self.login_status = False
[docs] def open_shell(self, timeout=20, raw_output=False):
"""Create interactive SSH shell on existing connection.
Args:
timeout(int): Timeout until prompt is appeared.
raw_output(bool): Flag whether to read output buffer.
Raises:
CLISSHException: not connected
"""
output = ""
if self.login_status and self.check_client():
if self.check_shell():
self.class_logger.debug("Shell for {0}@{1} is already invoked.".format(self.username, self.host))
return
self.class_logger.debug("Opening shell for {0}@{1} ...".format(self.username, self.host))
# to avoid command line wrapping increase width as big as possible
shell = self.client.invoke_shell(width=1000, height=1000)
self.shell = self.prepare_ssh_shell_obj(shell)
if self.prompt and not raw_output:
alter = []
# Add one or few expected prompt(s) and action(s) to alternatives list
if isinstance(self.prompt, list):
for single_prompt in self.prompt:
alter.append((single_prompt, None, True, False))
elif isinstance(self.prompt, str):
alter.append((self.prompt, None, True, False))
output = self.action_on_expect(self.shell, alter, timeout=timeout)
elif not raw_output:
output = self.shell_read(0.5)
self.class_logger.info("Shell is opened:\n{0}".format(output))
return output
else:
raise CLISSHException("Cannot invoke shell before connecting.")
[docs] def close_shell(self):
"""Close interactive CLI shell on existing connection.
"""
if self.shell and not self.shell.closed:
self.class_logger.debug("Closing shell for {0}@{1} ...".format(self.username, self.host))
self.shell.close()
[docs] def check_shell(self):
"""Check if CLI connection is alive.
"""
return not getattr(self.shell, "closed", True)
[docs] def _check_shell_obj(self):
"""Check if shell object exists.
Raises:
CLISSHException: shell is not open
"""
if not self.check_shell():
raise CLISSHException("Cannot execute command. Shell is not open.")
[docs] def exec_command(self, command, timeout=None):
"""Execute command without shell (tty).
Args:
command(str): Command to be executed.
timeout(int): Timeout for command execution.
Returns:
tuple(str, str, int): tuple of stdout, stderr, rc
"""
self.class_logger.debug("{0}@{1}: {2}".format(self.username, self.host, command))
if timeout is None:
timeout = self.timeout
_, stdout, stderr = self.client.exec_command(command, timeout=timeout)
out = stdout.read().decode('UTF-8')
err = stderr.read().decode('UTF-8')
exit_status = stdout.channel.recv_exit_status()
self.class_logger.debug(self.cmd_output_log(out, err))
return CmdStatus(out, err, exit_status)
[docs] def shell_command(self, command, alternatives=None, timeout=None, sudo=False, ret_code=True, expected_rc="0",
quiet=None, raw_output=False, interval=0.1, tabulation=None):
"""Run interactive command on previously created shell (tty).
Args:
command(str): Command to be executed.
alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>).
action can be:
- str - in case this is just command;
- function - callable object to execute without parameters;
timeout(int): Expecting timeout.
sudo(bool): Flag if sudo should be added to the list of alternatives.
ret_code(bool): Flag if return code should be added to the list of alternatives.
expected_rc(int): Sets return code and verifies if return code of executed command the same as expected return code (int or str).
quiet(bool): Flag to verify if expected return equals expected.
raw_output(bool): Flag whether to return 'pure' output.
interval(int | float): Interval between read data cycles.
tabulation(str): Tabulation characters.
Raises:
CLISSHException: unexpected return code
"""
self._check_shell_obj()
self.class_logger.debug("{0}@{1}: {2}".format(self.username, self.host, command))
if timeout is None:
timeout = self.timeout
if quiet is None:
quiet = self.quiet
if isinstance(expected_rc, int):
expected_rc = str(expected_rc)
data = ""
return_code = None
command, alternatives, end_pattern = self.prepare_alter(command, alternatives, sudo, ret_code)
#
# THIS HAS A BUG WHEN THE COMMAND LINE IS TOO LONG WRAPPING DOENS"T
# WORK.
# DON"T USE THIS!!!!
#
if tabulation:
self.shell.sendall(command + tabulation)
# Three spaces added because some CLI commands after executing return mesh output.
else:
self.shell.sendall(command + " \n")
data = self.action_on_expect(self.shell, alternatives, timeout, interval)
# Clearing console line from previous command.
if tabulation:
self.shell.sendall(ascii_char.ctrl("u"))
if not raw_output:
data, return_code = self.normalize_output(data, command, ret_code, end_pattern)
self.class_logger.debug("Command output:\n{0}".format(data))
if ret_code and not quiet:
if return_code != expected_rc:
raise CLISSHException("Command return unexpected return code: {0}".format(return_code))
return data, return_code
[docs] def shell_read(self, timeout=0, interval=0.1):
"""Read data from output buffer.
Args:
timeout(int): Increases time to read data from output buffer.
interval(int): Time delay between attempts to read data from output buffer.
"""
self._check_shell_obj()
data = ""
# The following loop has to be executed at least one time.
end_time = time.time() + timeout
end_flag = False
while not end_flag:
data += self.shell.read() # pylint: disable=no-member
if time.time() >= end_time:
end_flag = True
else:
time.sleep(interval)
return data
[docs] def send_command(self, command):
"""Run command without waiting response.
Args:
command(str): Command to be executed.
"""
self.class_logger.debug("{0}@{1}: {2}".format(self.username, self.host, command))
self._check_shell_obj()
if isinstance(command, self.Raw):
self.shell.sendall(command)
else:
self.shell.sendall(command + "\n")
[docs] def _transfer_file(self, direction, src, dst, proto="scp"):
"""Transfer file from/to remote host.
Args:
direction(str): transfer direction. set/get.
src(str): Source file location.
dst(str): Destination file location.
proto(str): Protocol to be used for file transfer. scp(default)/sftp.
Raises:
CLISSHException: direction not in {"put", "get"}
"""
if direction not in {"put", "get"}:
raise CLISSHException("Incorrect file transfer direction '%s'." %
(direction, ))
if proto == "scp":
self._scp_trans_file(direction, src, dst)
elif proto == "sftp":
self._sftp_trans_file(direction, src, dst)
[docs] def _scp_trans_file(self, direction, src, dst):
"""Transfer file from/to remote host using scp.
Args:
direction(str): transfer direction. set/get.
src(str): Source file location.
dst(str): Destination file location.
Raises:
CLISSHException: not supported direction "put"
"""
if direction == "get":
with open(dst, 'wb') as local_file:
local_file.write(
self.client.exec_command('cat "{0}"'.format(src))[1].read())
elif direction == "put":
raise CLISSHException("Currently 'put' method is not supported for 'scp'")
[docs] def _sftp_trans_file(self, direction, src, dst):
"""Transfer file from/to remote host using sftp.
Args:
direction(str): transfer direction. set/get.
src(str): Source file location.
dst(str): Destination file location.
"""
ftp = None
try:
ftp = self.client.open_sftp()
if isinstance(src, str):
src = [src, ]
dst = [dst, ]
for _src, _dst in zip(src, dst):
getattr(ftp, direction)(_src, _dst)
finally:
if ftp is not None:
ftp.close()
[docs] def get_file(self, src, dst, proto="sftp"):
"""Get file from remote host using sftp.
Args:
src(str): Source file location.
dst(str): Destination file location.
proto(str): Protocol to be used for file transfer. sftp(default)/scp.
"""
self._transfer_file("get", src, dst, proto=proto)
[docs] def put_file(self, src, dst, proto="sftp"):
"""Put file to remote host using sftp.
Args:
src(str): Source file location.
dst(str): Destination file location.
proto(str): Protocol to be used for file transfer. sftp(default)/scp.
"""
self._transfer_file("put", src, dst, proto=proto)