-
Notifications
You must be signed in to change notification settings - Fork 0
/
execution_pty.py
122 lines (91 loc) · 3.66 KB
/
execution_pty.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import fcntl
import logging
import os
import pty
import subprocess
import sys
import time
import execution
script_encodings = {}
def set_script_encoding(command_identifier, encoding):
script_encodings[command_identifier] = encoding
class PtyProcessWrapper(execution.ProcessWrapper):
pty_master = None
pty_slave = None
encoding = None
def __init__(self, command, command_identifier, working_directory):
if command_identifier in script_encodings:
self.encoding = script_encodings[command_identifier]
else:
self.encoding = sys.stdout.encoding
execution.ProcessWrapper.__init__(self, command, command_identifier, working_directory)
def init_process(self, command, working_directory):
master, slave = pty.openpty()
self.process = subprocess.Popen(command,
cwd=working_directory,
stdin=slave,
stdout=slave,
stderr=slave,
start_new_session=True)
self.pty_slave = slave
self.pty_master = master
fcntl.fcntl(self.pty_master, fcntl.F_SETFL, os.O_NONBLOCK)
def write_to_input(self, value):
input_value = value
if not input_value.endswith("\n"):
input_value += "\n"
os.write(self.pty_master, input_value.encode())
def wait_finish(self):
self.process.wait()
def pipe_process_output(self):
try:
while True:
finished = False
wait_new_output = False
max_read_bytes = 1024
if self.is_finished():
data = b""
while True:
try:
chunk = os.read(self.pty_master, max_read_bytes)
data += chunk
if len(chunk) < max_read_bytes:
break
except BlockingIOError:
break
finished = True
else:
data = ""
try:
data = os.read(self.pty_master, max_read_bytes)
if data.endswith(b"\r"):
data += os.read(self.pty_master, 1)
if data and (self.encoding.lower() == "utf-8"):
while data[len(data) - 1] >= 127:
next_byte = os.read(self.pty_master, 1)
if not next_byte:
break
data += next_byte
except BlockingIOError:
if self.is_finished():
finished = True
if not data:
wait_new_output = True
if data:
output_text = data.decode(self.encoding)
self.output.put(output_text)
if finished:
break
if wait_new_output:
time.sleep(0.01)
except:
self.output.put("Unexpected error occurred. Contact the administrator.")
logger = logging.getLogger("execution")
try:
self.kill()
except:
logger.exception("PtyProcessWrapper. Failed to kill a process")
logger.exception("PtyProcessWrapper. Failed to read script output")
finally:
os.close(self.pty_master)
os.close(self.pty_slave)