342 lines
11 KiB
Python
342 lines
11 KiB
Python
import argparse
|
|
import asyncio
|
|
import random
|
|
import logging
|
|
from datetime import datetime
|
|
import traceback
|
|
|
|
__version__ = "1.7"
|
|
|
|
class ConnectionInfo:
|
|
def __init__(self, src_ip, dst_domain, method):
|
|
self.src_ip = src_ip
|
|
self.dst_domain = dst_domain
|
|
self.method = method
|
|
self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
self.traffic_in = 0
|
|
self.traffic_out = 0
|
|
|
|
|
|
class ProxyServer:
|
|
|
|
def __init__(self, host, port, log_access, quiet, verbose):
|
|
|
|
self.host = host
|
|
self.port = port
|
|
self.log_access_file = log_access
|
|
self.quiet = quiet
|
|
self.verbose = verbose
|
|
|
|
self.logger = logging.getLogger(__name__)
|
|
self.logging_access = None
|
|
|
|
self.total_connections = 0
|
|
self.allowed_connections = 0
|
|
self.blocked_connections = 0
|
|
self.traffic_in = 0
|
|
self.traffic_out = 0
|
|
self.last_traffic_in = 0
|
|
self.last_traffic_out = 0
|
|
self.speed_in = 0
|
|
self.speed_out = 0
|
|
self.last_time = None
|
|
|
|
self.active_connections: dict[tuple, ConnectionInfo] = {}
|
|
self.connections_lock = asyncio.Lock()
|
|
|
|
self.blocked = []
|
|
self.tasks = []
|
|
self.server = None
|
|
|
|
self.setup_logging()
|
|
|
|
def setup_logging(self):
|
|
"""
|
|
Set up the logging configuration.
|
|
|
|
The logging level is set to ERROR and the log messages are written to the
|
|
file specified by the log_file parameter. The log format is
|
|
[%(asctime)s][%(levelname)s]: %(message)s and the date format is
|
|
%Y-%m-%d %H:%M:%S.
|
|
"""
|
|
|
|
if self.log_access_file:
|
|
self.logging_access = logging.FileHandler(
|
|
self.log_access_file, encoding='utf-8')
|
|
|
|
self.logging_access.setFormatter(logging.Formatter("%(message)s"))
|
|
self.logging_access.setLevel(logging.INFO)
|
|
self.logging_access.addFilter(
|
|
lambda record: record.levelno == logging.INFO)
|
|
else:
|
|
self.logging_access = logging.NullHandler()
|
|
|
|
self.logger.propagate = False
|
|
self.logger.handlers = []
|
|
self.logger.setLevel(logging.INFO)
|
|
self.logger.addHandler(self.logging_access)
|
|
|
|
async def run(self):
|
|
"""
|
|
Start the proxy server and run it until it is stopped.
|
|
|
|
This method starts the proxy server by calling
|
|
`asyncio.start_server` with the `handle_connection` method as the
|
|
protocol handler. The server is then started with the `serve_forever`
|
|
method.
|
|
"""
|
|
self.print_banner()
|
|
self.server = await asyncio.start_server(
|
|
self.handle_connection, self.host, self.port
|
|
)
|
|
await self.server.serve_forever()
|
|
|
|
def print_banner(self):
|
|
"""
|
|
Print information about the proxy.
|
|
"""
|
|
print(f"Proxy is running on {self.host}:{self.port}")
|
|
print(f"Proxy started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
@staticmethod
|
|
def format_size(size):
|
|
"""
|
|
Convert a size in bytes to a human-readable string with appropriate units.
|
|
"""
|
|
units = ["B", "KB", "MB", "GB"]
|
|
unit = 0
|
|
while size >= 1024 and unit < len(units) - 1:
|
|
size /= 1024
|
|
unit += 1
|
|
return f"{size:.1f} {units[unit]}"
|
|
|
|
@staticmethod
|
|
def format_speed(speed_bps):
|
|
units = ["bps", "Kbps", "Mbps", "Gbps"]
|
|
unit = 0
|
|
speed = speed_bps
|
|
while speed >= 1000 and unit < len(units) - 1:
|
|
speed /= 1000
|
|
unit += 1
|
|
return f"{speed:.1f} {units[unit]}"
|
|
|
|
async def handle_connection(self, reader, writer):
|
|
"""
|
|
Handle a connection from a client.
|
|
|
|
This method is called when a connection is accepted from a client. It reads
|
|
the initial HTTP data from the client and tries to parse it as a CONNECT
|
|
request. If the request is valid, it opens a connection to the target
|
|
server and starts piping data between the client and the target server.
|
|
"""
|
|
|
|
try:
|
|
client_ip, client_port = writer.get_extra_info("peername")
|
|
http_data = await reader.read(1500)
|
|
if not http_data:
|
|
writer.close()
|
|
return
|
|
headers = http_data.split(b"\r\n")
|
|
first_line = headers[0].split(b" ")
|
|
method = first_line[0]
|
|
url = first_line[1]
|
|
|
|
if method == b"CONNECT":
|
|
host_port = url.split(b":")
|
|
host = host_port[0]
|
|
port = int(host_port[1]) if len(host_port) > 1 else 443
|
|
else:
|
|
host_header = next(
|
|
(h for h in headers if h.startswith(b"Host: ")), None
|
|
)
|
|
if not host_header:
|
|
raise ValueError("Missing Host header")
|
|
|
|
host_port = host_header[6:].split(b":")
|
|
host = host_port[0]
|
|
port = int(host_port[1]) if len(host_port) > 1 else 80
|
|
|
|
conn_key = (client_ip, client_port)
|
|
conn_info = ConnectionInfo(
|
|
client_ip, host.decode(), method.decode())
|
|
|
|
async with self.connections_lock:
|
|
self.active_connections[conn_key] = conn_info
|
|
|
|
if method == b"CONNECT":
|
|
writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
|
|
await writer.drain()
|
|
|
|
remote_reader, remote_writer = await asyncio.open_connection(
|
|
host.decode(), port
|
|
)
|
|
|
|
await self.fragment_data(reader, remote_writer)
|
|
else:
|
|
remote_reader, remote_writer = await asyncio.open_connection(
|
|
host.decode(), port
|
|
)
|
|
remote_writer.write(http_data)
|
|
await remote_writer.drain()
|
|
|
|
self.allowed_connections += 1
|
|
|
|
self.total_connections += 1
|
|
|
|
self.tasks.extend(
|
|
[
|
|
asyncio.create_task(
|
|
self.pipe(reader, remote_writer, "out", conn_key)
|
|
),
|
|
asyncio.create_task(
|
|
self.pipe(remote_reader, writer, "in", conn_key)
|
|
),
|
|
]
|
|
)
|
|
except Exception as e:
|
|
self.logger.error(traceback.format_exc())
|
|
if self.verbose:
|
|
print(f"[NON-CRITICAL]: {e}")
|
|
writer.close()
|
|
|
|
async def pipe(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, direction, conn_key):
|
|
"""
|
|
Pipe data from a reader to a writer.
|
|
|
|
This function reads data from a reader and writes it to a writer until
|
|
the reader is closed or the writer is closed. If an error occurs during
|
|
the transfer, the error is logged and the writer is closed.
|
|
"""
|
|
try:
|
|
while not reader.at_eof() and not writer.is_closing():
|
|
data = await reader.read(1500)
|
|
async with self.connections_lock:
|
|
conn_info: ConnectionInfo | None = self.active_connections.get(conn_key)
|
|
self.active_connections
|
|
if conn_info:
|
|
if direction == "out":
|
|
self.traffic_out += len(data)
|
|
conn_info.traffic_out += len(data)
|
|
else:
|
|
self.traffic_in += len(data)
|
|
conn_info.traffic_in += len(data)
|
|
writer.write(data)
|
|
await writer.drain()
|
|
except Exception as e:
|
|
self.logger.error(traceback.format_exc())
|
|
if self.verbose:
|
|
print(f"\033[93m[NON-CRITICAL]:\033[97m {e}\033[0m")
|
|
finally:
|
|
writer.close()
|
|
async with self.connections_lock:
|
|
conn_info: ConnectionInfo | None = self.active_connections.pop(
|
|
conn_key, None)
|
|
if conn_info:
|
|
self.logger.info(
|
|
f"{conn_info.start_time} {conn_info.src_ip} {conn_info.method} {conn_info.dst_domain}"
|
|
)
|
|
|
|
async def fragment_data(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
"""
|
|
Fragment data from a reader and write it to a writer.
|
|
|
|
This function reads data from a reader and fragments. It is split into
|
|
chunks and each chunk is written to the writer as a separate TLS record
|
|
"""
|
|
|
|
data: bytes = b''
|
|
|
|
try:
|
|
await reader.read(5)
|
|
data = await reader.read(2048)
|
|
except Exception as e:
|
|
self.logger.error(traceback.format_exc())
|
|
if self.verbose:
|
|
print(f"[NON-CRITICAL]: {e}")
|
|
return
|
|
|
|
self.blocked_connections += 1
|
|
|
|
host_end = data.find(b"\x00")
|
|
if host_end != -1:
|
|
writer.write(
|
|
bytes.fromhex("160304")
|
|
+ (host_end + 1).to_bytes(2, "big")
|
|
+ data[:host_end + 1]
|
|
)
|
|
data = data[host_end + 1:]
|
|
|
|
while data:
|
|
chunk_len = random.randint(1, len(data))
|
|
writer.write(
|
|
bytes.fromhex("160304")
|
|
+ chunk_len.to_bytes(2, "big")
|
|
+ data[:chunk_len]
|
|
)
|
|
data = data[chunk_len:]
|
|
|
|
await writer.drain()
|
|
|
|
async def shutdown(self):
|
|
"""
|
|
Shutdown the proxy server.
|
|
|
|
This function closes the server and cancels all tasks running on the
|
|
event loop. If a server is not running, the function does nothing.
|
|
"""
|
|
if self.server:
|
|
self.server.close()
|
|
await self.server.wait_closed()
|
|
for task in self.tasks:
|
|
task.cancel()
|
|
|
|
|
|
async def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--host", default="0.0.0.0", help="Proxy host")
|
|
parser.add_argument("--port", type=int, default=8881, help="Proxy port")
|
|
|
|
parser.add_argument("--blacklist",
|
|
default="blacklist.txt",
|
|
help="Path to blacklist file"
|
|
)
|
|
|
|
parser.add_argument("--log_access",
|
|
required=False,
|
|
help="Path to the access control log"
|
|
)
|
|
parser.add_argument("--log_error",
|
|
required=False,
|
|
help="Path to log file for errors"
|
|
)
|
|
|
|
parser.add_argument("-q", "--quiet",
|
|
action="store_true",
|
|
help="Remove UI output"
|
|
)
|
|
|
|
parser.add_argument("-v", "--verbose",
|
|
action="store_true",
|
|
help="Show more info (only for devs)"
|
|
)
|
|
|
|
logging.getLogger("asyncio").setLevel(logging.CRITICAL)
|
|
|
|
args = parser.parse_args()
|
|
proxy = ProxyServer(
|
|
args.host,
|
|
args.port,
|
|
args.log_access,
|
|
args.quiet,
|
|
args.verbose,
|
|
)
|
|
|
|
try:
|
|
await proxy.run()
|
|
except asyncio.CancelledError:
|
|
await proxy.shutdown()
|
|
print("Shutting down proxy...")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|