#!/usr/bin/env python3 """ MUD Keep-Alive Proxy ==================== A lightweight TCP proxy that sits between your MUD client and a MUD server, keeping your connection alive when your client disconnects temporarily. How it works: 1. You run this proxy on your machine (or a server you control). 2. You point your MUD client at the proxy (e.g., localhost:4000). 3. The proxy connects to the real MUD server on your behalf. 4. If your client disconnects, the proxy keeps the MUD connection alive. 5. When you reconnect, the proxy replays any missed output. Requirements: - Python 3.8+ (uses only the standard library, no pip install needed) Quick Start: 1. Edit the CONFIGURATION section below (set your MUD server and port). 2. Run: python3 mud_proxy.py 3. Connect your MUD client to localhost:4000 (or whatever PROXY_PORT you set). License: MIT """ import asyncio import logging import os import signal import socket import sys import time from collections import deque # ───────────────────────────────────────────────────────────────────────────── # CONFIGURATION — Edit these values for your MUD server # ───────────────────────────────────────────────────────────────────────────── # The MUD server you want to connect to MUD_HOST = "your-mud-server.com" MUD_PORT = 4000 # The local port this proxy listens on (your MUD client connects here) PROXY_HOST = "0.0.0.0" # 0.0.0.0 = accept connections from any interface PROXY_PORT = 4000 # Connect your MUD client to localhost:4000 # How often (in seconds) to send a keepalive to the MUD server. # Most MUD servers idle-disconnect after 10-30 minutes. # Set to 0 to disable keepalives. KEEPALIVE_INTERVAL = 120 # 2 minutes # What to send as the keepalive. A blank line works for most MUDs. # Some MUDs use specific idle commands. Common options: # "" — empty line (most MUDs accept this silently) # "idle" — some MUDs recognize this as a no-op # "look" — forces the MUD to send output (noisy but reliable) KEEPALIVE_COMMAND = "" # How long (in minutes) to keep an orphaned MUD connection alive # after your client disconnects. After this timeout, the MUD connection # is closed. Set to 0 to close immediately on client disconnect. ORPHAN_TIMEOUT_MINUTES = 60 # Maximum output to buffer (in KB) while the client is disconnected. # When the client reconnects, buffered output is replayed. # Older output is discarded if the buffer exceeds this size. BUFFER_MAX_KB = 64 # Maximum number of simultaneous client connections the proxy will accept. MAX_CONNECTIONS = 10 # TCP keepalive settings (low-level socket keepalive, separate from the # MUD-level keepalive command above). These detect dead network connections. TCP_KEEPALIVE_IDLE = 30 # Seconds before first keepalive probe TCP_KEEPALIVE_INTERVAL = 10 # Seconds between probes TCP_KEEPALIVE_COUNT = 6 # Failed probes before closing connection # Logging level: DEBUG, INFO, WARNING, ERROR # INFO shows connection/disconnect events. DEBUG shows all data flow. LOG_LEVEL = "INFO" # ───────────────────────────────────────────────────────────────────────────── # END OF CONFIGURATION — You should not need to edit anything below this line. # ───────────────────────────────────────────────────────────────────────────── __version__ = "1.0.0" # ── Logging ───────────────────────────────────────────────────────────────── logging.basicConfig( format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), ) log = logging.getLogger("mud_proxy") # ── Helpers ───────────────────────────────────────────────────────────────── def enable_tcp_keepalive(writer): """Enable TCP-level keepalive on a socket to detect dead connections.""" try: sock = writer.transport.get_extra_info("socket") if sock is None: return sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) if hasattr(socket, "TCP_KEEPIDLE"): # Linux sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, TCP_KEEPALIVE_IDLE) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, TCP_KEEPALIVE_INTERVAL) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, TCP_KEEPALIVE_COUNT) elif hasattr(socket, "TCP_KEEPALIVE"): # macOS sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, TCP_KEEPALIVE_IDLE) except (OSError, AttributeError): pass def format_addr(addr): """Format a (host, port) tuple as a string.""" if addr: return f"{addr[0]}:{addr[1]}" return "unknown" # ── Session ───────────────────────────────────────────────────────────────── class ProxySession: """ Represents one proxied connection: client <-> proxy <-> MUD server. When the client disconnects, the session becomes 'orphaned' — the MUD connection stays alive and output is buffered. When a new client connects, it can resume the orphaned session. """ def __init__(self, session_id, mud_reader, mud_writer, client_reader, client_writer, client_addr): self.session_id = session_id self.mud_reader = mud_reader self.mud_writer = mud_writer self.client_reader = client_reader self.client_writer = client_writer self.client_addr = client_addr self.orphaned = False self.orphaned_at = None self.output_buffer = bytearray() self.buffer_max = BUFFER_MAX_KB * 1024 # asyncio tasks for the two relay directions self.mud_task = None self.client_task = None self.keepalive_task = None self.created_at = time.time() def __repr__(self): state = "orphaned" if self.orphaned else "active" return f"" def buffer_output(self, data): """Buffer MUD output while the client is disconnected.""" self.output_buffer.extend(data) if len(self.output_buffer) > self.buffer_max: # Keep the most recent output overflow = len(self.output_buffer) - self.buffer_max self.output_buffer = self.output_buffer[overflow:] log.debug(f"{self} Buffer overflow, trimmed {overflow} bytes") def drain_buffer(self): """Return and clear the buffered output.""" data = bytes(self.output_buffer) self.output_buffer.clear() return data # ── Proxy Server ──────────────────────────────────────────────────────────── class MUDProxy: """ The main proxy server. Listens for client connections, connects to the MUD server, and relays data between them with keepalive support. """ def __init__(self): self.sessions = {} # session_id -> ProxySession self.orphaned_by_ip = {} # client_ip -> session_id (for auto-resume) self._next_id = 1 self._server = None self._cleanup_task = None self._shutting_down = False # ── Lifecycle ─────────────────────────────────────────────────────── async def start(self): """Start the proxy server and listen for connections.""" self._server = await asyncio.start_server( self._on_client_connect, PROXY_HOST, PROXY_PORT, ) self._cleanup_task = asyncio.create_task(self._cleanup_loop()) addrs = ", ".join(str(s.getsockname()) for s in self._server.sockets) log.info(f"MUD Keep-Alive Proxy v{__version__} started") log.info(f"Listening on {addrs}") log.info(f"Forwarding to {MUD_HOST}:{MUD_PORT}") if KEEPALIVE_INTERVAL > 0: log.info(f"Keepalive every {KEEPALIVE_INTERVAL}s " f"(command: {KEEPALIVE_COMMAND!r})") log.info(f"Orphan timeout: {ORPHAN_TIMEOUT_MINUTES} minutes") log.info("Ready for connections.") async with self._server: await self._server.serve_forever() async def shutdown(self): """Gracefully shut down the proxy.""" if self._shutting_down: return self._shutting_down = True log.info("Shutting down proxy...") # Cancel cleanup if self._cleanup_task: self._cleanup_task.cancel() # Close all sessions for session in list(self.sessions.values()): await self._close_session(session, reason="proxy shutdown") # Stop accepting connections if self._server: self._server.close() await self._server.wait_closed() log.info("Proxy shut down cleanly.") # ── Connection Handling ───────────────────────────────────────────── async def _on_client_connect(self, client_reader, client_writer): """Handle a new client connection.""" addr = client_writer.get_extra_info("peername") addr_str = format_addr(addr) client_ip = addr[0] if addr else "unknown" log.info(f"New client connection from {addr_str}") # Connection limit active = sum(1 for s in self.sessions.values() if not s.orphaned) if active >= MAX_CONNECTIONS: log.warning(f"Connection limit reached ({active}/{MAX_CONNECTIONS}), " f"rejecting {addr_str}") try: client_writer.write( b"\r\nProxy: Too many connections. Try again later.\r\n") await client_writer.drain() client_writer.close() except Exception: pass return # Check for an orphaned session from the same IP (auto-resume) orphan_sid = self.orphaned_by_ip.get(client_ip) if orphan_sid and orphan_sid in self.sessions: session = self.sessions[orphan_sid] if session.orphaned: log.info(f"Resuming orphaned session #{session.session_id} " f"for {addr_str}") await self._resume_session(session, client_reader, client_writer, addr_str) return # No orphaned session — create a fresh connection to the MUD server try: mud_reader, mud_writer = await asyncio.wait_for( asyncio.open_connection(MUD_HOST, MUD_PORT), timeout=15, ) except asyncio.TimeoutError: log.error(f"Timeout connecting to MUD server {MUD_HOST}:{MUD_PORT}") try: client_writer.write( b"\r\nProxy: Could not reach the MUD server (timeout). " b"Check your MUD_HOST and MUD_PORT settings.\r\n") await client_writer.drain() client_writer.close() except Exception: pass return except OSError as e: log.error(f"Cannot connect to MUD server {MUD_HOST}:{MUD_PORT}: {e}") try: client_writer.write( f"\r\nProxy: Cannot connect to MUD server: {e}\r\n" .encode("utf-8", errors="replace")) await client_writer.drain() client_writer.close() except Exception: pass return # Enable TCP keepalive on both connections enable_tcp_keepalive(mud_writer) enable_tcp_keepalive(client_writer) # Create session sid = self._next_id self._next_id += 1 session = ProxySession( sid, mud_reader, mud_writer, client_reader, client_writer, addr_str, ) self.sessions[sid] = session log.info(f"Session #{sid} created: {addr_str} <-> " f"{MUD_HOST}:{MUD_PORT}") # Notify the client try: client_writer.write( b"\r\n[MUD Proxy] Connected to MUD server. " b"Your session will persist if you disconnect.\r\n\r\n") await client_writer.drain() except (ConnectionError, OSError): pass # Start relay tasks session.mud_task = asyncio.create_task( self._relay_mud_to_client(session)) session.client_task = asyncio.create_task( self._relay_client_to_mud(session)) if KEEPALIVE_INTERVAL > 0: session.keepalive_task = asyncio.create_task( self._keepalive_loop(session)) # Wait for both relays to finish try: await asyncio.gather( session.mud_task, session.client_task, return_exceptions=True) except Exception: pass # ── Resume ────────────────────────────────────────────────────────── async def _resume_session(self, session, client_reader, client_writer, addr_str): """Resume an orphaned session with a new client connection.""" session.client_reader = client_reader session.client_writer = client_writer session.client_addr = addr_str session.orphaned = False session.orphaned_at = None enable_tcp_keepalive(client_writer) # Remove from orphan tracking for ip, sid in list(self.orphaned_by_ip.items()): if sid == session.session_id: del self.orphaned_by_ip[ip] # Replay buffered output buffered = session.drain_buffer() if buffered: try: client_writer.write( b"\r\n[MUD Proxy] Session resumed. " b"Replaying missed output...\r\n\r\n") client_writer.write(buffered) client_writer.write( b"\r\n[MUD Proxy] End of buffered output.\r\n") await client_writer.drain() except (ConnectionError, OSError): log.warning(f"{session} Failed to replay buffer to new client") self._orphan_session(session, addr_str) return else: try: client_writer.write( b"\r\n[MUD Proxy] Session resumed. " b"No missed output.\r\n") await client_writer.drain() except (ConnectionError, OSError): self._orphan_session(session, addr_str) return log.info(f"{session} Resumed successfully from {addr_str}") # Restart relay tasks session.mud_task = asyncio.create_task( self._relay_mud_to_client(session)) session.client_task = asyncio.create_task( self._relay_client_to_mud(session)) if KEEPALIVE_INTERVAL > 0 and not session.keepalive_task: session.keepalive_task = asyncio.create_task( self._keepalive_loop(session)) try: await asyncio.gather( session.mud_task, session.client_task, return_exceptions=True) except Exception: pass # ── Relay: MUD -> Client ──────────────────────────────────────────── async def _relay_mud_to_client(self, session): """Forward data from the MUD server to the client.""" try: while True: try: data = await asyncio.wait_for( session.mud_reader.read(8192), timeout=300, ) except asyncio.TimeoutError: # No data from MUD in 5 minutes — could be normal for # idle MUDs. Just continue the loop. log.debug(f"{session} MUD read timeout (idle, continuing)") continue if not data: log.info(f"{session} MUD server disconnected") await self._close_session( session, reason="MUD disconnected", client_msg=b"\r\n[MUD Proxy] MUD server disconnected.\r\n") return if session.orphaned: session.buffer_output(data) else: try: session.client_writer.write(data) await session.client_writer.drain() except (ConnectionError, OSError): log.info(f"{session} Client write failed, orphaning") self._orphan_session(session, session.client_addr) except asyncio.CancelledError: return except Exception as e: log.error(f"{session} MUD relay error: {e}") await self._close_session(session, reason=f"MUD relay error: {e}") # ── Relay: Client -> MUD ──────────────────────────────────────────── async def _relay_client_to_mud(self, session): """Forward data from the client to the MUD server.""" try: while True: try: data = await asyncio.wait_for( session.client_reader.read(8192), timeout=300, ) except asyncio.TimeoutError: log.debug(f"{session} Client read timeout (idle, continuing)") continue if not data: log.info(f"{session} Client disconnected") self._orphan_session(session, session.client_addr) return # Forward to MUD try: session.mud_writer.write(data) await session.mud_writer.drain() except (ConnectionError, OSError): log.error(f"{session} MUD write failed") await self._close_session( session, reason="MUD write failed", client_msg=b"\r\n[MUD Proxy] Lost connection to " b"MUD server.\r\n") return except asyncio.CancelledError: return except Exception as e: log.error(f"{session} Client relay error: {e}") await self._close_session(session, reason=f"client relay error: {e}") # ── Keepalive ─────────────────────────────────────────────────────── async def _keepalive_loop(self, session): """Periodically send a keepalive command to the MUD server.""" try: while True: await asyncio.sleep(KEEPALIVE_INTERVAL) if session.orphaned: # Still send keepalives while orphaned to keep the MUD # connection alive pass try: cmd = KEEPALIVE_COMMAND + "\r\n" session.mud_writer.write(cmd.encode("utf-8")) await session.mud_writer.drain() log.debug(f"{session} Sent keepalive") except (ConnectionError, OSError): log.warning(f"{session} Keepalive failed, MUD connection dead") await self._close_session( session, reason="keepalive failed", client_msg=b"\r\n[MUD Proxy] MUD connection lost.\r\n") return except asyncio.CancelledError: return # ── Orphan / Close ────────────────────────────────────────────────── def _orphan_session(self, session, client_addr): """ Mark a session as orphaned. The MUD connection stays alive and output is buffered for when the client reconnects. """ if session.orphaned: return if ORPHAN_TIMEOUT_MINUTES <= 0: # Orphan timeout disabled — close immediately asyncio.create_task( self._close_session(session, reason="client disconnected")) return session.orphaned = True session.orphaned_at = time.time() session.client_reader = None session.client_writer = None # Cancel the client relay task (MUD relay continues to buffer output) if session.client_task and not session.client_task.done(): session.client_task.cancel() # Track by IP for auto-resume client_ip = client_addr.split(":")[0] if ":" in client_addr else client_addr self.orphaned_by_ip[client_ip] = session.session_id log.info(f"{session} Orphaned. MUD connection alive for " f"{ORPHAN_TIMEOUT_MINUTES} minutes.") async def _close_session(self, session, reason="unknown", client_msg=None): """Fully close a session (both client and MUD connections).""" sid = session.session_id if sid not in self.sessions: return log.info(f"{session} Closing: {reason}") # Send message to client if connected if client_msg and session.client_writer and not session.orphaned: try: session.client_writer.write(client_msg) await session.client_writer.drain() except Exception: pass # Cancel tasks for task in (session.mud_task, session.client_task, session.keepalive_task): if task and not task.done(): task.cancel() # Close connections for writer in (session.mud_writer, session.client_writer): if writer: try: writer.close() await writer.wait_closed() except Exception: pass # Remove from tracking del self.sessions[sid] for ip, osid in list(self.orphaned_by_ip.items()): if osid == sid: del self.orphaned_by_ip[ip] # ── Cleanup Loop ──────────────────────────────────────────────────── async def _cleanup_loop(self): """Periodically clean up expired orphaned sessions.""" try: while True: await asyncio.sleep(60) # Check every minute now = time.time() timeout = ORPHAN_TIMEOUT_MINUTES * 60 for session in list(self.sessions.values()): if (session.orphaned and session.orphaned_at and now - session.orphaned_at > timeout): log.info(f"{session} Orphan timeout expired") await self._close_session( session, reason="orphan timeout expired") # Log status periodically active = sum(1 for s in self.sessions.values() if not s.orphaned) orphaned = sum(1 for s in self.sessions.values() if s.orphaned) if active or orphaned: log.debug(f"Sessions: {active} active, " f"{orphaned} orphaned") except asyncio.CancelledError: return # ── Main ──────────────────────────────────────────────────────────────────── def main(): """Entry point. Sets up signal handlers and starts the proxy.""" # Validate configuration if MUD_HOST == "your-mud-server.com": print("=" * 60) print(" ERROR: You need to configure the proxy first!") print() print(" Open mud_proxy.py in a text editor and change the") print(" CONFIGURATION section near the top of the file.") print() print(" At minimum, set MUD_HOST and MUD_PORT to your") print(" MUD server's address and port.") print("=" * 60) sys.exit(1) if MUD_PORT == PROXY_PORT and MUD_HOST in ("127.0.0.1", "localhost"): print("ERROR: PROXY_PORT and MUD_PORT cannot be the same when") print(" MUD_HOST is localhost. The proxy would connect to itself.") sys.exit(1) proxy = MUDProxy() # Handle Ctrl+C and termination signals gracefully loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) def signal_handler(): log.info("Received shutdown signal") asyncio.ensure_future(proxy.shutdown()) for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, signal_handler) except NotImplementedError: # Windows does not support add_signal_handler pass try: loop.run_until_complete(proxy.start()) except KeyboardInterrupt: log.info("Interrupted by user") loop.run_until_complete(proxy.shutdown()) finally: loop.close() if __name__ == "__main__": main()