forked from fgimian/paramiko-expect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
paramiko_expect.py
executable file
·396 lines (343 loc) · 16.3 KB
/
paramiko_expect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
#
# Paramiko Expect
#
# Written by Fotis Gimian
# http://github.com/fgimian
#
# This library works with a Paramiko SSH channel to provide native SSH
# expect-like handling for servers. The library may be used to interact
# with commands like 'configure' or Cisco IOS devices or with interactive
# Unix scripts or commands.
#
# You must have Paramiko installed in order to use this library.
#
from __future__ import unicode_literals
import codecs
import sys
import re
import socket
import struct
import time
import logging
# Windows does not have termios
try:
import termios
import tty
has_termios = True
MAX_TIMEOUT = 2 ** (struct.Struct(str('i')).size * 8 - 1) - 1
except ImportError: # pragma: no cover
import threading
has_termios = False
MAX_TIMEOUT = threading.TIMEOUT_MAX
import select
def strip_ansi_codes(s):
return re.sub(r'\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?|\?(1049|2004)[hl]', '', s)
def default_output_func(msg):
sys.stdout.write(msg)
sys.stdout.flush()
class SSHClientInteraction(object):
"""
This class allows an expect-like interface to Paramiko which allows
coders to interact with applications and the shell of the connected
device.
:param client: A Paramiko SSHClient object
:param timeout: The connection timeout in seconds
:param newline: The newline character to send after each command
:param buffer_size: The amount of data (in bytes) that will be read at
a time after a command is run
:param display: Whether or not the output should be displayed in
real-time as it is being performed (especially useful
when debugging)
:param encoding: The character encoding to use.
:param lines_to_check: The number of last few lines of the output to
look at, while matching regular expression(s)
:param term: terminal type used with invoke shell
"""
def __init__(
self, client, timeout=60, newline='\r', buffer_size=1024,
display=False, encoding='utf-8', output_callback=default_output_func,
tty_width=80, tty_height=24, lines_to_check=1, term='vt100'
):
self.channel = client.invoke_shell(term=term, width=tty_width, height=tty_height)
self.timeout = timeout
self.newline = newline
self.buffer_size = buffer_size
self.display = display
self.encoding = encoding
self.output_callback = output_callback
self.lines_to_check = lines_to_check
self.current_output = ''
self.current_output_clean = ''
self.current_send_string = ''
self.last_match = ''
# If the output is long, multi-byte encoded characters may be split
# across calls to recv, so decode incrementally.
self.decoder = codecs.getincrementaldecoder(self.encoding)()
def __del__(self):
self.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def close(self):
"""Attempts to close the channel for clean completion."""
try:
self.channel.close()
except Exception:
pass
def expect(
self, re_strings='', timeout=None, output_callback=None, default_match_prefix='.*\n',
strip_ansi=True, ignore_decode_error=True, lines_to_check=None
):
"""
This function takes in a regular expression (or regular expressions)
that represent the last line of output from the server. The function
waits for one or more of the terms to be matched. The regexes are
matched using expression \n<regex>$ so you'll need to provide an
easygoing regex such as '.*server.*' if you wish to have a fuzzy
match.
:param re_strings: Either a regex string or list of regex strings
that we should expect; if this is not specified,
then EOF is expected (i.e. the shell is completely
closed after the exit command is issued)
:param timeout: Timeout in seconds. If this timeout is exceeded,
then an exception is raised.
:param output_callback: A function used to print ssh output. Printed to stdout
by default. A user-defined logger may be passed like
output_callback=lambda m: mylog.debug(m)
:param default_match_prefix: A prefix to all match regexes, defaults to '.*\n',
can set to '' on cases prompt is the first line,
or the command has no output.
:param strip_ansi: If True, will strip ansi control chars befores regex matching
default to True.
:param ignore_decode_error: If True, will ignore decode errors if any.
default to True.
:param lines_to_check: The number of last few lines of the output to
look at, while matching regular expression(s)
:return: An EOF returns -1, a regex metch returns 0 and a match in a
list of regexes returns the index of the matched string in
the list.
:raises: A TimeoutError exception is raised on timeout.
"""
output_callback = output_callback if output_callback else self.output_callback
# Set the channel timeout
timeout = timeout if timeout else self.timeout
self.channel.settimeout(timeout)
lines_to_check = lines_to_check if lines_to_check else self.lines_to_check
if ignore_decode_error:
self.decoder = codecs.getincrementaldecoder(self.encoding)('ignore')
# Create an empty output buffer
self.current_output = ''
# saves the current buffer to check for re_strings pattern
current_buffer_output_decoded = ''
# This function needs all regular expressions to be in the form of a
# list, so if the user provided a string, let's convert it to a 1
# item list.
if isinstance(re_strings, str) and len(re_strings) != 0:
re_strings = [re_strings]
# to avoid looping in recv_ready()
base_time = time.time()
# Loop until one of the expressions is matched or loop forever if
# nothing is expected (usually used for exit)
while (
len(re_strings) == 0 or
not [re_string
for re_string in re_strings
if re.match(default_match_prefix + re_string + '$',
current_buffer_output_decoded, re.DOTALL)]
):
current_buffer_output_decoded = ''
# avoids paramiko hang when recv is not ready yet
while not self.channel.closed and not self.channel.recv_ready():
time.sleep(.009)
if time.time() >= (base_time + timeout):
logging.info('EXCESS TIME RECV_READY TIMEOUT, did you expect() before a send()?')
raise TimeoutError('{} not in response, {}s timeout exceeded'.format(re_strings,timeout))
# Read some of the output
current_buffer = self.channel.recv(self.buffer_size)
# If we have an empty buffer, then the SSH session has been closed
if len(current_buffer) == 0:
break
# Convert the buffer to our chosen encoding
current_buffer_decoded = self.decoder.decode(current_buffer)
# Strip all ugly \r (Ctrl-M making) characters from the current
# read
current_buffer_decoded = current_buffer_decoded.replace('\r', '')
# Display the current buffer in realtime if requested to do so
# (good for debugging purposes)
if strip_ansi:
current_buffer_decoded = strip_ansi_codes(current_buffer_decoded)
if not current_buffer_decoded:
continue
if self.display:
output_callback(current_buffer_decoded)
# Add the currently read buffer to the output
self.current_output += current_buffer_decoded
current_buffer_output_decoded = '\n' + '\n'.join(self.current_output.splitlines()[-lines_to_check:])
# Grab the first pattern that was matched
if len(re_strings) != 0:
found_pattern = [(re_index, re_string)
for re_index, re_string in enumerate(re_strings)
if re.match(default_match_prefix + re_string + '$',
self.current_output, re.DOTALL)]
# Clean the output up by removing the sent command
self.current_output_clean = self.current_output
if len(self.current_send_string) != 0:
self.current_output_clean = (
self.current_output_clean.replace(
self.current_send_string + self.newline, ''
)
)
# Reset the current send string to ensure that multiple expect calls
# don't result in bad output cleaning
self.current_send_string = ''
# Clean the output up by removing the expect output from the end if
# requested and save the details of the matched pattern
if len(re_strings) != 0 and len(found_pattern) != 0:
self.current_output_clean = (
re.sub(
found_pattern[0][1] + '$', '', self.current_output_clean
)
)
self.last_match = found_pattern[0][1]
return found_pattern[0][0]
else:
# We would socket timeout before getting here, but for good
# measure, let's send back a -1
return -1
def send(self, send_string, newline=None):
"""Saves and sends the send string provided."""
self.current_send_string = send_string
# send_string, _ = codecs.getdecoder(self.encoding)(send_string)
newline = newline if newline is not None else self.newline
# don't send till send_ready
while not self.channel.send_ready():
time.sleep(.009)
self.channel.send(send_string)
self.channel.send(newline)
def tail(
self, line_prefix=None, callback=None, output_callback=None, stop_callback=lambda x: False,
timeout=None
):
"""
This function takes control of an SSH channel and displays line
by line of output as \n is recieved. This function is specifically
made for tail-like commands.
:param line_prefix: Text to append to the left of each line of output.
This is especially useful if you are using my
MultiSSH class to run tail commands over multiple
servers.
:param callback: You may optionally supply a callback function which
takes two paramaters. The first is the line prefix
and the second is current line of output. The
callback should return the string that is to be
displayed (including the \n character). This allows
users to grep the output or manipulate it as
required.
:param output_callback: A function used to print ssh output. Printed to stdout
by default. A user-defined logger may be passed like
output_callback=lambda m: mylog.debug(m)
:param stop_callback: A function usesd to stop the tail, when function retruns
True tail will stop, by default stop_callback=lambda x: False
:param timeout: how much time to wait for data, default to None which
mean almost forever.
"""
output_callback = output_callback if output_callback else self.output_callback
# Set the channel timeout to the maximum allowed value,
# setting this to None breaks the KeyboardInterrupt exception and
# won't allow us to Ctrl+C out of the script
timeout = timeout if timeout else MAX_TIMEOUT
self.channel.settimeout(timeout)
# Create an empty line buffer and a line counter
current_line = b''
line_counter = 0
line_feed_byte = '\n'.encode(self.encoding)
# Loop forever, Ctrl+C (KeyboardInterrupt) is used to break the tail
while True:
# Read the output one byte at a time so we can detect \n correctly
buffer = self.channel.recv(1)
# If we have an empty buffer, then the SSH session has been closed
if len(buffer) == 0:
break
# Add the currently read buffer to the current line output
current_line += buffer
# Display the last read line in realtime when we reach a \n
# character
if buffer == line_feed_byte:
current_line_decoded = self.decoder.decode(current_line)
if line_counter:
if callback:
output_callback(callback(line_prefix, current_line_decoded))
else:
if line_prefix:
output_callback(line_prefix)
output_callback(current_line_decoded)
if stop_callback(current_line_decoded):
break
line_counter += 1
current_line = b''
def take_control(self):
"""
This function is a better documented and touched up version of the
posix_shell function found in the interactive.py demo script that
ships with Paramiko.
"""
if has_termios:
# Get attributes of the shell you were in before going to the
# new one
original_tty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
tty.setcbreak(sys.stdin.fileno())
# We must set the timeout to 0 so that we can bypass times when
# there is no available text to receive
self.channel.settimeout(0)
# Loop forever until the user exits (i.e. read buffer is empty)
while True:
select_read, select_write, select_exception = (
select.select([self.channel, sys.stdin], [], [])
)
# Read any output from the terminal and print it to the
# screen. With timeout set to 0, we just can ignore times
# when there's nothing to receive.
if self.channel in select_read:
try:
buffer = self.channel.recv(self.buffer_size)
if len(buffer) == 0:
break
sys.stdout.write(self.decoder.decode(buffer))
sys.stdout.flush()
except socket.timeout:
pass
# Send any keyboard input to the terminal one byte at a
# time
if sys.stdin in select_read:
buffer = sys.stdin.read(1)
if len(buffer) == 0:
break
self.channel.send(buffer)
finally:
# Restore the attributes of the shell you were in
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_tty)
else:
# We must set the timeout to None so that we can bypass times when
# there is no available text to receive
self.channel.settimeout(None)
def writeall(sock):
while True:
buffer = sock.recv(self.buffer_size)
if len(buffer) == 0:
break
sys.stdout.write(self.decoder.decode(buffer))
sys.stdout.flush()
writer = threading.Thread(target=writeall, args=(self.channel,))
writer.start()
try:
while True:
buffer = sys.stdin.read(1)
if len(buffer) == 0:
break
self.channel.send(buffer)
# User has hit Ctrl+Z or F6
except EOFError:
pass