| # ============================================================================== | |
| # 依赖检查与自动安装 (Dependency Check and Auto-Installation) | |
| # ============================================================================== | |
| import sys | |
| import subprocess | |
| import importlib.util | |
| print("--- 正在检查并安装必要的依赖库 ---") | |
| # 定义需要检查和安装的库列表 | |
| # 格式为 '库名': '导入时使用的模块名' | |
| required_packages = { | |
| 'fastapi': 'fastapi', | |
| 'uvicorn': 'uvicorn', | |
| 'httpx': 'httpx', | |
| 'websockets': 'websockets', | |
| 'loguru': 'loguru', | |
| 'cryptography': 'cryptography', | |
| 'psutil': 'psutil', | |
| 'platform': 'platform' | |
| } | |
| def install_package(package_name): | |
| """使用 pip 安装指定的库""" | |
| try: | |
| print(f"正在尝试安装 {package_name}...") | |
| # 使用 sys.executable 确保使用当前 Python 解释器对应的 pip | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", package_name]) | |
| print(f"✅ 成功安装 {package_name}") | |
| except subprocess.CalledProcessError as e: | |
| print(f"❌ 安装 {package_name} 失败: {e}") | |
| # 根据需要可以决定是否在安装失败时退出程序 | |
| # sys.exit(1) | |
| # 遍历并检查每个库 | |
| for package_name, module_name in required_packages.items(): | |
| if importlib.util.find_spec(module_name) is None: | |
| print(f"⚠️ 未找到库: {package_name}。") | |
| install_package(package_name) | |
| else: | |
| print(f"✔️ 库 {package_name} 已安装。") | |
| print("--- 所有依赖库均已准备就绪 ---") | |
| print("\n" * 2) | |
| from fastapi import FastAPI, Request, Response, WebSocket, WebSocketDisconnect, Query | |
| import httpx | |
| import uvicorn | |
| import asyncio | |
| import websockets | |
| import json | |
| import subprocess | |
| import threading | |
| import time | |
| import os | |
| import random | |
| from typing import Dict | |
| from loguru import logger | |
| from starlette.responses import StreamingResponse | |
| import socket | |
| import base64 | |
| from cryptography.fernet import Fernet | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
| import re | |
| import platform | |
| import psutil | |
| import datetime | |
| app = FastAPI() | |
| # 加密配置 | |
| ENCRYPTION_KEY = os.getenv('ENCRYPTION_KEY', 'default-encryption-key-change-in-production') # 生产环境中应该使用环境变量 | |
| SALT = b'stable_salt_123456' # 固定salt,生产环境中应该更安全 | |
| def get_encryption_key(): | |
| """生成加密密钥""" | |
| kdf = PBKDF2HMAC( | |
| algorithm=hashes.SHA256(), | |
| length=32, | |
| salt=SALT, | |
| iterations=100000, | |
| ) | |
| key = base64.urlsafe_b64encode(kdf.derive(ENCRYPTION_KEY.encode())) | |
| return key | |
| def encrypt_data(data: str) -> str: | |
| """加密数据""" | |
| try: | |
| key = get_encryption_key() | |
| f = Fernet(key) | |
| encrypted_data = f.encrypt(data.encode()) | |
| return base64.urlsafe_b64encode(encrypted_data).decode() | |
| except Exception as e: | |
| logger.error(f"加密失败: {e}") | |
| return data | |
| def decrypt_data(encrypted_data: str) -> str: | |
| """解密数据""" | |
| try: | |
| key = get_encryption_key() | |
| f = Fernet(key) | |
| encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode()) | |
| decrypted_data = f.decrypt(encrypted_bytes) | |
| return decrypted_data.decode() | |
| except Exception as e: | |
| logger.error(f"解密失败: {e}") | |
| return encrypted_data | |
| def encrypt_json_response(data: dict) -> dict: | |
| """加密JSON响应数据""" | |
| try: | |
| json_str = json.dumps(data, ensure_ascii=False) | |
| encrypted_str = encrypt_data(json_str) | |
| return { | |
| "encrypted": True, | |
| "data": encrypted_str, | |
| "timestamp": int(time.time()) | |
| } | |
| except Exception as e: | |
| logger.error(f"加密JSON响应失败: {e}") | |
| return data | |
| # 浏览器实例管理 | |
| BROWSERS: Dict[str, dict] = {} # {browser_id: {"process": Popen, "ws": str, "port": int, "status": str, "info": dict}} | |
| # 浏览器实例状态: open, closed | |
| PORT_RANGE = (9300, 9700) | |
| CHROME_PATH = "google-chrome" # Linux下可用命令,或自定义绝对路径 | |
| PROFILE_BASE = "/tmp/profiles" | |
| def get_system_info(): | |
| # CPU型号 | |
| try: | |
| with open("/proc/cpuinfo") as f: | |
| cpuinfo = f.read() | |
| model_match = re.search(r"model name\s*:\s*(.+)", cpuinfo) | |
| cpu_model = model_match.group(1) if model_match else "Unknown" | |
| except Exception: | |
| cpu_model = "Unknown" | |
| # GPU型号 | |
| try: | |
| gpu_info = subprocess.check_output("nvidia-smi --query-gpu=name --format=csv,noheader", shell=True).decode().strip() | |
| gpu_model = gpu_info.split('\n')[0] if gpu_info else "Unknown" | |
| except Exception: | |
| try: | |
| lspci_info = subprocess.check_output("lspci | grep VGA", shell=True).decode().strip() | |
| gpu_model = lspci_info.split(":")[-1].strip() if lspci_info else "Unknown" | |
| except Exception: | |
| gpu_model = "Unknown" | |
| # 操作系统信息 | |
| os_name = platform.system() | |
| os_version = platform.version() | |
| os_release = platform.release() | |
| # 主机名 | |
| hostname = socket.gethostname() | |
| hostname_short = '-'.join(hostname.split('-')[-3:]) | |
| # 启动时间 | |
| boot_time = datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S") | |
| # 运行时长 | |
| uptime_seconds = int(time.time() - psutil.boot_time()) | |
| uptime = str(datetime.timedelta(seconds=uptime_seconds)) | |
| # 物理内存 | |
| mem = psutil.virtual_memory() | |
| mem_total = f"{int(mem.total / 1024 / 1024)}MB" | |
| # CPU核心数 | |
| cpu_count_logical = psutil.cpu_count() | |
| cpu_count_physical = psutil.cpu_count(logical=False) | |
| # 磁盘 | |
| disk = psutil.disk_usage('/') | |
| disk_total = f"{int(disk.total / 1024 / 1024 / 1024)}GB" | |
| disk_used = f"{int(disk.used / 1024 / 1024 / 1024)}GB" | |
| # 当前用户 | |
| try: | |
| user = os.getlogin() | |
| except Exception: | |
| user = "Unknown" | |
| # IP地址 | |
| try: | |
| ip = socket.gethostbyname(hostname) | |
| except Exception: | |
| ip = "Unknown" | |
| return { | |
| "cpu_model": cpu_model, | |
| "gpu_model": gpu_model, | |
| "os": f"{os_name} {os_release} ({os_version})", | |
| "hostname": hostname_short, | |
| "boot_time": boot_time, | |
| "uptime": uptime, | |
| "mem_total": mem_total, | |
| "cpu_count_logical": cpu_count_logical, | |
| "cpu_count_physical": cpu_count_physical, | |
| "disk_total": disk_total, | |
| "disk_used": disk_used, | |
| "user": user, | |
| "ip": ip | |
| } | |
| def get_system_usage(): | |
| cpu = subprocess.check_output("top -bn1 | grep 'Cpu(s)'", shell=True).decode().strip() | |
| mem = subprocess.check_output("free -m | grep Mem", shell=True).decode().strip() | |
| # 字段映射 | |
| cpu_key_map = { | |
| "us": "user", | |
| "sy": "system", | |
| "ni": "nice", | |
| "id": "idle", | |
| "wa": "iowait", | |
| "hi": "hardware_irq", | |
| "si": "software_irq", | |
| "st": "steal" | |
| } | |
| # 解析CPU信息 | |
| cpu_parts = cpu.split(":")[1].split(",") | |
| cpu_info = {} | |
| for part in cpu_parts: | |
| value, key = part.strip().split(" ", 1) | |
| key = key.strip().replace("%", "") | |
| mapped_key = cpu_key_map.get(key, key) | |
| cpu_info[mapped_key] = value.strip() + "%" | |
| # 解析内存信息 | |
| mem_values = mem.split()[1:4] | |
| mem_total, mem_used, mem_free = mem_values | |
| mem_available = int(mem_total) - int(mem_used) | |
| mem_info = { | |
| "Total": f"{mem_total}MB", | |
| "Used": f"{mem_used}MB", | |
| "Free": f"{mem_free}MB", | |
| "Available": f"{mem_available}MB" | |
| } | |
| sys_info = get_system_info() | |
| logger.info(f"CPU: {cpu_info}, Memory: {mem_info}") | |
| info = {"CPU": cpu_info, "Memory": mem_info, "System": sys_info} | |
| return info | |
| def get_free_port(): | |
| used_ports = {b["port"] for b in BROWSERS.values()} | |
| for _ in range(100): | |
| port = random.randint(*PORT_RANGE) | |
| if port not in used_ports: | |
| return port | |
| raise RuntimeError("No free port available") | |
| def gen_browser_id(): | |
| return str(int(time.time() * 1000)) + str(random.randint(1000, 9999)) | |
| def get_default_launch_args(): | |
| return [ | |
| "--disable-gpu", | |
| "--disable-dev-shm-usage", | |
| "--disable-software-rasterizer", | |
| "--disable-extensions", | |
| "--disable-background-networking", | |
| "--disable-default-apps", | |
| "--disable-sync", | |
| "--disable-translate", | |
| "--disable-features=TranslateUI", | |
| "--no-first-run", | |
| "--no-default-browser-check", | |
| "--remote-allow-origins=*", | |
| "--incognito", | |
| "--disable-blink-features=AutomationControlled", | |
| "--disable-notifications", | |
| "--disable-popup-blocking", | |
| "--disable-infobars", | |
| "--disable-features=TranslateUI,NotificationIndicator", | |
| "--headless" | |
| ] | |
| def wait_port(host, port, timeout=5.0): | |
| """等待端口可用""" | |
| start = time.time() | |
| while time.time() - start < timeout: | |
| try: | |
| with socket.create_connection((host, port), timeout=0.5): | |
| return True | |
| except Exception: | |
| time.sleep(0.1) | |
| return False | |
| @app.get("/system/usage") | |
| async def system_usage(): | |
| """ | |
| 获取系统资源使用情况 | |
| """ | |
| try: | |
| usage_info = get_system_usage() | |
| response_data = {"success": True, "msg": "success", "data": usage_info} | |
| #return encrypt_json_response(response_data) | |
| return response_data | |
| except Exception as e: | |
| logger.error(f"获取系统资源使用情况失败: {e}") | |
| error_data = {"success": False, "msg": "failed", "error": str(e)} | |
| #return encrypt_json_response(error_data) | |
| return error_data | |
| # BitBrowser风格API | |
| @app.post("/browser/creat") | |
| @app.post("/browser/update") | |
| @app.post("/browser/open") | |
| async def open_browser(request: Request): | |
| data = await request.json() | |
| browser_id = data.get("id", '') | |
| if browser_id in BROWSERS: | |
| logger.info(f"Browser ID {browser_id}: {BROWSERS[browser_id]}") | |
| if BROWSERS[browser_id]["status"] == "open": | |
| response_data = { | |
| "success": True, | |
| "msg": "already opened", | |
| "data": {"id": browser_id, "ws": BROWSERS[browser_id]["ws"]}, | |
| "info": BROWSERS[browser_id]["info"] | |
| } | |
| return encrypt_json_response(response_data) | |
| else: | |
| port = get_free_port() | |
| BROWSERS[browser_id]["port"] = port | |
| else: | |
| # 新建浏览器实例 | |
| browser_id = gen_browser_id() | |
| port = get_free_port() | |
| logger.info(f"新建浏览器实例: {browser_id} :{port}") | |
| profile_dir = f"{PROFILE_BASE}/{browser_id}" | |
| os.makedirs(profile_dir, exist_ok=True) | |
| # 解析指纹参数 | |
| browser_fp = data.get("browserFingerPrint", {}) | |
| launch_args = browser_fp.get("launchArgs", "") | |
| # 解析代理参数 | |
| proxy_type = data.get("proxyType", "").lower() | |
| host = data.get("host", "") | |
| proxy_port = data.get("port", "") | |
| proxy_user = data.get("proxyUserName", "") | |
| proxy_pass = data.get("proxyPassword", "") | |
| proxy_arg = "" | |
| if proxy_type and host and proxy_port: | |
| # 组装协议前缀 | |
| if proxy_type in ["http", "https", "socks5", "socks4"]: | |
| prefix = proxy_type | |
| else: | |
| prefix = "http" | |
| # 组装代理地址 | |
| if proxy_user and proxy_pass: | |
| proxy_url = f"{prefix}://{proxy_user}:{proxy_pass}@{host}:{proxy_port}" | |
| else: | |
| proxy_url = f"{prefix}://{host}:{proxy_port}" | |
| proxy_arg = f"--proxy-server={proxy_url}" | |
| args = [CHROME_PATH, f"--remote-debugging-port={port}", f"--user-data-dir={profile_dir}"] | |
| logger.info(f"使用默认参数: {get_default_launch_args()}") | |
| args.extend(get_default_launch_args()) | |
| if proxy_arg: | |
| args.append(proxy_arg) | |
| if launch_args: | |
| # 拆分参数并去重,保留顺序 | |
| base_args = args[1:] # 除去chrome可执行文件 | |
| extra_args = [a for a in launch_args.split() if a] | |
| all_args = base_args + extra_args | |
| seen = set() | |
| deduped_args = [] | |
| for arg in all_args: | |
| key = arg.split('=')[0].strip() if '=' in arg else arg.strip() | |
| if key not in seen: | |
| seen.add(key) | |
| deduped_args.append(arg) | |
| args = [CHROME_PATH] + deduped_args | |
| proc = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| if not wait_port("127.0.0.1", port, timeout=60): | |
| proc.terminate() | |
| error_data = {"success": False, "msg": f"chrome端口{port}未就绪", "data": {}} | |
| return encrypt_json_response(error_data) | |
| async with httpx.AsyncClient() as client: | |
| resp = await client.get(f"http://127.0.0.1:{port}/json/version") | |
| if resp.status_code == 200: | |
| browser_info = resp.json() | |
| # 获取纯域名 (不带端口) | |
| hostname = request.url.hostname | |
| ws_url = browser_info.get("webSocketDebuggerUrl").replace("ws://127.0.0.1", f"wss://{hostname}") | |
| # 去掉端口号 | |
| ws_url = re.sub(r":\d+", "", ws_url) | |
| logger.info(f"Browser opened with ID: {browser_id}, Info: {browser_info}") | |
| BROWSERS[browser_id] = {"process": proc, "ws": ws_url, "port": port, "status": "open", "info": browser_info} | |
| response_data = {"success": True, "msg": "success", "data": {"id": browser_id, "ws": ws_url}, "info": browser_info} | |
| # return encrypt_json_response(response_data) | |
| return response_data | |
| @app.post("/browser/close") | |
| async def close_browser(request: Request): | |
| data = await request.json() | |
| browser_id = data.get("id") | |
| b = BROWSERS.get(browser_id) | |
| if not b: | |
| error_data = {"success": False, "msg": "not found"} | |
| #return encrypt_json_response(error_data) | |
| return error_data | |
| b["process"].terminate() | |
| b["status"] = "closed" | |
| response_data = {"success": True, "msg": "closed", "data": {"id": browser_id}} | |
| # return encrypt_json_response(response_data) | |
| return response_data | |
| @app.post("/browser/delete") | |
| @app.post("/browser/delete") | |
| async def delete_browser(request: Request): | |
| data = await request.json() | |
| browser_id = data.get("id") | |
| b = BROWSERS.pop(browser_id, None) | |
| if not b: | |
| error_data = {"success": False, "msg": "not found"} | |
| #return encrypt_json_response(error_data) | |
| return error_data | |
| try: | |
| b["process"].terminate() | |
| except Exception: | |
| pass | |
| # 删除用户数据目录 | |
| profile_dir = f"{PROFILE_BASE}/{browser_id}" | |
| if os.path.exists(profile_dir): | |
| import shutil | |
| shutil.rmtree(profile_dir, ignore_errors=True) | |
| response_data = {"success": True, "msg": "deleted", "data": {"id": browser_id}} | |
| # return encrypt_json_response(response_data) | |
| return response_data | |
| # @app.post("/browser/update") | |
| # async def update_browser(request: Request): | |
| # data = await request.json() | |
| # # 兼容 BitBrowser 结构 | |
| # launch_args = "" | |
| # if isinstance(data.get("browserFingerPrint"), dict): | |
| # launch_args = data["browserFingerPrint"].get("launchArgs", "") | |
| # elif "launchArgs" in data: | |
| # launch_args = data["launchArgs"] | |
| # # 合并 launchArgs 到 data | |
| # if launch_args: | |
| # data["launchArgs"] = launch_args | |
| # return await open_browser(Request({**request.scope, "body": json.dumps(data).encode()}, receive=request._receive)) | |
| @app.post("/browser/ports") | |
| async def browser_ports(): | |
| # 返回 {id: port} 格式 | |
| data = {k: str(v["ws"]) for k, v in BROWSERS.items() if v["status"] == "open"} | |
| response_data = {"success": True, "data": data} | |
| #return encrypt_json_response(response_data) | |
| return response_data | |
| @app.post("/health") | |
| async def health(): | |
| response_data = {"success": True, "msg": "ok"} | |
| # return encrypt_json_response(response_data) | |
| return response_data | |
| @app.post("/decrypt") | |
| async def decrypt_response(request: Request): | |
| """ | |
| 解密接口,用于客户端解密响应数据 | |
| """ | |
| try: | |
| data = await request.json() | |
| encrypted_data = data.get("data") | |
| if not encrypted_data: | |
| return {"success": False, "msg": "missing encrypted data"} | |
| decrypted_str = decrypt_data(encrypted_data) | |
| decrypted_json = json.loads(decrypted_str) | |
| return {"success": True, "msg": "decrypted", "data": decrypted_json} | |
| except Exception as e: | |
| logger.error(f"解密失败: {e}") | |
| return {"success": False, "msg": "decrypt failed", "error": str(e)} | |
| # 添加获取加密配置的接口(仅用于开发调试) | |
| @app.get("/encryption/info") | |
| async def encryption_info(): | |
| """ | |
| 返回加密相关信息(用于开发调试) | |
| """ | |
| return { | |
| "encrypted": True, | |
| "algorithm": "Fernet (AES 128)", | |
| "key_derivation": "PBKDF2HMAC with SHA256", | |
| "iterations": 100000, | |
| "info": "All JSON responses are encrypted. Use /decrypt endpoint to decrypt." | |
| } | |
| CDP_PATHS = [ | |
| "/json", | |
| "/json/list", | |
| "/json/version", | |
| "/json/protocol", | |
| ] | |
| def get_browser_by_id(browser_id: str): | |
| b = BROWSERS.get(browser_id) | |
| if not b or b["status"] != "open": | |
| return None | |
| return b | |
| async def find_browser_by_target_id(tab_type,target_id: str): | |
| """ | |
| 根据 targetId 智能查找对应的浏览器实例 | |
| 1. 检查 target_id 是否是一个 browser_id | |
| 2. 如果不是,查询所有浏览器实例的页面列表,找到匹配 target_id 的页面所在浏览器 | |
| 3. 如果找不到,返回第一个可用的浏览器实例 | |
| """ | |
| if tab_type == "browser": | |
| # 查询所有浏览器实例,找到匹配的页面 | |
| for browser_id, browser in BROWSERS.items(): | |
| ws_url = browser.get("info", {}).get("webSocketDebuggerUrl", "") | |
| if target_id in ws_url: | |
| return browser | |
| # 查询所有浏览器实例,找到匹配的页面 | |
| for browser_id, browser in BROWSERS.items(): | |
| if browser["status"] != "open": | |
| continue | |
| try: | |
| # 尝试获取浏览器的页面列表 | |
| port = browser["port"] | |
| async with httpx.AsyncClient() as client: | |
| resp = await client.get(f"http://127.0.0.1:{port}/json/list") | |
| if resp.status_code == 200: | |
| pages = resp.json() | |
| # 检查页面是否包含 target_id | |
| for page in pages: | |
| if page.get("id") == target_id: | |
| return browser | |
| except Exception as e: | |
| logger.error(f"查询浏览器 {browser_id} 页面列表失败: {e}") | |
| # 如果找不到,返回第一个可用的浏览器实例 | |
| for browser_id, browser in BROWSERS.items(): | |
| if browser["status"] == "open": | |
| return browser | |
| return None | |
| @app.api_route("/json", methods=["GET"]) | |
| @app.api_route("/json/list", methods=["GET"]) | |
| @app.api_route("/json/version", methods=["GET"]) | |
| @app.api_route("/json/protocol", methods=["GET"]) | |
| async def cdp_native_proxy(request: Request): | |
| # 这个接口只返回最后一个浏览器实例的内容 | |
| # 1. 获取最后一个浏览器实例 | |
| if not BROWSERS: | |
| return Response(content="No browser instance", status_code=404) | |
| browser_id = next(reversed(BROWSERS.keys())) | |
| b = get_browser_by_id(browser_id) | |
| if not b: | |
| return Response(content="browser not found", status_code=404) | |
| port = b["port"] | |
| # 2. 构造目标URL | |
| path = request.url.path | |
| url = f"http://127.0.0.1:{port}{path}" | |
| # 3. 代理请求 | |
| headers = dict(request.headers) | |
| headers["host"] = "127.0.0.1" | |
| body = await request.body() | |
| async with httpx.AsyncClient(follow_redirects=True) as client: | |
| resp = await client.request( | |
| request.method, | |
| url, | |
| headers=headers, | |
| content=body, | |
| params=request.query_params | |
| ) | |
| return Response( | |
| content=resp.content, | |
| status_code=resp.status_code, | |
| headers=dict(resp.headers) | |
| ) | |
| @app.websocket("/devtools/{tab_type}/{target_id}") | |
| async def cdp_native_ws_proxy(websocket: WebSocket, tab_type: str, target_id: str): | |
| if not target_id: | |
| await websocket.close() | |
| return | |
| # 智能查找 target_id 对应的浏览器实例 | |
| browser = await find_browser_by_target_id(tab_type,target_id) | |
| if not browser: | |
| await websocket.close(code=1008, reason="无法找到有效的浏览器实例") | |
| return | |
| await websocket.accept() | |
| tab_type = "page" if tab_type == "page" else "browser" | |
| logger.info(f"WebSocket连接: {browser}") | |
| port = browser["port"] | |
| target_url = f"ws://127.0.0.1:{port}/devtools/{tab_type}/{target_id}" | |
| logger.info(f"Forwarding to: {target_url}") | |
| try: | |
| async with websockets.connect(target_url) as target_ws: | |
| async def forward_client_to_server(): | |
| try: | |
| while True: | |
| data = await websocket.receive_text() | |
| await target_ws.send(data) | |
| except WebSocketDisconnect: | |
| pass | |
| async def forward_server_to_client(): | |
| try: | |
| while True: | |
| response = await target_ws.recv() | |
| await websocket.send_text(response) | |
| except websockets.exceptions.ConnectionClosed: | |
| pass | |
| await asyncio.gather( | |
| forward_client_to_server(), | |
| forward_server_to_client() | |
| ) | |
| except Exception as e: | |
| logger.error(f"WebSocket代理错误: {e}") | |
| finally: | |
| await websocket.close() | |
| if __name__ == "__main__": | |
| uvicorn.run("api:app", host="0.0.0.0", port=7860, reload=True) | |