下载 Pydroid 请参考:https://blog.qaiu.top/archives/pydroid3v72
nfd-proxy 分布式代理节点客户端(简称 dnode)
用于解决 netdisk-fast-download 服务端解析时的 IP 问题:当前 nfd 解析服务端的 IP 有时候会被 ban,或者云盘本身禁止云服务商的 IP 解析下载,此时就需要配置代理节点。而最真实的代理就是用户自身的 IP,因此本项目应运而生……废话不多说,直接提供代码,可以运行在任意 Python 3 环境。
dnode技术文档参考另一篇文章
pydroid运行
#!/usr/bin/env python3
"""
NFD DNode 启动器 run.py
════════════════════════════════════════════════════════════════
功能:
· 检查 Python 版本 (>= 3.8)
· 检测/创建虚拟环境 (可选)
· 自动安装缺失依赖 (websockets)
· 运行本地 dnode_client.py 或从 URL 拉取执行
用法:
python run.py # 运行本地 dnode_client.py
python run.py wss://host:9000/ws/node # 传参给客户端
python run.py --url https://qaiu.top/src/py/dnode_client.py
python run.py --venv # 强制使用 venv
python run.py --no-venv # 跳过 venv
python run.py --debug # DEBUG 模式
Pydroid / 嵌入式 Python 一行启动:
exec(open('run.py').read()) # 本地
from urllib.request import urlopen
exec(urlopen('https://qaiu.top/src/py/run.py').read()) # 远程
════════════════════════════════════════════════════════════════
"""
import os
import sys
import subprocess
import argparse
import platform
# ── Configuration ───────────────────────────────────────
# File name of the local client (main() looks for it in the same directory as run.py)
LOCAL_CLIENT = "dnode_client.py"
# Remote client URL (default remote address used when --url is not given)
# NOTE(review): this points at a "_v5" file while the usage text mentions dnode_client.py — confirm which is intended
REMOTE_CLIENT = "https://qaiu.top/src/py/dnode_client_v5.py"
# The only dependency (checked and installed by install_deps)
REQUIRED_PKGS = ["websockets"]
# Name of the virtual-environment directory
VENV_DIR = ".venv"
# Minimum supported Python version
MIN_PYTHON = (3, 8)
# True when an Android marker is present: the /data/data directory or the ANDROID_ROOT variable
IS_ANDROID = (
os.path.exists("/data/data") or
os.environ.get("ANDROID_ROOT") is not None
)
# True on Windows; selects the Scripts/python.exe venv layout instead of bin/python
IS_WIN = sys.platform == "win32"
# ════════════════════════════════════════════════════════
# 版本检查
# ════════════════════════════════════════════════════════
def check_python():
    """Print the interpreter banner, or exit with status 1 if it is older than MIN_PYTHON."""
    current = sys.version.split()[0]
    if sys.version_info[:2] >= MIN_PYTHON:
        print(f"[run] ✓ Python {current} {platform.system()} {platform.machine()}")
        return
    required = ".".join(str(part) for part in MIN_PYTHON)
    print(f"[run] ✗ Python {required}+ 必需,当前 {current}")
    sys.exit(1)
# ════════════════════════════════════════════════════════
# 虚拟环境
# ════════════════════════════════════════════════════════
def _in_venv() -> bool:
"""当前进程是否已在 venv/conda 环境中"""
return (
sys.prefix != sys.base_prefix or
os.environ.get("CONDA_DEFAULT_ENV") is not None or
os.environ.get("VIRTUAL_ENV") is not None
)
def ensure_venv(force: bool = False) -> bool:
    """Detect or create the ``VENV_DIR`` virtual environment and switch into it.

    Args:
        force: When True, switch into ``VENV_DIR`` even if some other
            venv/conda environment is already active.

    Returns:
        True when the current process should simply continue (Android, an
        acceptable environment is already active, or the venv could not be
        created/entered). On success ``os.execv`` replaces the process and
        never returns; the trailing ``return False`` is unreachable.
    """
    if IS_ANDROID:
        print("[run] Android 环境,跳过 venv")
        return True
    if _in_venv() and not force:
        print(f"[run] ✓ 已在虚拟环境: {sys.prefix}")
        return True

    # Already running from the target venv: re-executing would restart this
    # script with the same "--venv" flag and loop forever.
    target_root = os.path.normcase(os.path.realpath(VENV_DIR))
    if os.path.normcase(os.path.realpath(sys.prefix)) == target_root:
        print(f"[run] ✓ 已在虚拟环境: {sys.prefix}")
        return True

    venv_python = (
        os.path.join(VENV_DIR, "Scripts", "python.exe") if IS_WIN
        else os.path.join(VENV_DIR, "bin", "python")
    )
    if not os.path.exists(venv_python):
        print(f"[run] 创建虚拟环境 {VENV_DIR}/ ...")
        try:
            subprocess.run(
                [sys.executable, "-m", "venv", VENV_DIR],
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # Best effort: fall back to the current interpreter.
            print(f"[run] ✗ venv 创建失败: {e},使用系统 Python 继续")
            return True

    print(f"[run] 重启到 venv: {venv_python}")
    # exec does not flush Python-level buffers; flush so the messages above
    # are not lost when stdout is a pipe or file.
    sys.stdout.flush()
    try:
        # Re-run this script with the venv interpreter, forwarding all arguments.
        os.execv(venv_python, [venv_python] + sys.argv)
    except OSError as e:
        print(f"[run] ✗ venv 启动失败: {e},使用系统 Python 继续")
        return True
    # os.execv does not return on success.
    return False  # pragma: no cover
# ════════════════════════════════════════════════════════
# 依赖安装
# ════════════════════════════════════════════════════════
def _pkg_available(pkg: str) -> bool:
"""检查包是否可 import(用包名去掉版本约束)"""
import importlib
name = pkg.split(">=")[0].split("==")[0].split("[")[0].strip()
try:
importlib.import_module(name)
return True
except ImportError:
return False
def install_deps(pkgs: list):
    """Install, via pip, every package in *pkgs* that cannot currently be imported.

    A normal ``pip install --upgrade`` is tried first (with
    ``--break-system-packages`` on Android/Termux); if that fails, a
    ``pip install --user`` is attempted. When both fail the process exits with
    status 1 after printing a manual-install hint.

    Args:
        pkgs: Requirement strings, e.g. ``["websockets"]``.
    """
    import importlib
    import site

    missing = [p for p in pkgs if not _pkg_available(p)]
    if not missing:
        print(f"[run] ✓ 依赖就绪: {', '.join(pkgs)}")
        return

    print(f"[run] 安装依赖: {', '.join(missing)}")
    # Android Termux / restricted environments need --break-system-packages
    extra = []
    if IS_ANDROID or os.environ.get("TERMUX_VERSION"):
        extra = ["--break-system-packages"]
    pip_install = [sys.executable, "-m", "pip", "install"]
    try:
        subprocess.run([*pip_install, "--upgrade", "--quiet", *extra, *missing],
                       check=True)
        print(f"[run] ✓ 安装完成: {', '.join(missing)}")
    except subprocess.CalledProcessError:
        # Retry as a per-user install when the plain install fails.
        try:
            subprocess.run([*pip_install, "--user", "--quiet", *missing],
                           check=True)
            print(f"[run] ✓ 安装完成 (--user): {', '.join(missing)}")
        except subprocess.CalledProcessError as e:
            print(f"[run] ✗ 依赖安装失败: {e}")
            print(f" 请手动运行: pip install {' '.join(missing)}")
            sys.exit(1)
        # The user site directory is only put on sys.path at startup if it
        # already existed; add it so the client exec'd in this same process
        # can import what was just installed.
        user_site = site.getusersitepackages()
        if os.path.isdir(user_site) and user_site not in sys.path:
            sys.path.append(user_site)

    # The earlier failed import may have cached directory listings; drop them
    # so the freshly installed packages become importable in this process.
    importlib.invalidate_caches()
# ════════════════════════════════════════════════════════
# 客户端加载
# ════════════════════════════════════════════════════════
def _load_local(path: str) -> str:
with open(path, encoding="utf-8") as f:
return f.read()
def _load_remote(url: str) -> str:
print(f"[run] 从远程加载客户端: {url}")
try:
from urllib.request import urlopen, Request
req = Request(url, headers={"User-Agent": f"NFD-run/{sys.version.split()[0]}"})
with urlopen(req, timeout=15) as r:
return r.read().decode("utf-8")
except Exception as e:
print(f"[run] ✗ 远程加载失败: {e}")
sys.exit(1)
def run_client(source: str, extra_argv: list):
    """Execute the client source in this process as if it were the main script.

    ``sys.argv`` is replaced with the client file name followed by
    *extra_argv* so the client's own argument parser sees the forwarded
    options; the code runs in a fresh namespace whose ``__name__`` is
    ``"__main__"``.
    """
    # NOTE(review): *source* may have been downloaded over the network and is
    # executed without any integrity check — only point this at trusted URLs.
    sys.argv = [LOCAL_CLIENT, *extra_argv]
    namespace = {"__name__": "__main__", "__file__": LOCAL_CLIENT}
    compiled = compile(source, LOCAL_CLIENT, "exec")
    exec(compiled, namespace)
# ════════════════════════════════════════════════════════
# 参数解析
# ════════════════════════════════════════════════════════
def parse_launcher_args():
    """Parse the launcher's own options from the command line.

    Returns:
        A ``(namespace, remaining)`` pair: the recognised launcher options and
        the list of unrecognised arguments, which are forwarded to the client.
    """
    # add_help=False leaves -h/--help for the client to handle.
    parser = argparse.ArgumentParser(description="NFD DNode 启动器", add_help=False)
    parser.add_argument(
        "--url",
        default=None,
        help=f"远程客户端 URL (默认: {REMOTE_CLIENT})",
    )
    switches = (
        ("--venv", "强制创建/使用 venv"),
        ("--no-venv", "跳过 venv 检查,直接运行"),
        ("--no-check", "跳过版本和依赖检查(加速启动)"),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)
    # Everything the launcher does not recognise is passed through.
    return parser.parse_known_args()
# ════════════════════════════════════════════════════════
# 入口
# ════════════════════════════════════════════════════════
def main():
    """Launcher entry point: check the environment, then load and run the client.

    Steps, in order:
      1. Parse launcher options (unknown ones are forwarded to the client).
      2. Unless ``--no-check``: verify the Python version.
      3. Unless ``--no-venv`` or on Android: with ``--venv`` force the venv;
         otherwise switch into an existing ``VENV_DIR`` if one is found and no
         virtual environment is active (nothing is created silently).
      4. Unless ``--no-check``: install missing dependencies.
      5. Load the client from ``--url``, else from the local file next to this
         script, else from ``REMOTE_CLIENT``; then execute it.
    """
    opts, client_argv = parse_launcher_args()
    if not opts.no_check:
        check_python()

    # venv handling (skipped on Android)
    if not opts.no_venv and not IS_ANDROID:
        if opts.venv:
            ensure_venv(force=True)
        elif not _in_venv():
            # Auto-detect: enter .venv when it exists, never create it here.
            venv_python = (
                os.path.join(VENV_DIR, "Scripts", "python.exe") if IS_WIN
                else os.path.join(VENV_DIR, "bin", "python")
            )
            if os.path.exists(venv_python):
                print(f"[run] 检测到 {VENV_DIR}/,切换到 venv Python")
                # exec does not flush Python-level buffers.
                sys.stdout.flush()
                os.execv(venv_python, [venv_python] + sys.argv)

    # Install dependencies
    if not opts.no_check:
        install_deps(REQUIRED_PKGS)

    # Load the client source
    if opts.url:
        # --url given: always load from the remote address
        source = _load_remote(opts.url)
    else:
        # __file__ is undefined when this script is started through
        # exec(open('run.py').read()) or exec(urlopen(...).read()) from a
        # REPL; fall back to the working directory in that case.
        try:
            base_dir = os.path.dirname(os.path.abspath(__file__))
        except NameError:
            base_dir = os.getcwd()
        local_path = os.path.join(base_dir, LOCAL_CLIENT)
        if os.path.exists(local_path):
            # Prefer the local file
            print(f"[run] 加载本地客户端: {local_path}")
            source = _load_local(local_path)
        else:
            # No local file: fetch the default remote client
            source = _load_remote(REMOTE_CLIENT)

    print("[run] 启动客户端...\n" + "─" * 50)
    run_client(source, client_argv)
# Start the launcher only when this file is the program being run, not on import.
if __name__ == "__main__":
    main()
dnode_client_v4完整代码
#!/usr/bin/env python3
"""
NFD 客户端节点 ── Android APK / 跨平台
════════════════════════════════════════════════════════════════
服务端地址优先级(高→低):
1. 命令行参数 python node_client.py wss://host:9000/ws/node
2. 环境变量 NFD_SERVER=wss://host:9000/ws/node
3. 配置文件 node_config.json 中的 server_url 字段
4. 内置默认值 wss://dnode.qaiu.top/ws/node
其他可选参数:
--secret <token> 接入密钥(也可在配置文件或 NFD_SECRET 环境变量中设置)
--id <node_id> 指定设备 ID(默认自动生成并持久化)
--debug 等同于 LOG_LEVEL=DEBUG
Android 配置文件路径优先级:
$NFD_CONFIG_DIR/node_config.json
/sdcard/nfd/node_config.json
<脚本同级目录>/node_config.json
用法示例:
python node_client.py wss://proxy.example.com:9000/ws/node
python node_client.py wss://proxy.example.com:9000/ws/node --secret abc123
NFD_SERVER=wss://... python node_client.py
"""
import argparse
import asyncio
import hashlib
import json
import logging
import os
import platform
import signal
import socket
import ssl
import struct
import sys
import time
import uuid
from typing import Dict, Optional
import aiohttp
# ══════════════════════════════════════════════════════════
# Android 检测
# ══════════════════════════════════════════════════════════
def _is_android() -> bool:
return (
os.path.exists("/data/data") or
os.environ.get("ANDROID_ROOT") is not None or
os.environ.get("ANDROID_DATA") is not None
)
# Evaluated once at import time.
IS_ANDROID = _is_android()
# Platform label sent to the server: "Android", otherwise platform.system().
PLATFORM = "Android" if IS_ANDROID else platform.system()
# ══════════════════════════════════════════════════════════
# 配置文件路径(Android 友好)
# ══════════════════════════════════════════════════════════
def _config_path() -> str:
# 1. 环境变量指定目录
env_dir = os.environ.get("NFD_CONFIG_DIR", "")
if env_dir:
return os.path.join(env_dir, "node_config.json")
# 2. Android:优先 SD 卡可读目录
if IS_ANDROID:
sdcard = "/sdcard/nfd"
try:
os.makedirs(sdcard, exist_ok=True)
return os.path.join(sdcard, "node_config.json")
except OSError:
pass
# 3. 脚本同级目录
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "node_config.json")
# Resolved once at import time; read and written by load_file_config / save_node_id.
CONFIG_FILE = _config_path()
# ══════════════════════════════════════════════════════════
# 日志
# ══════════════════════════════════════════════════════════
def _setup_logging(debug: bool = False) -> None:
level = logging.DEBUG if debug else getattr(
logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO
)
fmt = "%(asctime)s.%(msecs)03d [%(levelname)-5s] %(name)s - %(message)s"
# Android 下 basicConfig 可能已被 chaquopy 初始化,安全地重设
root = logging.getLogger()
root.setLevel(level)
if not root.handlers:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
root.addHandler(handler)
else:
for h in root.handlers:
h.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
# General node/connection log channel.
logger = logging.getLogger("nfd.node")
# Per-tunnel traffic log channel (used for DEBUG-level flow messages).
flow_log = logging.getLogger("nfd.flow")
# ══════════════════════════════════════════════════════════
# 工具函数
# ══════════════════════════════════════════════════════════
def _fmt_bytes(n: int) -> str:
n = int(n or 0)
for unit in ("B", "KB", "MB", "GB"):
if n < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}TB"
def _local_ip() -> str:
"""获取本机出口 IP(Android/Linux/Windows 均可用)"""
targets = [("8.8.8.8", 80), ("1.1.1.1", 80), ("114.114.114.114", 80)]
for host, port in targets:
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(1)
s.connect((host, port))
return s.getsockname()[0]
except OSError:
continue
return "?"
def _device_fp() -> str:
    """Return a 16-hex-character device fingerprint.

    On Android the value of ``settings get secure android_id`` is hashed when
    that command succeeds and yields a real ID; in every other case the hash
    is built from the host name, the platform label and ``uuid.getnode()``.
    """
    if IS_ANDROID:
        # Best effort: any failure falls through to the generic fingerprint.
        try:
            import subprocess

            output = subprocess.check_output(
                ["settings", "get", "secure", "android_id"],
                timeout=2, stderr=subprocess.DEVNULL
            )
            android_id = output.decode().strip()
            if android_id and android_id != "null":
                return hashlib.sha256(android_id.encode()).hexdigest()[:16]
        except Exception:
            pass

    seed = f"{platform.node()}-{PLATFORM}-{uuid.getnode()}"
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return digest[:16]
def _make_ssl_ctx(url: str) -> Optional[ssl.SSLContext]:
"""wss:// 创建 SSL context;Android 证书链有时需宽松验证"""
if not url.startswith("wss://"):
return None
ctx = ssl.create_default_context()
if IS_ANDROID:
# Android 系统根证书路径不标准,允许加载系统证书
for ca_path in (
"/system/etc/security/cacerts",
"/data/misc/keychain/certs-added",
):
if os.path.isdir(ca_path):
for f in os.listdir(ca_path):
try:
ctx.load_verify_locations(os.path.join(ca_path, f))
except Exception:
pass
# 如果仍有问题,可通过环境变量 NFD_SSL_NOVERIFY=1 跳过验证(不推荐生产)
if os.environ.get("NFD_SSL_NOVERIFY", "0") == "1":
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
logger.warning("SSL 证书验证已禁用(NFD_SSL_NOVERIFY=1)")
return ctx
# ══════════════════════════════════════════════════════════
# 帧协议
# ══════════════════════════════════════════════════════════
# Frame type codes: the fifth byte of every frame (see pack_frame / unpack_frame).
TYPE_CONNECT_REQ = 0x01  # received: open a tunnel; payload carries host and port
TYPE_CONNECT_OK = 0x02  # sent after the TCP connection was established
TYPE_CONNECT_FAIL = 0x03  # sent when the TCP connection failed; payload is the error text
TYPE_DATA = 0x04  # tunnel payload bytes, in either direction
TYPE_CLOSE = 0x05  # tunnel closed, in either direction
# Size of the semaphore that limits concurrently running tunnels.
MAX_CONCURRENCY = 50
def pack_frame(tunnel_id: int, ftype: int, payload: bytes = b"") -> bytes:
    """Serialise a frame: 4-byte big-endian tunnel id, 1-byte type, then the payload."""
    header = struct.pack(">IB", tunnel_id, ftype)
    return header + payload
def unpack_frame(data: bytes):
    """Split a frame into ``(tunnel_id, ftype, payload)``.

    The first five bytes are the big-endian header; the rest is the payload.
    Raises ``struct.error`` when *data* is shorter than five bytes.
    """
    header, body = data[:5], data[5:]
    tunnel_id, ftype = struct.unpack(">IB", header)
    return tunnel_id, ftype, body
# ══════════════════════════════════════════════════════════
# 单条隧道
# ══════════════════════════════════════════════════════════
class TunnelWorker:
    """One tunnel: relays bytes between a TCP connection and the WebSocket send queue.

    ``run()`` opens the TCP connection to ``host:port`` and reports
    CONNECT_OK / CONNECT_FAIL on the shared send queue, then pumps data in
    both directions until either side ends. Data arriving from the WebSocket
    is handed in through ``feed_data()``; ``close()`` ends the tunnel.
    """

    def __init__(self, tid: int, host: str, port: int,
                 ws_send_queue: asyncio.Queue, local_ip: str):
        """
        Args:
            tid: Tunnel id used in every frame of this tunnel.
            host: Target host to connect to.
            port: Target TCP port.
            ws_send_queue: Shared queue of encoded frames awaiting WebSocket send.
            local_ip: Local address, used only in log messages.
        """
        self.tid = tid
        self.host = host
        self.port = port
        self._ws_queue = ws_send_queue
        self._local_ip = local_ip
        self._tcp_writer: Optional[asyncio.StreamWriter] = None
        # Chunks to write to the TCP side; None is the stop sentinel.
        self._data_queue: asyncio.Queue = asyncio.Queue()
        self._closed = False
        self._t_start = time.monotonic()
        self._tx_bytes = 0  # bytes written to the target
        self._rx_bytes = 0  # bytes read from the target

    async def run(self):
        """Connect to the target and relay until the tunnel is closed."""
        flow_log.debug(f"[node] tid={self.tid:08x} | {self._local_ip} → {self.host}:{self.port} | TCP_CONNECTING")
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port), timeout=10
            )
            self._tcp_writer = writer
            local = writer.get_extra_info("sockname")
            local_addr = f"{local[0]}:{local[1]}" if local else self._local_ip
        except Exception as e:
            flow_log.debug(f"[node] tid={self.tid:08x} | TCP_FAIL {self.host}:{self.port} err={e}")
            await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_FAIL, str(e).encode()))
            return

        conn_ms = f"{(time.monotonic()-self._t_start)*1000:.0f}ms"
        await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_OK))
        flow_log.debug(f"[node] tid={self.tid:08x} | {local_addr} → {self.host}:{self.port} | TCP_CONNECTED conn={conn_ms}")

        async def tcp_to_ws():
            # Target -> WebSocket: forward every chunk as a DATA frame.
            try:
                while not self._closed:
                    chunk = await reader.read(65536)
                    if not chunk:
                        break
                    self._rx_bytes += len(chunk)
                    flow_log.debug(
                        f"[node] tid={self.tid:08x} | "
                        f"target→node chunk={_fmt_bytes(len(chunk))} ↓{_fmt_bytes(self._rx_bytes)}"
                    )
                    await self._ws_queue.put(pack_frame(self.tid, TYPE_DATA, chunk))
            except Exception as e:
                flow_log.debug(f"[node] tid={self.tid:08x} | tcp_to_ws err={e}")
            finally:
                self._closed = True
                # Wake ws_to_tcp, which may be blocked waiting for data;
                # without this run() would hang (holding its concurrency
                # slot) until the server happens to send a CLOSE frame.
                self._data_queue.put_nowait(None)
                await self._ws_queue.put(pack_frame(self.tid, TYPE_CLOSE))

        async def ws_to_tcp():
            # WebSocket -> target: write queued chunks until the sentinel arrives.
            try:
                while not self._closed:
                    chunk = await self._data_queue.get()
                    if chunk is None:
                        break
                    self._tx_bytes += len(chunk)
                    flow_log.debug(
                        f"[node] tid={self.tid:08x} | "
                        f"node→target chunk={_fmt_bytes(len(chunk))} ↑{_fmt_bytes(self._tx_bytes)}"
                    )
                    writer.write(chunk)
                    await writer.drain()
            except Exception as e:
                flow_log.debug(f"[node] tid={self.tid:08x} | ws_to_tcp err={e}")
            finally:
                # Closing the writer also makes the reader side finish.
                try:
                    writer.close()
                except Exception:
                    pass
                self._closed = True

        await asyncio.gather(tcp_to_ws(), ws_to_tcp(), return_exceptions=True)
        logger.info(
            f"[node] tid={self.tid:08x} | "
            f"{self._local_ip} ⇄ {self.host}:{self.port} | "
            f"CLOSED ↑{_fmt_bytes(self._tx_bytes)} ↓{_fmt_bytes(self._rx_bytes)} "
            f"dur={time.monotonic()-self._t_start:.2f}s"
        )

    def feed_data(self, data: bytes):
        """Queue *data* for writing to the target; ignored once the tunnel is closed."""
        if not self._closed:
            self._data_queue.put_nowait(data)

    def close(self):
        """Mark the tunnel closed, wake the writer loop and close the TCP connection."""
        self._closed = True
        self._data_queue.put_nowait(None)
        if self._tcp_writer:
            try:
                self._tcp_writer.close()
            except Exception:
                pass
# ══════════════════════════════════════════════════════════
# 节点主体
# ══════════════════════════════════════════════════════════
class ProxyNode:
    """Client node: keeps a WebSocket to the server and serves tunnel requests.

    Incoming binary frames are dispatched to ``TunnelWorker`` instances; frames
    produced by the workers are collected on one queue and sent by a single
    sender task. The connection is re-established with a growing delay after
    failures.
    """

    # Seconds to wait before reconnect attempts 1, 2, 3, ...; the last value repeats.
    RECONNECT_DELAYS = [2, 4, 8, 16, 30, 60]

    def __init__(self, server_url: str, node_id: str, secret: str = "", is_default: bool = False):
        """
        Args:
            server_url: ws:// or wss:// address of the server endpoint.
            node_id: Identifier sent in the X-Node-ID header.
            secret: Optional token sent as ``Authorization: Bearer <secret>``.
            is_default: Sent as the X-Default-Node header.
        """
        self.server_url = server_url
        self.node_id = node_id
        self.secret = secret
        self.is_default = is_default
        self._local_ip = _local_ip()
        self._ssl_ctx = _make_ssl_ctx(server_url)
        self._tunnels: Dict[int, TunnelWorker] = {}
        self._ws_queue: asyncio.Queue = asyncio.Queue()
        self._sem = asyncio.Semaphore(MAX_CONCURRENCY)
        self._running = False
        self._total_tunnels = 0
        self._total_tx = 0
        self._total_rx = 0
        # Strong references to running tunnel tasks: the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._tunnel_tasks: set = set()

    async def run(self):
        """Connect and serve until ``stop()`` is called, reconnecting as needed."""
        self._running = True
        attempt = 0
        while self._running:
            try:
                await self._connect_loop()
                attempt = 0
            except Exception as e:
                delay = self.RECONNECT_DELAYS[min(attempt, len(self.RECONNECT_DELAYS)-1)]
                logger.warning(
                    f"连接断开 ({type(e).__name__}: {e}),"
                    f"{delay}s 后重连... (第{attempt+1}次)"
                )
                await asyncio.sleep(delay)
                attempt += 1
            else:
                # The server closed the connection without an error. Pause
                # briefly so a server that accepts and immediately closes is
                # not hammered in a tight loop.
                if self._running:
                    await asyncio.sleep(self.RECONNECT_DELAYS[0])

    def stop(self):
        """Stop reconnecting and close every active tunnel."""
        self._running = False
        for t in list(self._tunnels.values()):
            t.close()

    async def _connect_loop(self):
        """Open one WebSocket session and serve it until it ends."""
        headers = {
            "X-Node-ID": self.node_id,
            "X-Platform": PLATFORM,
            "X-Version": "3.0",
            "X-Device-FP": _device_fp(),
            "X-Local-IP": self._local_ip,
            "X-Default-Node": "true" if self.is_default else "false",
        }
        if self.secret:
            headers["Authorization"] = f"Bearer {self.secret}"
        logger.info(
            f"连接服务端 url={self.server_url} "
            f"platform={PLATFORM} local_ip={self._local_ip} "
            f"node_id={self.node_id[:8]}"
        )
        # On Android aiohttp needs an explicit ssl context (or False).
        ws_kwargs: dict = dict(
            headers=headers,
            heartbeat=30,
            max_msg_size=0,
        )
        if self._ssl_ctx is not None:
            ws_kwargs["ssl"] = self._ssl_ctx
        elif self.server_url.startswith("wss://"):
            ws_kwargs["ssl"] = True  # fallback: system default
        async with aiohttp.ClientSession() as sess:
            async with sess.ws_connect(self.server_url, **ws_kwargs) as ws:
                logger.info(
                    f"✅ 已连接 url={self.server_url} "
                    f"local_ip={self._local_ip} node_id={self.node_id[:8]}"
                )
                # No tunnel exists at this point, so anything still queued
                # belongs to a previous session and must not be sent here.
                self._drain_ws_queue()
                send_task = asyncio.create_task(self._ws_sender(ws))
                try:
                    await self._ws_receiver(ws)
                finally:
                    send_task.cancel()
                    for t in list(self._tunnels.values()):
                        t.close()
                    self._tunnels.clear()
                    logger.info(
                        f"连接断开 tunnels={self._total_tunnels} "
                        f"↑{_fmt_bytes(self._total_tx)} ↓{_fmt_bytes(self._total_rx)}"
                    )

    def _drain_ws_queue(self):
        """Discard every frame currently waiting on the send queue."""
        while True:
            try:
                self._ws_queue.get_nowait()
            except asyncio.QueueEmpty:
                return

    async def _ws_sender(self, ws):
        """Send queued frames over *ws* until a send fails or the task is cancelled."""
        while True:
            frame = await self._ws_queue.get()
            try:
                await ws.send_bytes(frame)
            except Exception as e:
                logger.warning(f"WS 发送失败: {e}")
                break

    async def _ws_receiver(self, ws):
        """Read messages from *ws*: binary frames are dispatched, text is checked for pong."""
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.BINARY:
                await self._dispatch(msg.data)
            elif msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    d = json.loads(msg.data)
                    if d.get("type") == "pong":
                        flow_log.debug("heartbeat pong")
                except Exception:
                    pass
            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE):
                break

    async def _dispatch(self, raw: bytes):
        """Handle one binary frame; frames shorter than the 5-byte header are ignored."""
        if len(raw) < 5:
            return
        tid, ftype, payload = unpack_frame(raw)
        if ftype == TYPE_CONNECT_REQ:
            # Payload layout: 1-byte host length, host bytes, 2-byte big-endian port.
            try:
                host_len = payload[0]
                host = payload[1:1+host_len].decode()
                port = struct.unpack(">H", payload[1+host_len:3+host_len])[0]
            except (IndexError, struct.error, UnicodeDecodeError) as e:
                # A single malformed request must not tear down the whole
                # session; reject just this tunnel.
                logger.warning(f"[node] tid={tid:08x} | CONNECT_REQ 帧格式错误: {e}")
                await self._ws_queue.put(
                    pack_frame(tid, TYPE_CONNECT_FAIL, b"malformed CONNECT_REQ")
                )
                return
            self._total_tunnels += 1
            logger.info(
                f"[node] tid={tid:08x} | "
                f"新隧道 → {host}:{port} "
                f"active={len(self._tunnels)+1}/{MAX_CONCURRENCY}"
            )
            task = asyncio.create_task(self._run_tunnel(tid, host, port))
            self._tunnel_tasks.add(task)
            task.add_done_callback(self._tunnel_tasks.discard)
        elif ftype == TYPE_DATA:
            t = self._tunnels.get(tid)
            if t:
                t.feed_data(payload)
            else:
                flow_log.debug(f"DATA tid={tid:08x} 无隧道")
        elif ftype == TYPE_CLOSE:
            t = self._tunnels.pop(tid, None)
            if t:
                t.close()

    async def _run_tunnel(self, tid: int, host: str, port: int):
        """Run one tunnel under the concurrency limit and account its traffic."""
        async with self._sem:
            worker = TunnelWorker(tid, host, port, self._ws_queue, self._local_ip)
            self._tunnels[tid] = worker
            try:
                await worker.run()
            finally:
                # Remove only our own entry: after a reconnect the same tid
                # may already belong to a newer worker.
                if self._tunnels.get(tid) is worker:
                    del self._tunnels[tid]
                self._total_tx += worker._tx_bytes
                self._total_rx += worker._rx_bytes
# ══════════════════════════════════════════════════════════
# 配置加载 & CLI
# ══════════════════════════════════════════════════════════
# Built-in server address, used when neither CLI, environment nor config file provides one.
DEFAULT_SERVER = "wss://dnode.qaiu.top/ws/node"
def load_file_config() -> dict:
    """Read the config file, creating a template on first run.

    Returns:
        The default settings overlaid with the file's content. When the file
        cannot be read or parsed the plain defaults are returned; when it does
        not exist, a template with a freshly generated ``node_id`` is written
        (best effort) and returned.
    """
    defaults = {"server_url": DEFAULT_SERVER, "node_id": "", "secret": ""}

    if not os.path.exists(CONFIG_FILE):
        # First run: write a template that already contains a new node_id.
        defaults["node_id"] = uuid.uuid4().hex
        try:
            os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
            with open(CONFIG_FILE, "w", encoding="utf-8") as out:
                json.dump(defaults, out, indent=2, ensure_ascii=False)
            logger.info(f"已生成配置文件: {CONFIG_FILE}")
        except OSError as err:
            logger.warning(f"配置文件写入失败: {err}")
        return defaults

    try:
        with open(CONFIG_FILE, encoding="utf-8") as src:
            stored = json.load(src)
        return {**defaults, **stored}
    except Exception as err:
        logger.warning(f"配置文件读取失败: {err},使用默认值")
        return defaults
def save_node_id(node_id: str):
    """Persist *node_id* into the config file, keeping its other settings.

    Best effort: I/O errors, an unparsable file, or a file whose top-level
    JSON value is not an object are logged and otherwise ignored. In the last
    two cases the file is left untouched so a hand-edited config with a typo
    is not overwritten.

    Args:
        node_id: The node identifier to store under the ``node_id`` key.
    """
    try:
        cfg = {}
        if os.path.exists(CONFIG_FILE):
            with open(CONFIG_FILE, encoding="utf-8") as f:
                cfg = json.load(f)
        if not isinstance(cfg, dict):
            raise ValueError("top-level JSON value is not an object")
        cfg["node_id"] = node_id
        with open(CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(cfg, f, indent=2, ensure_ascii=False)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.warning(f"node_id 持久化失败: {e}")
def parse_args():
    """Parse the client's command line (``sys.argv``) and return the namespace.

    Options: optional positional ``server_url``, ``--secret``, ``--id``,
    ``--debug`` and ``--default``.
    """
    usage_examples = """
示例:
python node_client.py wss://proxy.example.com:9000/ws/node
python node_client.py wss://proxy.example.com:9000/ws/node --secret mytoken
python node_client.py --debug
NFD_SERVER=wss://... python node_client.py
"""
    cli = argparse.ArgumentParser(
        description="NFD 代理客户端节点",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    cli.add_argument(
        "server_url",
        nargs="?",
        default=None,
        metavar="SERVER_URL",
        help="服务端 ws:// 或 wss:// 地址(最高优先级)",
    )
    # Value options default to None so lower-priority sources can fill them in.
    for option, text in (
        ("--secret", "接入密钥"),
        ("--id", "指定 node_id(设备标识)"),
    ):
        cli.add_argument(option, default=None, help=text)
    for switch, text in (
        ("--debug", "开启 DEBUG 日志"),
        ("--default", "标记为默认节点(兜底分发,服务器本地部署时使用)"),
    ):
        cli.add_argument(switch, action="store_true", help=text)
    return cli.parse_args()
# ══════════════════════════════════════════════════════════
# 入口
# ══════════════════════════════════════════════════════════
async def amain():
    """Async entry point: resolve the settings, start the node, and run until stopped.

    Setting precedence:
      * server URL: CLI argument > NFD_SERVER > config file > DEFAULT_SERVER
      * secret:     CLI argument > NFD_SECRET > config file
      * node id:    CLI argument > config file > newly generated

    Exits with status 1 when the server URL is not a ws:// or wss:// address.
    SIGTERM/SIGINT (where the loop supports signal handlers) stop the node and
    let this coroutine return normally.
    """
    args = parse_args()
    _setup_logging(args.debug)
    file_cfg = load_file_config()

    server_url = (
        args.server_url
        or os.environ.get("NFD_SERVER", "")
        or file_cfg.get("server_url", "")
        or DEFAULT_SERVER
    ).strip()
    secret = (
        args.secret
        or os.environ.get("NFD_SECRET", "")
        or file_cfg.get("secret", "")
    )
    node_id = (
        args.id
        or file_cfg.get("node_id", "")
        or uuid.uuid4().hex
    )
    # Persist the id when the config file does not have one yet.
    if not file_cfg.get("node_id"):
        save_node_id(node_id)

    # Validate the address format
    if not server_url.startswith(("ws://", "wss://")):
        logger.error(
            f"服务端地址格式错误: {server_url!r}\n"
            " 应为 ws://host:port/ws/node 或 wss://host:port/ws/node"
        )
        sys.exit(1)

    logger.info(
        f"NFD 节点启动 platform={PLATFORM} "
        f"node_id={node_id[:8]} server={server_url} "
        f"default={'✓' if args.default else '✗'} "
        f"config={CONFIG_FILE}"
    )
    node = ProxyNode(server_url=server_url, node_id=node_id, secret=secret,
                     is_default=args.default)

    loop = asyncio.get_running_loop()
    run_task = asyncio.ensure_future(node.run())

    def _shutdown(sig_name: str):
        logger.info(f"收到 {sig_name},正在退出...")
        node.stop()
        # Cancel the node task instead of stopping the loop: loop.stop()
        # under asyncio.run() raises "Event loop stopped before Future
        # completed" instead of shutting down cleanly.
        run_task.cancel()

    # Register exit signals (Android SIGTERM / desktop SIGINT)
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, _shutdown, sig.name)
        except (NotImplementedError, OSError, RuntimeError, ValueError):
            # Windows, some embedded Pythons, or a non-main thread do not
            # support add_signal_handler.
            pass

    try:
        await run_task
    except asyncio.CancelledError:
        # Normal shutdown path (signal handler or outer cancellation).
        pass
# Script entry: run the async main; Ctrl-C and explicit exits end the
# program quietly without a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(amain())
    except (KeyboardInterrupt, SystemExit):
        pass


评论区