import multiprocessing
import paramiko
import select
import threading
import time
import traceback
class Connection(object):
def __init__(self, client, connection_id, publisher, keyfile):
self.transport = paramiko.Transport(client)
key = None
with open(keyfile) as f:
header = f.readline()
if header == "-----BEGIN DSA PRIVATE KEY-----\n":
key = paramiko.DSSKey(filename=keyfile)
elif header == "-----BEGIN RSA PRIVATE KEY-----\n":
key = paramiko.RSAKey(filename=keyfile)
if key is None:
raise Exception("%s doesn't appear to be an SSH keyfile" % keyfile)
self.transport.add_server_key(key)
self.connection_id = connection_id
self.publisher = publisher
self.initialized = False
self.watching_id = None
self.rpipe, self.wpipe = multiprocessing.Pipe(False)
def run(self):
self.server = Server()
self.transport.start_server(server=self.server)
self.chan = self.transport.accept(10)
if self.chan is not None:
self.server.pty_event.wait()
while True:
self.initialized = False
self.watching_id = None
streamer = self.select_stream()
if streamer is None:
break
self.watching_id = streamer["id"]
print(
"new viewer watching %s (%s)" % (
streamer["name"], streamer["id"]
)
)
self._send_all(
"\033[1;%d;1;%dr\033[m\033[H\033[2J" % (
streamer["rows"], streamer["cols"]
)
)
self.publisher.notify("new_viewer", self.watching_id)
while True:
rout, wout, eout = select.select(
[self.chan, self.rpipe],
[],
[]
)
if self.chan in rout:
c = self.chan.recv(1)
if c == b'q':
print(
"viewer stopped watching %s (%s)" % (
streamer["name"], streamer["id"]
)
)
self._cleanup_watcher()
break
if self.rpipe in rout:
self._cleanup_watcher()
break
if self.chan is not None:
self.chan.close()
self.transport.close()
def select_stream(self):
key_code = ord('a')
keymap = {}
streamers = self.publisher.request_all("get_streamers")
# XXX this will require pagination
for streamer in streamers:
key = chr(key_code)
if key == "q":
key_code += 1
key = chr(key_code)
streamer["key"] = key
keymap[key] = streamer
key_code += 1
self._display_streamer_screen(streamers)
c = self.chan.recv(1).decode('utf-8', 'ignore')
if c in keymap:
self._send_all("\033[2J\033[H")
return keymap[c]
elif c == 'q':
self._send_all("\r\n")
return None
else:
return self.select_stream()
def msg_new_data(self, connection_id, prev_buf, data, screen, updates):
if self.watching_id != connection_id:
return
if not self.initialized:
self._send_all(prev_buf)
self.initialized = True
self._send_all(data)
def msg_streamer_disconnect(self, connection_id):
if self.watching_id != connection_id:
return
self.wpipe.send("q")
def _send_all(self, data):
total_sent = 0
while total_sent < len(data):
try:
total_sent += self.chan.send(data[total_sent:])
except Exception as e:
print(traceback.format_exc())
print("*** send failed: " + str(e))
break
def _display_streamer_screen(self, streamers):
self._send_all("\033[H\033[2JWelcome to Termcast!")
self._send_all(
"\033[3H %-20s %-15s %-10s %-12s %-15s" % (
"User", "Terminal size", "Viewers", "Idle time", "Total time"
)
)
row = 4
for streamer in streamers:
key = streamer["key"]
name = streamer["name"].decode('utf-8', 'replace')
rows = streamer["rows"]
cols = streamer["cols"]
viewers = streamer["viewers"]
idle = time.time() - streamer["idle_since"]
total = time.time() - streamer["created_at"]
size = "(%dx%d)" % (cols, rows)
size_pre = ""
size_post = ""
if cols > self.server.cols or rows > self.server.rows:
size_pre = "\033[31m"
size_post = "\033[m"
self._send_all(
"\033[%dH%s) %-20s %s%-15s%s %-10s %-12s %-15s" % (
row, key, name, size_pre, size, size_post,
viewers,
self._human_readable_duration(idle),
self._human_readable_duration(total)
)
)
row += 1
self._send_all("\033[%dHChoose a stream: " % (row + 1))
def _human_readable_duration(self, duration):
days = 0
hours = 0
minutes = 0
seconds = 0
if duration > 60*60*24:
days = duration // (60*60*24)
duration -= days * 60*60*24
if duration > 60*60:
hours = duration // (60*60)
duration -= hours * 60*60
if duration > 60:
minutes = duration // 60
duration -= minutes * 60
seconds = duration
ret = "%02ds" % seconds
if minutes > 0 or hours > 0 or days > 0:
ret = ("%02dm" % minutes) + ret
if hours > 0 or days > 0:
ret = ("%02dh" % hours) + ret
if days > 0:
ret = ("%dd" % days) + ret
return ret
def _cleanup_watcher(self):
self.publisher.notify(
"viewer_disconnect", self.watching_id
)
self._send_all(
("\033[1;%d;1;%dr"
+ "\033[m"
+ "\033[?9l\033[?1000l"
+ "\033[H\033[2J") % (
self.server.rows, self.server.cols
)
)
class Server(paramiko.ServerInterface):
def __init__(self):
super()
self.cols = 80
self.rows = 24
self.pty_event = threading.Event()
def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
def check_channel_pty_request(
self, channel, term, width, height, pixelwidth, pixelheight, modes
):
self.cols = width
self.rows = height
self.pty_event.set()
return True
def check_channel_window_change_request(
self, channel, width, height, pixelwidth, pixelheight
):
self.cols = width
self.rows = height
return True
def check_channel_shell_request(self, channel):
return True
def check_auth_none(self, username):
return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username):
return "none"