1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
import errno
import fcntl
import os
import pty
import signal
import termios
import tty
from . import py2compat
# Aliases for the standard ``pty`` constants, so the functions in this module
# can refer to them by bare name.
CHILD = pty.CHILD
STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO = (
    pty.STDIN_FILENO,
    pty.STDOUT_FILENO,
    pty.STDERR_FILENO,
)
def fork(handle_window_size=False):
    """Fork a child whose stdin/stdout/stderr are the slave side of a new pty.

    Copied from the standard library's ``pty.py``, with modifications.

    Args:
        handle_window_size: When true, the child copies the window size of
            its inherited stdin onto the pty slave before stdin is replaced.

    Returns:
        A ``(pid, master_fd, slave_name)`` tuple in both processes. ``pid``
        equals ``CHILD`` in the child; there ``master_fd`` has already been
        closed and is returned only to keep the tuple shape uniform.

    Raises:
        OSError: If the slave name cannot be resolved or the fork fails; the
            pty descriptors are closed before the error propagates.
    """
    master_fd, slave_fd = openpty()
    try:
        slave_name = os.ttyname(slave_fd)
        pid = os.fork()
    except OSError:
        # Do not leak the freshly opened pty pair when we cannot fork.
        os.close(master_fd)
        os.close(slave_fd)
        raise

    if pid != CHILD:
        # Parent: only the master side is needed here.
        os.close(slave_fd)
        return pid, master_fd, slave_name

    # Child: establish a new session.
    os.setsid()
    os.close(master_fd)

    if handle_window_size:
        # Must run before the dup2 calls below, while STDIN_FILENO still
        # refers to the descriptor inherited from the parent.
        clone_window_size_from(slave_name, STDIN_FILENO)

    # Slave becomes stdin/stdout/stderr of child.
    os.dup2(slave_fd, STDIN_FILENO)
    os.dup2(slave_fd, STDOUT_FILENO)
    os.dup2(slave_fd, STDERR_FILENO)
    if slave_fd > STDERR_FILENO:
        os.close(slave_fd)

    # Explicitly open the tty to make it become a controlling tty.
    tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
    os.close(tmp_fd)

    return pid, master_fd, slave_name
def openpty():
    """Open a new pseudo-terminal pair.

    Returns:
        Whatever ``pty.openpty()`` returns, i.e. the ``(master_fd, slave_fd)``
        pair of file descriptors.
    """
    fd_pair = pty.openpty()
    return fd_pair
def spawn(argv, master_read=pty._read, stdin_read=pty._read, handle_window_size=False):
    """Run ``argv`` in a child attached to a new pty and relay its I/O.

    Copied from the standard library's ``pty.py``, with modifications. Note
    that it references the private helpers ``pty._read`` and ``pty._copy``.

    Args:
        argv: Program and arguments to execute via ``os.execlp``. A single
            string is treated as a one-element argument vector.
        master_read: Callable handed to ``pty._copy`` for reading the pty
            master.
        stdin_read: Callable handed to ``pty._copy`` for reading stdin.
        handle_window_size: When true, the child's pty starts with the window
            size of our stdin and a SIGWINCH handler keeps it in sync.

    Returns:
        The status value reported by ``os.waitpid`` for the child.
    """
    if isinstance(argv, str):
        argv = (argv,)

    pid, master_fd, slave_name = fork(handle_window_size)
    if pid == CHILD:
        os.execlp(argv[0], *argv)

    try:
        mode = tty.tcgetattr(STDIN_FILENO)
        tty.setraw(STDIN_FILENO)
        restore = True
    except tty.error:  # This is the same as termios.error
        # stdin is not a terminal we can switch to raw mode; nothing to undo.
        restore = False

    if handle_window_size:
        # NOTE(review): the handler stays installed after spawn() returns,
        # as in the original code; callers that spawn repeatedly may want to
        # reset SIGWINCH themselves.
        signal.signal(
            signal.SIGWINCH,
            lambda signum, frame: _winch(slave_name, pid)
        )

    try:
        while True:
            try:
                pty._copy(master_fd, master_read, stdin_read)
            except OSError as e:
                if e.errno == errno.EINTR:
                    # Interrupted by a signal (e.g. SIGWINCH): resume copying.
                    continue
                # Any other OSError ends the relay loop.
            break
    finally:
        # Always leave the user's terminal as we found it, whether the copy
        # loop returned normally, hit an OSError, or raised something else.
        try:
            if restore:
                tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
        finally:
            os.close(master_fd)

    return os.waitpid(pid, 0)[1]
def clone_window_size_from(slave_name, from_fd):
    """Copy the terminal window size of ``from_fd`` onto the named pty slave.

    Args:
        slave_name: Path of the slave terminal device; it is opened
            read/write for the duration of the call.
        from_fd: File descriptor whose window size is read with
            ``TIOCGWINSZ``.
    """
    target_fd = os.open(slave_name, os.O_RDWR)
    try:
        # Read the source size first, then apply it to the slave.
        window_size = fcntl.ioctl(from_fd, termios.TIOCGWINSZ, " " * 1024)
        fcntl.ioctl(target_fd, termios.TIOCSWINSZ, window_size)
    finally:
        os.close(target_fd)
def _winch(slave_name, child_pid):
    """Propagate a window-size change to the child process.

    Copies the current window size of our stdin onto the pty slave named
    ``slave_name``, then sends SIGWINCH to ``child_pid``.
    """
    # Update the slave's size first so it is in place when the signal arrives.
    clone_window_size_from(slave_name, STDIN_FILENO)
    os.kill(child_pid, signal.SIGWINCH)
|