"""Asynchronous HTTP/HTTPS forward proxy.

For CONNECT tunnels the first TLS record sent by the client is re-framed
into several smaller TLS records before being forwarded to the target.
"""

import argparse
import asyncio
import logging
import random
import traceback
from datetime import datetime

__version__ = "1.7"

# Size of a TLS record header: type (1) + version (2) + length (2).
_TLS_HEADER_LEN = 5
# TLS record content type for handshake messages.
_TLS_HANDSHAKE = 0x16
# Header prefix (content type + version bytes) used for every emitted record.
_FRAGMENT_PREFIX = bytes.fromhex("160304")
# Upper bound, in seconds, on waiting for the listening server to close.
_SERVER_CLOSE_TIMEOUT = 5


class ConnectionInfo:
    """Bookkeeping record for one proxied client connection."""

    def __init__(self, src_ip, dst_domain, method):
        self.src_ip = src_ip
        self.dst_domain = dst_domain
        self.method = method
        # Formatted once at creation; written verbatim to the access log.
        self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.traffic_in = 0
        self.traffic_out = 0


class ProxyServer:
    """Forward proxy built on asyncio streams.

    Args:
        host: Address to listen on.
        port: TCP port to listen on.
        log_access: Path of the access log file, or a falsy value to disable.
        quiet: When true, the start-up banner is not printed.
        verbose: When true, non-critical errors are printed to stdout.
        log_error: Optional path of the error log file. When omitted, errors
            logged through ``self.logger`` are discarded.
    """

    def __init__(self, host, port, log_access, quiet, verbose, log_error=None):
        self.host = host
        self.port = port
        self.log_access_file = log_access
        self.log_error_file = log_error
        self.quiet = quiet
        self.verbose = verbose

        self.logger = logging.getLogger(__name__)
        self.logging_access = None
        self.logging_errors = None

        self.total_connections = 0
        self.allowed_connections = 0
        self.blocked_connections = 0
        self.traffic_in = 0
        self.traffic_out = 0
        self.last_traffic_in = 0
        self.last_traffic_out = 0
        self.speed_in = 0
        self.speed_out = 0
        self.last_time = None

        self.active_connections: dict[tuple, ConnectionInfo] = {}
        self.connections_lock = asyncio.Lock()

        # NOTE: never populated or consulted anywhere in this file.
        self.blocked = []
        self.tasks = []
        self.server = None

        self.setup_logging()

    def setup_logging(self):
        """Attach the access-log and error-log handlers to ``self.logger``.

        INFO records go to the access log as the bare message. ERROR records
        go to the error log formatted as
        ``[%(asctime)s][%(levelname)s]: %(message)s`` with the date format
        ``%Y-%m-%d %H:%M:%S``. A handler whose file path was not supplied is
        replaced by a ``NullHandler``.
        """
        if self.log_access_file:
            self.logging_access = logging.FileHandler(
                self.log_access_file, encoding="utf-8"
            )
            self.logging_access.setFormatter(logging.Formatter("%(message)s"))
            self.logging_access.setLevel(logging.INFO)
            # Keep ERROR tracebacks out of the access log.
            self.logging_access.addFilter(
                lambda record: record.levelno == logging.INFO
            )
        else:
            self.logging_access = logging.NullHandler()

        if self.log_error_file:
            self.logging_errors = logging.FileHandler(
                self.log_error_file, encoding="utf-8"
            )
            self.logging_errors.setFormatter(
                logging.Formatter(
                    "[%(asctime)s][%(levelname)s]: %(message)s",
                    "%Y-%m-%d %H:%M:%S",
                )
            )
            self.logging_errors.setLevel(logging.ERROR)
            self.logging_errors.addFilter(
                lambda record: record.levelno == logging.ERROR
            )
        else:
            self.logging_errors = logging.NullHandler()

        self.logger.propagate = False
        # Close handlers left by an earlier setup so their files are released.
        for stale in list(self.logger.handlers):
            self.logger.removeHandler(stale)
            stale.close()
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(self.logging_access)
        self.logger.addHandler(self.logging_errors)

    async def run(self):
        """Start listening and serve until cancelled.

        The banner is printed only after the listening socket was created
        successfully, and not at all in quiet mode.
        """
        self.server = await asyncio.start_server(
            self.handle_connection, self.host, self.port
        )
        if not self.quiet:
            self.print_banner()
        await self.server.serve_forever()

    def print_banner(self):
        """Print the listening address and the start time."""
        print(f"Proxy is running on {self.host}:{self.port}")
        print(f"Proxy started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    @staticmethod
    def format_size(size):
        """Return *size* (bytes) as a string in B, KB, MB or GB (base 1024)."""
        units = ["B", "KB", "MB", "GB"]
        unit = 0
        while size >= 1024 and unit < len(units) - 1:
            size /= 1024
            unit += 1
        return f"{size:.1f} {units[unit]}"

    @staticmethod
    def format_speed(speed_bps):
        """Return *speed_bps* as a string in bps, Kbps, Mbps or Gbps (base 1000)."""
        units = ["bps", "Kbps", "Mbps", "Gbps"]
        unit = 0
        speed = speed_bps
        while speed >= 1000 and unit < len(units) - 1:
            speed /= 1000
            unit += 1
        return f"{speed:.1f} {units[unit]}"

    @staticmethod
    def _parse_target(http_data):
        """Extract ``(method, host, port)`` from the first bytes of a request.

        For CONNECT the target comes from the request line (default port 443);
        for any other method it comes from the ``Host`` header (default port
        80). The header name is matched case-insensitively and a bracketed
        IPv6 literal such as ``[::1]:8443`` is accepted.

        Returns:
            ``method`` as bytes, ``host`` as str and ``port`` as int.

        Raises:
            ValueError: If the request line is malformed, the Host header is
                missing, the host is empty or the port is not a number.
        """
        lines = http_data.split(b"\r\n")
        request_line = lines[0].split(b" ")
        if len(request_line) < 2:
            raise ValueError("Malformed request line")
        method, url = request_line[0], request_line[1]

        if method == b"CONNECT":
            authority, default_port = url, 443
        else:
            authority = None
            for line in lines[1:]:
                if not line:
                    # Blank line: end of the header section.
                    break
                name, sep, value = line.partition(b":")
                if sep and name.strip().lower() == b"host":
                    authority = value.strip()
                    break
            if not authority:
                raise ValueError("Missing Host header")
            default_port = 80

        if authority.startswith(b"["):
            closing = authority.find(b"]")
            if closing == -1:
                raise ValueError("Malformed IPv6 literal")
            host = authority[1:closing]
            rest = authority[closing + 1:]
            port = int(rest[1:]) if rest.startswith(b":") else default_port
        else:
            host, sep, port_text = authority.partition(b":")
            port = int(port_text) if sep else default_port

        if not host:
            raise ValueError("Empty target host")
        return method, host.decode(), port

    def _track_task(self, task):
        """Remember *task* until it finishes so ``shutdown`` can cancel it."""
        self.tasks.append(task)
        task.add_done_callback(self._forget_task)

    def _forget_task(self, task):
        """Drop a finished task so ``self.tasks`` does not grow forever."""
        try:
            self.tasks.remove(task)
        except ValueError:
            pass

    def _report_non_critical(self, exc):
        """Log the active traceback and, in verbose mode, print *exc*."""
        self.logger.error(traceback.format_exc())
        if self.verbose:
            print(f"[NON-CRITICAL]: {exc}")

    async def handle_connection(self, reader, writer):
        """Handle one accepted client connection.

        Reads the first request bytes, determines the target, opens the
        remote connection and starts two ``pipe`` tasks, one per direction.
        For CONNECT the client is answered with ``200 Connection
        Established`` and its first TLS record is passed through
        ``fragment_data``; for other methods the bytes already read are
        forwarded unchanged.

        Any failure is logged, and the client socket, the remote socket (if
        already opened) and the connection-table entry are released.
        """
        conn_key = None
        remote_writer = None
        try:
            # IPv6 peers report a 4-tuple; only address and port are needed.
            client_ip, client_port = writer.get_extra_info("peername")[:2]

            http_data = await reader.read(1500)
            if not http_data:
                writer.close()
                return

            method, host, port = self._parse_target(http_data)

            conn_key = (client_ip, client_port)
            conn_info = ConnectionInfo(client_ip, host, method.decode())
            async with self.connections_lock:
                self.active_connections[conn_key] = conn_info

            if method == b"CONNECT":
                writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
                await writer.drain()
                remote_reader, remote_writer = await asyncio.open_connection(
                    host, port
                )
                await self.fragment_data(reader, remote_writer)
            else:
                remote_reader, remote_writer = await asyncio.open_connection(
                    host, port
                )
                remote_writer.write(http_data)
                await remote_writer.drain()

            self.allowed_connections += 1
            self.total_connections += 1

            self._track_task(
                asyncio.create_task(
                    self.pipe(reader, remote_writer, "out", conn_key)
                )
            )
            self._track_task(
                asyncio.create_task(
                    self.pipe(remote_reader, writer, "in", conn_key)
                )
            )
        except Exception as e:
            self._report_non_critical(e)
            if conn_key is not None:
                async with self.connections_lock:
                    self.active_connections.pop(conn_key, None)
            if remote_writer is not None:
                remote_writer.close()
            writer.close()

    async def pipe(self, reader: asyncio.StreamReader,
                   writer: asyncio.StreamWriter, direction, conn_key):
        """Copy data from *reader* to *writer* until either side is done.

        Bytes are added to the ``"out"`` counters when *direction* is
        ``"out"`` and to the ``"in"`` counters otherwise, for as long as the
        connection is still registered under *conn_key*. When the loop ends
        the writer is closed, and whichever pipe removes the connection entry
        first writes the access-log line.
        """
        try:
            while not reader.at_eof() and not writer.is_closing():
                data = await reader.read(1500)
                if not data:
                    # EOF: nothing to forward or count.
                    break
                async with self.connections_lock:
                    conn_info: ConnectionInfo | None = (
                        self.active_connections.get(conn_key)
                    )
                    if conn_info:
                        if direction == "out":
                            self.traffic_out += len(data)
                            conn_info.traffic_out += len(data)
                        else:
                            self.traffic_in += len(data)
                            conn_info.traffic_in += len(data)
                writer.write(data)
                await writer.drain()
        except Exception as e:
            self.logger.error(traceback.format_exc())
            if self.verbose:
                print(f"\033[93m[NON-CRITICAL]:\033[97m {e}\033[0m")
        finally:
            writer.close()
            async with self.connections_lock:
                conn_info: ConnectionInfo | None = self.active_connections.pop(
                    conn_key, None)
            if conn_info:
                self.logger.info(
                    f"{conn_info.start_time} {conn_info.src_ip} {conn_info.method} {conn_info.dst_domain}"
                )

    async def fragment_data(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        """Re-frame the client's first TLS record as several smaller records.

        The 5-byte record header is read in full and its length field decides
        how many payload bytes to read, so a record that arrives in several
        TCP segments is still handled as one unit. The payload up to and
        including its first NUL byte is sent as one record; the remainder is
        sent as records of random length. Every emitted record carries the
        header bytes ``16 03 04`` followed by its own length.

        If the stream does not start with a TLS handshake record, the bytes
        read so far are forwarded unchanged and nothing is fragmented.
        """
        try:
            header = await reader.readexactly(_TLS_HEADER_LEN)
        except asyncio.IncompleteReadError as e:
            # Client closed before sending a whole header.
            header = e.partial
        except Exception as e:
            self._report_non_critical(e)
            return

        if len(header) < _TLS_HEADER_LEN or header[0] != _TLS_HANDSHAKE:
            # Not a TLS handshake: pass through instead of mangling it.
            if header:
                writer.write(header)
                await writer.drain()
            return

        try:
            data = await reader.readexactly(int.from_bytes(header[3:], "big"))
        except asyncio.IncompleteReadError as e:
            # Best effort: fragment whatever part of the record arrived.
            data = e.partial
        except Exception as e:
            self._report_non_critical(e)
            return

        self.blocked_connections += 1

        host_end = data.find(b"\x00")
        if host_end != -1:
            first = data[:host_end + 1]
            writer.write(
                _FRAGMENT_PREFIX + len(first).to_bytes(2, "big") + first
            )
            data = data[host_end + 1:]

        while data:
            chunk_len = random.randint(1, len(data))
            writer.write(
                _FRAGMENT_PREFIX
                + chunk_len.to_bytes(2, "big")
                + data[:chunk_len]
            )
            data = data[chunk_len:]

        await writer.drain()

    async def shutdown(self):
        """Stop accepting connections and cancel all running pipe tasks.

        Pipe tasks are cancelled and awaited before waiting for the server to
        close, because each pipe closes its socket when it ends and
        ``Server.wait_closed`` may wait for open connections. That wait is
        bounded so a connection that never reached the piping stage cannot
        block shutdown indefinitely.
        """
        if self.server:
            self.server.close()

        pending = list(self.tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        if self.server:
            try:
                await asyncio.wait_for(
                    self.server.wait_closed(), timeout=_SERVER_CLOSE_TIMEOUT
                )
            except asyncio.TimeoutError:
                pass


async def main():
    """Parse the command line, then run the proxy until it is cancelled."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="0.0.0.0", help="Proxy host")
    parser.add_argument("--port", type=int, default=8881, help="Proxy port")
    # NOTE: --blacklist is accepted but not used anywhere in this file.
    parser.add_argument(
        "--blacklist", default="blacklist.txt", help="Path to blacklist file"
    )
    parser.add_argument(
        "--log_access", required=False, help="Path to the access control log"
    )
    parser.add_argument(
        "--log_error", required=False, help="Path to log file for errors"
    )
    parser.add_argument(
        "-q", "--quiet", action="store_true", help="Remove UI output"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true",
        help="Show more info (only for devs)"
    )

    logging.getLogger("asyncio").setLevel(logging.CRITICAL)

    args = parser.parse_args()

    proxy = ProxyServer(
        args.host,
        args.port,
        args.log_access,
        args.quiet,
        args.verbose,
        log_error=args.log_error,
    )

    try:
        await proxy.run()
    except asyncio.CancelledError:
        await proxy.shutdown()
        print("Shutting down proxy...")


if __name__ == "__main__":
    asyncio.run(main())