import json import re import ssl import time import traceback import vt100 auth_re = re.compile(b'^hello ([^ ]+) ([^ ]+)$') extra_data_re = re.compile(b'\033\]499;([^\007]*)\007') clear_patterns = [ b"\033[H\033[J", b"\033[H\033[2J", b"\033[2J\033[H", # this one is from tmux - can't possibly imagine why it would choose to do # things this way, but i'm sure there's some kind of reason # it's not perfect (it's not always followed by a \e[H, sometimes it just # moves the cursor to wherever else directly), but it helps a bit lambda handler: b"\033[H\033[K\r\n\033[K" + b"".join([b"\033[1B\033[K" for i in range(handler.rows - 2)]) + b"\033[H", ] class Handler(object): def __init__(self, rows, cols): self.created_at = time.time() self.idle_since = time.time() self.rows = rows self.cols = cols self.buf = b'' self.prev_read = b'' self.vt = vt100.vt100(rows, cols) def process(self, data): to_process = self.prev_read + data processed = self.vt.process(to_process) self.prev_read = to_process[processed:] self.buf += data extra_data = {} while True: m = extra_data_re.search(self.buf) if m is None: break try: extra_data_json = m.group(1).decode('utf-8') extra_data = json.loads(extra_data_json) except Exception as e: print("failed to parse metadata: %s" % e, file=sys.stderr) pass self.buf = self.buf[:m.start(0)] + self.buf[m.end(0):] if "geometry" in extra_data: self.rows = extra_data["geometry"][1] self.cols = extra_data["geometry"][0] self.vt.set_window_size(self.rows, self.cols) for pattern in clear_patterns: if type(pattern) == type(lambda x: x): pattern = pattern(self) clear = self.buf.rfind(pattern) if clear != -1: print("found a clear") self.buf = self.buf[clear + len(pattern):] self.idle_since = time.time() def get_term(self): term = [] for i in range(0, self.rows): term.append([]) for j in range(0, self.cols): cell = self.vt.cell(i, j) term[i].append({ "c": cell.contents(), "f": cell.fgcolor(), "b": cell.bgcolor(), "o": cell.bold(), "i": cell.italic(), "u": cell.underline(), "n": cell.inverse(), "w": cell.is_wide(), }) return term def get_term_updates(self, screen): if self.rows != len(screen) or self.cols != len(screen[0]): return None changes = [] for i in range(0, self.rows): for j in range(0, self.cols): cell = self.vt.cell(i, j) cell_changes = self._diff_cell( screen[i][j], { "c": cell.contents(), "f": cell.fgcolor(), "b": cell.bgcolor(), "o": cell.bold(), "i": cell.italic(), "u": cell.underline(), "n": cell.inverse(), "w": cell.is_wide(), } ) if len(cell_changes) > 0: changes.append({ "row": i, "col": j, "cell": cell_changes, }) return changes def _diff_cell(self, prev_cell, cur_cell): cell_changes = {} for key in cur_cell: if cur_cell[key] != prev_cell[key]: cell_changes[key] = cur_cell[key] if "f" in cell_changes: cell_changes["b"] = cur_cell["b"] cell_changes["o"] = cur_cell["o"] cell_changes["n"] = cur_cell["n"] if "b" in cell_changes: cell_changes["f"] = cur_cell["f"] cell_changes["n"] = cur_cell["n"] if "o" in cell_changes: cell_changes["f"] = cur_cell["f"] if "n" in cell_changes: cell_changes["f"] = cur_cell["f"] cell_changes["b"] = cur_cell["b"] return cell_changes class Connection(object): def __init__(self, client, connection_id, publisher, pemfile): self.client = client self.connection_id = connection_id self.publisher = publisher self.pemfile = pemfile self.viewers = 0 self.context = None def run(self): auth = self._readline() if auth is None: print("no authentication found") return print(auth) if auth == b"starttls": if not self._starttls(): print("TLS connection failed") return auth = self._readline() m = auth_re.match(auth) if m is None: print("no authentication found (%s)" % auth) return print(b"got auth: " + auth) self.name = m.group(1) self.client.send(b"hello, " + self.name + b"\n") extra_data, buf = self._try_read_metadata() if "geometry" in extra_data: self.handler = Handler( extra_data["geometry"][1], extra_data["geometry"][0] ) else: self.handler = Handler(24, 80) self.handler.process(buf) while True: buf = b'' try: buf = self.client.recv(1024) except Exception as e: print(traceback.format_exc()) print('*** recv failed: ' + str(e)) if len(buf) > 0: prev_screen = self.handler.get_term() self.handler.process(buf) self.publisher.notify( "new_data", self.connection_id, self.handler.buf, buf, self.handler.get_term(), self.handler.get_term_updates(prev_screen) ) else: self.publisher.notify("streamer_disconnect", self.connection_id) return def msg_new_viewer(self, connection_id): if connection_id != self.connection_id: return self.viewers += 1 self.publisher.notify( "new_data", self.connection_id, self.handler.buf, b'', self.handler.get_term(), None ) try: self.client.send(b"msg watcher connected\n") except Exception as e: print("*** send failed (watcher connect message): " + str(e)) def msg_viewer_disconnect(self, connection_id): if connection_id != self.connection_id: return try: self.client.send(b"msg watcher disconnected\n") except Exception as e: print("*** send failed (watcher disconnect message): " + str(e)) self.viewers -= 1 def request_get_streamers(self): return { "name": self.name, "id": self.connection_id, "rows": self.handler.rows, "cols": self.handler.cols, "idle_since": self.handler.idle_since, "created_at": self.handler.created_at, "viewers": self.viewers, } def _readline(self): buf = b'' while len(buf) < 1024 and b"\n" not in buf: byte = self.client.recv(1) if len(byte) == 0: raise Exception("Connection closed unexpectedly") buf += byte pos = buf.find(b"\n") if pos == -1: return line = buf[:pos] if line[-1:] == b"\r": line = line[:-1] return line def _starttls(self): if self.context is None: self.context = ssl.create_default_context( purpose=ssl.Purpose.CLIENT_AUTH ) self.context.load_cert_chain(certfile=self.pemfile) try: self.client = self.context.wrap_socket( self.client, server_side=True ) except Exception as e: print(traceback.format_exc()) print('*** TLS connection failed: ' + str(e)) return False return True def _try_read_metadata(self): buf = b'' while len(buf) < 6: more = self.client.recv(6 - len(buf)) if len(more) > 0: buf += more else: return {}, buf if buf != b'\033]499;': return {}, buf while len(buf) < 4096 and b"\007" not in buf: buf += self.client.recv(1) if b"\007" not in buf: return {}, buf extra_data = {} m = extra_data_re.match(buf) if m is not None: try: extra_data_json = m.group(1).decode('utf-8') extra_data = json.loads(extra_data_json) except Exception as e: print("failed to parse metadata: %s" % e, file=sys.stderr) pass buf = buf[len(m.group(0)):] return extra_data, buf