import os, os.path, errno, fcntl import subprocess import datetime, time import hashlib import logging import config from tornado.escape import json_decode, json_encode, xhtml_escape from tornado.ioloop import PeriodicCallback, IOLoop from terminal import TerminalRecorder from connection import WebtilesSocketConnection from util import DynamicTemplateLoader, dgl_format_str, parse_where_data from game_data_handler import GameDataHandler from ws_handler import update_all_lobbys, remove_in_lobbys from inotify import DirectoryWatcher last_game_id = 0 processes = dict() unowned_process_logger = logging.LoggerAdapter(logging.getLogger(), {}) def find_game_info(socket_dir, socket_file): game_id = socket_file[socket_file.index(":")+1:-5] if (game_id in config.games and os.path.abspath(config.games[game_id]["socket_path"]) == os.path.abspath(socket_dir)): config.games[game_id]["id"] = game_id return config.games[game_id] game_info = None for game_id in config.games.keys(): gi = config.games[game_id] if os.path.abspath(gi["socket_path"]) == os.path.abspath(socket_dir): game_info = gi break game_info["id"] = game_id return game_info def handle_new_socket(path, event): dirname, filename = os.path.split(path) if ":" not in filename or not filename.endswith(".sock"): return username = filename[:filename.index(":")] abspath = os.path.abspath(path) if event == DirectoryWatcher.CREATE: if abspath in processes: return # Created by us # Find a game_info with this socket path game_info = find_game_info(dirname, filename) # Create process handler process = CrawlProcessHandler(game_info, username, unowned_process_logger) processes[abspath] = process process.connect(abspath) process.logger.info("Found a %s game.", game_info["id"]) # Notify lobbys if config.dgl_mode: update_all_lobbys(process) elif event == DirectoryWatcher.DELETE: if abspath not in processes: return process = processes[abspath] if process.process: return # Handled by us, will be removed later process.handle_process_end() process.logger.info("Game ended.") remove_in_lobbys(process) del processes[abspath] def watch_socket_dirs(): watcher = DirectoryWatcher() added_dirs = set() for game_id in config.games.keys(): game_info = config.games[game_id] socket_dir = os.path.abspath(game_info["socket_path"]) if socket_dir in added_dirs: continue watcher.watch(socket_dir, handle_new_socket) class CrawlProcessHandlerBase(object): def __init__(self, game_params, username, logger, io_loop=None): self.game_params = game_params self.username = username self.logger = logging.LoggerAdapter(logger, {}) self.logger.process = self._process_log_msg self.io_loop = io_loop or IOLoop.instance() self.queue_messages = False self.process = None self.client_path = self.config_path("client_path") self.crawl_version = None self.where = {} self.wheretime = 0 self.last_milestone = None self.kill_timeout = None now = datetime.datetime.utcnow() self.formatted_time = now.strftime("%Y-%m-%d.%H:%M:%S") self.lock_basename = self.formatted_time + ".ttyrec" self.end_callback = None self._receivers = set() self.last_activity_time = time.time() self.idle_checker = PeriodicCallback(self.check_idle, 10000, io_loop = self.io_loop) self.idle_checker.start() self._was_idle = False self.last_watcher_join = 0 global last_game_id self.id = last_game_id + 1 last_game_id = self.id def _process_log_msg(self, msg, kwargs): return "P%-5s %s" % (self.id, msg), kwargs def format_path(self, path): return dgl_format_str(path, self.username, self.game_params) def config_path(self, key): if key not in self.game_params: return None return self.format_path(self.game_params[key]) def idle_time(self): return int(time.time() - self.last_activity_time) def is_idle(self): return self.idle_time() > 30 def check_idle(self): if self.is_idle() != self._was_idle: self._was_idle = self.is_idle() if config.dgl_mode: update_all_lobbys(self) def flush_messages_to_all(self): for receiver in self._receivers: receiver.flush_messages() def write_to_all(self, msg, send): for receiver in self._receivers: receiver.write_message(msg, send) def send_to_all(self, msg, **data): for receiver in self._receivers: receiver.send_message(msg, **data) def handle_chat_message(self, username, text): chat_msg = ("%s: %s" % (username, xhtml_escape(text))) self.send_to_all("chat", content = chat_msg) def handle_process_end(self): if self.kill_timeout: self.io_loop.remove_timeout(self.kill_timeout) self.kill_timeout = None self.idle_checker.stop() for watcher in list(self._receivers): if watcher.watched_game == self: watcher.send_message("game_ended", reason = self.exit_reason, message = self.exit_message, dump = self.exit_dump_url) watcher.go_lobby() if self.end_callback: self.end_callback() def update_watcher_description(self): try: player_url = config.player_url except: player_url = None def wrap_name(watcher, is_player=False): if is_player: class_type = 'player' else: class_type = 'watcher' if player_url is None: return "{1}".format(class_type, watcher) username = "{2}".format(config.player_url, class_type, watcher) username = username.replace('%s', watcher.lower()) return username player_name = None watchers = [] for w in self._receivers: if not w.username: continue if not w.watched_game: player_name = w.username else: watchers.append(w.username) watchers.sort(key=lambda s:s.lower()) watcher_names = [] if player_name is not None: watcher_names.append(wrap_name(player_name, True)) watcher_names += [wrap_name(w) for w in watchers] anon_count = len(self._receivers) - len(watcher_names) s = ", ".join(watcher_names) if len(watcher_names) > 0 and anon_count > 0: s = s + " and %i Anon" % anon_count elif anon_count > 0: s = "%i Anon" % anon_count self.send_to_all("update_spectators", count = self.watcher_count(), names = s) if config.dgl_mode: update_all_lobbys(self) def add_watcher(self, watcher): self.last_watcher_join = time.time() if self.client_path: self._send_client(watcher) if watcher.watched_game == self: watcher.send_json_options(self.game_params["id"], self.username) self._receivers.add(watcher) self.update_watcher_description() def remove_watcher(self, watcher): self._receivers.remove(watcher) self.update_watcher_description() def watcher_count(self): return len([w for w in self._receivers if w.watched_game]) def send_client_to_all(self): for receiver in self._receivers: self._send_client(receiver) if receiver.watched_game == self: receiver.send_json_options(self.game_params["id"], self.username) def _send_client(self, watcher): h = hashlib.sha1(os.path.abspath(self.client_path)) if self.crawl_version: h.update(self.crawl_version) v = h.hexdigest() GameDataHandler.add_version(v, os.path.join(self.client_path, "static")) templ_path = os.path.join(self.client_path, "templates") loader = DynamicTemplateLoader.get(templ_path) templ = loader.load("game.html") game_html = templ.generate(version = v) watcher.send_message("game_client", version = v, content = game_html) def stop(self): if self.process: self.process.send_signal(subprocess.signal.SIGHUP) t = time.time() + config.kill_timeout self.kill_timeout = self.io_loop.add_timeout(t, self.kill) def kill(self): if self.process: self.logger.info("Killing crawl process after SIGHUP did nothing.") self.process.send_signal(subprocess.signal.SIGTERM) self.kill_timeout = None interesting_info = ("xl", "char", "place", "god", "title") def set_where_info(self, newwhere): interesting = False for key in CrawlProcessHandlerBase.interesting_info: if self.where.get(key) != newwhere.get(key): interesting = True self.where = newwhere if interesting: update_all_lobbys(self) def check_where(self): morgue_path = self.config_path("morgue_path") wherefile = os.path.join(morgue_path, self.username + ".where") try: if os.path.getmtime(wherefile) > self.wheretime: self.wheretime = time.time() with open(wherefile, "r") as f: wheredata = f.read() if wheredata.strip() == "": return try: newwhere = parse_where_data(wheredata) except: self.logger.warning("Exception while trying to parse where file!", exc_info=True) else: if (newwhere.get("status") == "active" or newwhere.get("status") == "saved"): self.set_where_info(newwhere) except (OSError, IOError): pass def lobby_entry(self): entry = { "id": self.id, "username": self.username, "spectator_count": self.watcher_count(), "idle_time": (self.idle_time() if self.is_idle() else 0), "game_id": self.game_params["id"], } for key in CrawlProcessHandlerBase.interesting_info: if key in self.where: entry[key] = self.where[key] if self.last_milestone: entry["milestone"] = self.last_milestone.get("milestone") return entry def human_readable_where(self): try: return "L{xl} {char}, {place}".format(**self.where) except KeyError: return "" def log_milestone(self, milestone): # Use the updated where info in the milestone self.where = milestone self.last_milestone = milestone update_all_lobbys(self) def _base_call(self): game = self.game_params call = [game["crawl_binary"]] if "pre_options" in game: call += game["pre_options"] call += ["-name", self.username, "-rc", os.path.join(self.config_path("rcfile_path"), self.username + ".rc"), "-macro", os.path.join(self.config_path("macro_path"), self.username + ".macro"), "-morgue", self.config_path("morgue_path")] if "options" in game: call += game["options"] return call def note_activity(self): self.last_activity_time = time.time() self.check_idle() def handle_input(self, msg): raise NotImplementedError() class CrawlProcessHandler(CrawlProcessHandlerBase): def __init__(self, game_params, username, logger, io_loop=None): super(CrawlProcessHandler, self).__init__(game_params, username, logger, io_loop) self.socketpath = None self.conn = None self.ttyrec_filename = None self.inprogress_lock = None self.inprogress_lock_file = None self.exit_reason = None self.exit_message = None self.exit_dump_url = None self._stale_pid = None self._stale_lockfile = None self._purging_timer = None self._process_hup_timeout = None def start(self): self._purge_locks_and_start(True) def stop(self): super(CrawlProcessHandler, self).stop() self._stop_purging_stale_processes() self._stale_pid = None def _purge_locks_and_start(self, firsttime=False): # Purge stale locks lockfile = self._find_lock() if lockfile: try: with open(lockfile) as f: pid = f.readline() pid = int(pid) self._stale_pid = pid self._stale_lockfile = lockfile if firsttime: hup_wait = 10 self.send_to_all("stale_processes", timeout=hup_wait, game=self.game_params["name"]) to = self.io_loop.add_timeout(time.time() + hup_wait, self._kill_stale_process) self._process_hup_timeout = to else: self._kill_stale_process() except Exception: self.logger.error("Error while handling lockfile %s.", lockfile, exc_info=True) errmsg = ("Error while trying to terminate a stale process.
" + "Please contact an administrator.") self.send_to_all("stale_process_fail", content=errmsg) self.handle_process_end() else: # No more locks, can start self._start_process() def _stop_purging_stale_processes(self): if not self._process_hup_timeout: return self.io_loop.remove_timeout(self._process_hup_timeout) self._stale_pid = None self._stale_lockfile = None self._purging_timer = None self._process_hup_timeout = None self.handle_process_end() def _find_lock(self): for path in os.listdir(self.config_path("inprogress_path")): if (path.startswith(self.username + ":") and path.endswith(".ttyrec")): return os.path.join(self.config_path("inprogress_path"), path) return None def _kill_stale_process(self, signal=subprocess.signal.SIGHUP): self._process_hup_timeout = None if self._stale_pid == None: return if signal == subprocess.signal.SIGHUP: self.logger.info("Purging stale lock at %s, pid %s.", self._stale_lockfile, self._stale_pid) elif signal == subprocess.signal.SIGTERM: self.logger.warning("Terminating pid %s forcefully!", self._stale_pid) try: os.kill(self._stale_pid, signal) except OSError, e: if e.errno == errno.ESRCH: # Process doesn't exist self._purge_stale_lock() else: self.logger.error("Error while killing process %s.", self._stale_pid, exc_info=True) errmsg = ("Error while trying to terminate a stale process.
" + "Please contact an administrator.") self.send_to_all("stale_process_fail", content=errmsg) self.handle_process_end() else: if signal == subprocess.signal.SIGTERM: self._purge_stale_lock() else: if signal == subprocess.signal.SIGHUP: self._purging_timer = 10 else: self._purging_timer -= 1 if self._purging_timer > 0: self.io_loop.add_timeout(time.time() + 1, self._check_stale_process) else: self.logger.warning("Couldn't terminate pid %s gracefully.", self._stale_pid) self.send_to_all("force_terminate?") def _check_stale_process(self): self._kill_stale_process(0) def _do_force_terminate(self, answer): if answer: self._kill_stale_process(subprocess.signal.SIGTERM) else: self.handle_process_end() def _purge_stale_lock(self): if os.path.exists(self._stale_lockfile): os.remove(self._stale_lockfile) self._purge_locks_and_start(False) def _start_process(self): self.socketpath = os.path.join(self.config_path("socket_path"), self.username + ":" + self.formatted_time + ".sock") try: # Unlink if necessary os.unlink(self.socketpath) except OSError, e: if e.errno != errno.ENOENT: raise game = self.game_params call = self._base_call() + ["-webtiles-socket", self.socketpath, "-await-connection"] ttyrec_path = self.config_path("ttyrec_path") if ttyrec_path: self.ttyrec_filename = os.path.join(ttyrec_path, self.lock_basename) processes[os.path.abspath(self.socketpath)] = self if config.dgl_mode: self.logger.info("Starting %s.", game["id"]) else: self.logger.info("Starting game.") try: self.process = TerminalRecorder(call, self.ttyrec_filename, self._ttyrec_id_header(), self.logger, self.io_loop, config.recording_term_size) self.process.end_callback = self._on_process_end self.process.output_callback = self._on_process_output self.process.activity_callback = self.note_activity self.process.error_callback = self._on_process_error self.gen_inprogress_lock() self.connect(self.socketpath, True) self.logger.info("Crawl FDs: fd%s, fd%s.", self.process.child_fd, self.process.errpipe_read) self.last_activity_time = time.time() self.check_where() except Exception: self.logger.warning("Error while starting the Crawl process!", exc_info=True) if self.process: self.stop() else: self._on_process_end() def connect(self, socketpath, primary = False): self.socketpath = socketpath self.conn = WebtilesSocketConnection(self.io_loop, self.socketpath, self.logger) self.conn.message_callback = self._on_socket_message self.conn.close_callback = self._on_socket_close self.conn.connect(primary) def gen_inprogress_lock(self): self.inprogress_lock = os.path.join(self.config_path("inprogress_path"), self.username + ":" + self.lock_basename) f = open(self.inprogress_lock, "w") fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) self.inprogress_lock_file = f cols, lines = self.process.get_terminal_size() f.write("%s\n%s\n%s\n" % (self.process.pid, lines, cols)) f.flush() def remove_inprogress_lock(self): if self.inprogress_lock_file is None: return fcntl.lockf(self.inprogress_lock_file.fileno(), fcntl.LOCK_UN) self.inprogress_lock_file.close() try: os.remove(self.inprogress_lock) except OSError: # Lock already got deleted pass def _ttyrec_id_header(self): clrscr = "\033[2J" crlf = "\r\n" templ = (clrscr + "\033[1;1H" + crlf + "Player: %s" + crlf + "Game: %s" + crlf + "Server: %s" + crlf + "Filename: %s" + crlf + "Time: (%s) %s" + crlf + clrscr) tstamp = int(time.time()) ctime = time.ctime() return templ % (self.username, self.game_params["name"], config.server_id, self.lock_basename, tstamp, ctime) def _on_process_end(self): self.logger.info("Crawl terminated.") self.remove_inprogress_lock() try: del processes[os.path.abspath(self.socketpath)] except KeyError: self.logger.warning("Process entry already deleted: %s", self.socketpath) self.process = None self.handle_process_end() def _on_socket_close(self): self.conn = None self.stop() def handle_process_end(self): if self.conn: self.conn.close_callback = None self.conn.close() self.conn = None super(CrawlProcessHandler, self).handle_process_end() def add_watcher(self, watcher): super(CrawlProcessHandler, self).add_watcher(watcher) if self.conn and self.conn.open: self.conn.send_message('{"msg":"spectator_joined"}') def handle_input(self, msg): obj = json_decode(msg) if obj["msg"] == "input" and self.process: self.last_action_time = time.time() data = "" for x in obj.get("data", []): data += chr(x) data += obj.get("text", u"").encode("utf8") self.process.write_input(data) elif obj["msg"] == "force_terminate": self._do_force_terminate(obj["answer"]) elif obj["msg"] == "stop_stale_process_purge": self._stop_purging_stale_processes() elif self.conn and self.conn.open: self.conn.send_message(msg.encode("utf8")) def handle_chat_message(self, username, text): super(CrawlProcessHandler, self).handle_chat_message(username, text) if self.conn and self.conn.open: self.conn.send_message(json_encode({ "msg": "note", "content": "%s: %s" % (username, text) })) def _on_process_output(self, line): self.check_where() self.write_to_all(line, True) def _on_process_error(self, line): if line.startswith("ERROR"): self.exit_reason = "crash" if line.rfind(":") != -1: self.exit_message = line[line.rfind(":") + 1:].strip() elif line.startswith("Writing crash info to"): self.exit_reason = "crash" url = None if line.rfind("/") != -1: url = line[line.rfind("/") + 1:].strip() elif line.rfind(" ") != -1: url = line[line.rfind(" ") + 1:].strip() if url != None: self.exit_dump_url = self.game_params["morgue_url"].replace("%n", self.username) + os.path.splitext(url)[0] def _on_socket_message(self, msg): # stdout data is only used for compatibility to wrapper # scripts -- so as soon as we receive something on the socket, # we stop using stdout if self.process: self.process.output_callback = None if msg.startswith("*"): # Special message to the server msg = msg[1:] msgobj = json_decode(msg) if msgobj["msg"] == "client_path": if self.client_path == None: self.client_path = self.format_path(msgobj["path"]) if "version" in msgobj: self.crawl_version = msgobj["version"] self.logger.info("Crawl version: %s.", self.crawl_version) self.send_client_to_all() elif msgobj["msg"] == "flush_messages": # only queue, once we know the crawl process asks for flushes self.queue_messages = True; self.flush_messages_to_all() elif msgobj["msg"] == "dump": if "morgue_url" in self.game_params and self.game_params["morgue_url"]: url = self.game_params["morgue_url"].replace("%n", self.username) + msgobj["filename"] if msgobj["type"] == "command": self.send_to_all("dump", url = url) else: self.exit_dump_url = url elif msgobj["msg"] == "exit_reason": self.exit_reason = msgobj["type"] if "message" in msgobj: self.exit_message = msgobj["message"] else: self.exit_message = None else: self.logger.warning("Unknown message from the crawl process: %s", msgobj["msg"]) else: self.check_where() if time.time() > self.last_watcher_join + 2: # Treat socket messages as activity, since it's otherwise # hard to determine activity for games found via # watch_socket_dirs. # But don't if a spectator just joined, since we don't # want that to reset idle time. self.note_activity() self.write_to_all(msg, not self.queue_messages) class DGLLessCrawlProcessHandler(CrawlProcessHandler): def __init__(self, logger, io_loop): game_params = dict( name = "DCSS", ttyrec_path = "./", inprogress_path = "./", socket_path = "./", client_path = "./webserver/game_data") super(DGLLessCrawlProcessHandler, self).__init__(game_params, "game", logger, io_loop) def _base_call(self): return ["./crawl"] def check_where(self): pass class CompatCrawlProcessHandler(CrawlProcessHandlerBase): def __init__(self, game_params, username, logger, io_loop): super(CompatCrawlProcessHandler, self).__init__(game_params, username, logger, io_loop) self.client_path = game_params["client_prefix"] def start(self): game = self.game_params call = self._base_call() self.logger.info("Starting %s (compat-mode).", game["id"]) self.process = subprocess.Popen(call, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) self.io_loop.add_handler(self.process.stdout.fileno(), self.on_stdout, self.io_loop.READ | self.io_loop.ERROR) self.io_loop.add_handler(self.process.stderr.fileno(), self.on_stderr, self.io_loop.READ | self.io_loop.ERROR) self.logger.info("Crawl FDs: fd%s, fd%s, fd%s", self.process.stdin.fileno(), self.process.stdout.fileno(), self.process.stderr.fileno()) self.last_activity_time = time.time() self.create_mock_ttyrec() processes[os.path.abspath(self.ttyrec_filename)] = self self.check_where() def create_mock_ttyrec(self): running_game_path = self.config_path("running_game_path") self.ttyrec_filename = os.path.join(running_game_path, self.username + ":" + self.lock_basename) f = open(self.ttyrec_filename, "w") f.close() def delete_mock_ttyrec(self): if self.ttyrec_filename: os.remove(self.ttyrec_filename) self.ttyrec_filename = None def poll_crawl(self): if self.process is not None and self.process.poll() is not None: self.io_loop.remove_handler(self.process.stdout.fileno()) self.io_loop.remove_handler(self.process.stderr.fileno()) self.process.stdout.close() self.process.stderr.close() self.process = None self.logger.info("Crawl terminated. (compat-mode)") try: del processes[os.path.abspath(self.ttyrec_filename)] except KeyError: self.logger.warning("Process entry already deleted") self.delete_mock_ttyrec() self.handle_process_end() def add_watcher(self, watcher): super(CompatCrawlProcessHandler, self).add_watcher(watcher) if self.process: self.process.stdin.write("^r") def handle_input(self, msg): if msg.startswith("{"): obj = json_decode(msg) self.note_activity() if obj["msg"] == "input" and self.process: self.last_action_time = time.time() data = "" for x in obj.get("data", []): data += chr(x) data += obj.get("text", u"").encode("utf8") if data == "^": self.process.stdin.write("\\94\n") self.process.stdin.write(data) elif obj["msg"] == "key" and self.process: self.process.stdin.write("\\" + str(obj["keycode"]) + "\n") else: if not msg.startswith("^"): self.note_activity() self.process.stdin.write(msg.encode("utf8")) def on_stderr(self, fd, events): if events & self.io_loop.ERROR: self.poll_crawl() elif events & self.io_loop.READ: s = self.process.stderr.readline() if not (s.isspace() or s == ""): self.logger.info("ERR: %s", s.strip()) self.poll_crawl() def on_stdout(self, fd, events): if events & self.io_loop.ERROR: self.poll_crawl() elif events & self.io_loop.READ: msg = self.process.stdout.readline() self.write_to_all(msg, True) self.poll_crawl() self.check_where() def _send_client(self, watcher): templ_path = os.path.join(config.template_path, self.client_path) loader = DynamicTemplateLoader.get(templ_path) templ = loader.load("game.html") game_html = templ.generate(prefix = self.client_path) watcher.send_message("game_client", content = game_html)