下载pydroid 参考 https://blog.qaiu.top/archives/pydroid3v72
pydroid运行
#pylint:disable=W0122
# Pydroid dnode客户端
from urllib.request import urlopen
exec(urlopen('https://qaiu.top/src/py/dnode_client_v4.py').read())
dnode_client_v4完整代码
#!/usr/bin/env python3
"""
NFD 客户端节点 ── Android APK / 跨平台
════════════════════════════════════════════════════════════════
服务端地址优先级(高→低):
1. 命令行参数 python node_client.py wss://host:9000/ws/node
2. 环境变量 NFD_SERVER=wss://host:9000/ws/node
3. 配置文件 node_config.json 中的 server_url 字段
4. 内置默认值 ws://proxy.nfd.com:9000/ws/node
其他可选参数:
--secret <token> 接入密钥(也可在配置文件或 NFD_SECRET 环境变量中设置)
--id <node_id> 指定设备 ID(默认自动生成并持久化)
--debug 等同于 LOG_LEVEL=DEBUG
Android 配置文件路径优先级:
$NFD_CONFIG_DIR/node_config.json
/sdcard/nfd/node_config.json
<脚本同级目录>/node_config.json
用法示例:
python node_client.py wss://proxy.example.com:9000/ws/node
python node_client.py wss://proxy.example.com:9000/ws/node --secret abc123
NFD_SERVER=wss://... python node_client.py
"""
import argparse
import asyncio
import hashlib
import json
import logging
import os
import platform
import signal
import socket
import ssl
import struct
import sys
import time
import uuid
from typing import Dict, Optional
import aiohttp
# ══════════════════════════════════════════════════════════
# Android 检测
# ══════════════════════════════════════════════════════════
def _is_android() -> bool:
return (
os.path.exists("/data/data") or
os.environ.get("ANDROID_ROOT") is not None or
os.environ.get("ANDROID_DATA") is not None
)
IS_ANDROID = _is_android()
PLATFORM = "Android" if IS_ANDROID else platform.system()
# ══════════════════════════════════════════════════════════
# 配置文件路径(Android 友好)
# ══════════════════════════════════════════════════════════
def _config_path() -> str:
# 1. 环境变量指定目录
env_dir = os.environ.get("NFD_CONFIG_DIR", "")
if env_dir:
return os.path.join(env_dir, "node_config.json")
# 2. Android:优先 SD 卡可读目录
if IS_ANDROID:
sdcard = "/sdcard/nfd"
try:
os.makedirs(sdcard, exist_ok=True)
return os.path.join(sdcard, "node_config.json")
except OSError:
pass
# 3. 脚本同级目录
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "node_config.json")
CONFIG_FILE = _config_path()
# ══════════════════════════════════════════════════════════
# 日志
# ══════════════════════════════════════════════════════════
def _setup_logging(debug: bool = False) -> None:
level = logging.DEBUG if debug else getattr(
logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO
)
fmt = "%(asctime)s.%(msecs)03d [%(levelname)-5s] %(name)s - %(message)s"
# Android 下 basicConfig 可能已被 chaquopy 初始化,安全地重设
root = logging.getLogger()
root.setLevel(level)
if not root.handlers:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
root.addHandler(handler)
else:
for h in root.handlers:
h.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
logger = logging.getLogger("nfd.node")
flow_log = logging.getLogger("nfd.flow")
# ══════════════════════════════════════════════════════════
# 工具函数
# ══════════════════════════════════════════════════════════
def _fmt_bytes(n: int) -> str:
n = int(n or 0)
for unit in ("B", "KB", "MB", "GB"):
if n < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}TB"
def _local_ip() -> str:
"""获取本机出口 IP(Android/Linux/Windows 均可用)"""
targets = [("8.8.8.8", 80), ("1.1.1.1", 80), ("114.114.114.114", 80)]
for host, port in targets:
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(1)
s.connect((host, port))
return s.getsockname()[0]
except OSError:
continue
return "?"
def _device_fp() -> str:
"""设备指纹:Android 优先读 Android ID,回退到通用方案"""
if IS_ANDROID:
# 尝试读取 Android ID(需 READ_PHONE_STATE 或已知路径)
try:
import subprocess
aid = subprocess.check_output(
["settings", "get", "secure", "android_id"],
timeout=2, stderr=subprocess.DEVNULL
).decode().strip()
if aid and aid != "null":
return hashlib.sha256(aid.encode()).hexdigest()[:16]
except Exception:
pass
raw = f"{platform.node()}-{PLATFORM}-{uuid.getnode()}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def _make_ssl_ctx(url: str) -> Optional[ssl.SSLContext]:
"""wss:// 创建 SSL context;Android 证书链有时需宽松验证"""
if not url.startswith("wss://"):
return None
ctx = ssl.create_default_context()
if IS_ANDROID:
# Android 系统根证书路径不标准,允许加载系统证书
for ca_path in (
"/system/etc/security/cacerts",
"/data/misc/keychain/certs-added",
):
if os.path.isdir(ca_path):
for f in os.listdir(ca_path):
try:
ctx.load_verify_locations(os.path.join(ca_path, f))
except Exception:
pass
# 如果仍有问题,可通过环境变量 NFD_SSL_NOVERIFY=1 跳过验证(不推荐生产)
if os.environ.get("NFD_SSL_NOVERIFY", "0") == "1":
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
logger.warning("SSL 证书验证已禁用(NFD_SSL_NOVERIFY=1)")
return ctx
# ══════════════════════════════════════════════════════════
# 帧协议
# ══════════════════════════════════════════════════════════
TYPE_CONNECT_REQ = 0x01
TYPE_CONNECT_OK = 0x02
TYPE_CONNECT_FAIL = 0x03
TYPE_DATA = 0x04
TYPE_CLOSE = 0x05
MAX_CONCURRENCY = 50
def pack_frame(tunnel_id: int, ftype: int, payload: bytes = b"") -> bytes:
return struct.pack(">IB", tunnel_id, ftype) + payload
def unpack_frame(data: bytes):
tunnel_id, ftype = struct.unpack(">IB", data[:5])
return tunnel_id, ftype, data[5:]
# ══════════════════════════════════════════════════════════
# 单条隧道
# ══════════════════════════════════════════════════════════
class TunnelWorker:
def __init__(self, tid: int, host: str, port: int,
ws_send_queue: asyncio.Queue, local_ip: str):
self.tid = tid
self.host = host
self.port = port
self._ws_queue = ws_send_queue
self._local_ip = local_ip
self._tcp_writer: Optional[asyncio.StreamWriter] = None
self._data_queue: asyncio.Queue = asyncio.Queue()
self._closed = False
self._t_start = time.monotonic()
self._tx_bytes = 0
self._rx_bytes = 0
async def run(self):
flow_log.debug(f"[node] tid={self.tid:08x} | {self._local_ip} → {self.host}:{self.port} | TCP_CONNECTING")
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port), timeout=10
)
self._tcp_writer = writer
local = writer.get_extra_info("sockname")
local_addr = f"{local[0]}:{local[1]}" if local else self._local_ip
except Exception as e:
flow_log.debug(f"[node] tid={self.tid:08x} | TCP_FAIL {self.host}:{self.port} err={e}")
await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_FAIL, str(e).encode()))
return
conn_ms = f"{(time.monotonic()-self._t_start)*1000:.0f}ms"
await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_OK))
flow_log.debug(f"[node] tid={self.tid:08x} | {local_addr} → {self.host}:{self.port} | TCP_CONNECTED conn={conn_ms}")
async def tcp_to_ws():
try:
while not self._closed:
chunk = await reader.read(65536)
if not chunk:
break
self._rx_bytes += len(chunk)
flow_log.debug(
f"[node] tid={self.tid:08x} | "
f"target→node chunk={_fmt_bytes(len(chunk))} ↓{_fmt_bytes(self._rx_bytes)}"
)
await self._ws_queue.put(pack_frame(self.tid, TYPE_DATA, chunk))
except Exception as e:
flow_log.debug(f"[node] tid={self.tid:08x} | tcp_to_ws err={e}")
finally:
await self._ws_queue.put(pack_frame(self.tid, TYPE_CLOSE))
self._closed = True
async def ws_to_tcp():
try:
while not self._closed:
chunk = await self._data_queue.get()
if chunk is None:
break
self._tx_bytes += len(chunk)
flow_log.debug(
f"[node] tid={self.tid:08x} | "
f"node→target chunk={_fmt_bytes(len(chunk))} ↑{_fmt_bytes(self._tx_bytes)}"
)
writer.write(chunk)
await writer.drain()
except Exception as e:
flow_log.debug(f"[node] tid={self.tid:08x} | ws_to_tcp err={e}")
finally:
try:
writer.close()
except Exception:
pass
self._closed = True
await asyncio.gather(tcp_to_ws(), ws_to_tcp(), return_exceptions=True)
logger.info(
f"[node] tid={self.tid:08x} | "
f"{self._local_ip} ⇄ {self.host}:{self.port} | "
f"CLOSED ↑{_fmt_bytes(self._tx_bytes)} ↓{_fmt_bytes(self._rx_bytes)} "
f"dur={time.monotonic()-self._t_start:.2f}s"
)
def feed_data(self, data: bytes):
if not self._closed:
self._data_queue.put_nowait(data)
def close(self):
self._closed = True
self._data_queue.put_nowait(None)
if self._tcp_writer:
try:
self._tcp_writer.close()
except Exception:
pass
# ══════════════════════════════════════════════════════════
# 节点主体
# ══════════════════════════════════════════════════════════
class ProxyNode:
RECONNECT_DELAYS = [2, 4, 8, 16, 30, 60]
def __init__(self, server_url: str, node_id: str, secret: str = "", is_default: bool = False):
self.server_url = server_url
self.node_id = node_id
self.secret = secret
self.is_default = is_default
self._local_ip = _local_ip()
self._ssl_ctx = _make_ssl_ctx(server_url)
self._tunnels: Dict[int, TunnelWorker] = {}
self._ws_queue: asyncio.Queue = asyncio.Queue()
self._sem = asyncio.Semaphore(MAX_CONCURRENCY)
self._running = False
self._total_tunnels = 0
self._total_tx = 0
self._total_rx = 0
async def run(self):
self._running = True
attempt = 0
while self._running:
try:
await self._connect_loop()
attempt = 0
except Exception as e:
delay = self.RECONNECT_DELAYS[min(attempt, len(self.RECONNECT_DELAYS)-1)]
logger.warning(
f"连接断开 ({type(e).__name__}: {e}),"
f"{delay}s 后重连... (第{attempt+1}次)"
)
await asyncio.sleep(delay)
attempt += 1
def stop(self):
self._running = False
for t in list(self._tunnels.values()):
t.close()
async def _connect_loop(self):
headers = {
"X-Node-ID": self.node_id,
"X-Platform": PLATFORM,
"X-Version": "3.0",
"X-Device-FP": _device_fp(),
"X-Local-IP": self._local_ip,
"X-Default-Node": "true" if self.is_default else "false",
}
if self.secret:
headers["Authorization"] = f"Bearer {self.secret}"
logger.info(
f"连接服务端 url={self.server_url} "
f"platform={PLATFORM} local_ip={self._local_ip} "
f"node_id={self.node_id[:8]}"
)
# Android 上 aiohttp 需要明确传入 ssl context 或 False
ws_kwargs: dict = dict(
headers=headers,
heartbeat=30,
max_msg_size=0,
)
if self._ssl_ctx is not None:
ws_kwargs["ssl"] = self._ssl_ctx
elif self.server_url.startswith("wss://"):
ws_kwargs["ssl"] = True # fallback: 使用系统默认
async with aiohttp.ClientSession() as sess:
async with sess.ws_connect(self.server_url, **ws_kwargs) as ws:
logger.info(
f"✅ 已连接 url={self.server_url} "
f"local_ip={self._local_ip} node_id={self.node_id[:8]}"
)
send_task = asyncio.create_task(self._ws_sender(ws))
try:
await self._ws_receiver(ws)
finally:
send_task.cancel()
for t in list(self._tunnels.values()):
t.close()
self._tunnels.clear()
logger.info(
f"连接断开 tunnels={self._total_tunnels} "
f"↑{_fmt_bytes(self._total_tx)} ↓{_fmt_bytes(self._total_rx)}"
)
async def _ws_sender(self, ws):
while True:
frame = await self._ws_queue.get()
try:
await ws.send_bytes(frame)
except Exception as e:
logger.warning(f"WS 发送失败: {e}")
break
async def _ws_receiver(self, ws):
async for msg in ws:
if msg.type == aiohttp.WSMsgType.BINARY:
await self._dispatch(msg.data)
elif msg.type == aiohttp.WSMsgType.TEXT:
try:
d = json.loads(msg.data)
if d.get("type") == "pong":
flow_log.debug("heartbeat pong")
except Exception:
pass
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE):
break
async def _dispatch(self, raw: bytes):
if len(raw) < 5:
return
tid, ftype, payload = unpack_frame(raw)
if ftype == TYPE_CONNECT_REQ:
host_len = payload[0]
host = payload[1:1+host_len].decode()
port = struct.unpack(">H", payload[1+host_len:3+host_len])[0]
self._total_tunnels += 1
logger.info(
f"[node] tid={tid:08x} | "
f"新隧道 → {host}:{port} "
f"active={len(self._tunnels)+1}/{MAX_CONCURRENCY}"
)
asyncio.create_task(self._run_tunnel(tid, host, port))
elif ftype == TYPE_DATA:
t = self._tunnels.get(tid)
if t:
t.feed_data(payload)
else:
flow_log.debug(f"DATA tid={tid:08x} 无隧道")
elif ftype == TYPE_CLOSE:
t = self._tunnels.pop(tid, None)
if t:
t.close()
async def _run_tunnel(self, tid: int, host: str, port: int):
async with self._sem:
worker = TunnelWorker(tid, host, port, self._ws_queue, self._local_ip)
self._tunnels[tid] = worker
await worker.run()
self._tunnels.pop(tid, None)
self._total_tx += worker._tx_bytes
self._total_rx += worker._rx_bytes
# ══════════════════════════════════════════════════════════
# 配置加载 & CLI
# ══════════════════════════════════════════════════════════
DEFAULT_SERVER = "wss://dnode.qaiu.top/ws/node"
def load_file_config() -> dict:
"""从配置文件读取(文件不存在则创建模板)"""
defaults = {"server_url": DEFAULT_SERVER, "node_id": "", "secret": ""}
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, encoding="utf-8") as f:
return {**defaults, **json.load(f)}
except Exception as e:
logger.warning(f"配置文件读取失败: {e},使用默认值")
return defaults
# 首次运行:写入模板(含新生成 node_id)
defaults["node_id"] = uuid.uuid4().hex
try:
os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(defaults, f, indent=2, ensure_ascii=False)
logger.info(f"已生成配置文件: {CONFIG_FILE}")
except OSError as e:
logger.warning(f"配置文件写入失败: {e}")
return defaults
def save_node_id(node_id: str):
"""持久化自动生成的 node_id"""
try:
cfg = {}
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, encoding="utf-8") as f:
cfg = json.load(f)
cfg["node_id"] = node_id
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
except OSError:
pass
def parse_args():
parser = argparse.ArgumentParser(
description="NFD 代理客户端节点",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python node_client.py wss://proxy.example.com:9000/ws/node
python node_client.py wss://proxy.example.com:9000/ws/node --secret mytoken
python node_client.py --debug
NFD_SERVER=wss://... python node_client.py
""",
)
parser.add_argument(
"server_url",
nargs="?",
default=None,
metavar="SERVER_URL",
help="服务端 ws:// 或 wss:// 地址(最高优先级)",
)
parser.add_argument("--secret", default=None, help="接入密钥")
parser.add_argument("--id", default=None, help="指定 node_id(设备标识)")
parser.add_argument("--debug", action="store_true", help="开启 DEBUG 日志")
parser.add_argument(
"--default", action="store_true",
help="标记为默认节点(兜底分发,服务器本地部署时使用)"
)
return parser.parse_args()
# ══════════════════════════════════════════════════════════
# 入口
# ══════════════════════════════════════════════════════════
async def amain():
args = parse_args()
_setup_logging(args.debug)
file_cfg = load_file_config()
# 服务端地址优先级: CLI arg > 环境变量 > 配置文件 > 默认
server_url = (
args.server_url
or os.environ.get("NFD_SERVER", "")
or file_cfg.get("server_url", "")
or DEFAULT_SERVER
).strip()
# 密钥优先级: CLI > 环境变量 > 配置文件
secret = (
args.secret
or os.environ.get("NFD_SECRET", "")
or file_cfg.get("secret", "")
)
# node_id 优先级: CLI > 配置文件 > 自动生成
node_id = (
args.id
or file_cfg.get("node_id", "")
or uuid.uuid4().hex
)
if not file_cfg.get("node_id"):
save_node_id(node_id)
# 校验地址格式
if not server_url.startswith(("ws://", "wss://")):
logger.error(
f"服务端地址格式错误: {server_url!r}\n"
" 应为 ws://host:port/ws/node 或 wss://host:port/ws/node"
)
sys.exit(1)
logger.info(
f"NFD 节点启动 platform={PLATFORM} "
f"node_id={node_id[:8]} server={server_url} "
f"default={'✓' if args.default else '✗'} "
f"config={CONFIG_FILE}"
)
node = ProxyNode(server_url=server_url, node_id=node_id, secret=secret,
is_default=args.default)
# 注册退出信号(Android SIGTERM / 桌面 SIGINT)
loop = asyncio.get_running_loop()
def _shutdown(sig_name: str):
logger.info(f"收到 {sig_name},正在退出...")
node.stop()
loop.stop()
for sig in (signal.SIGTERM, signal.SIGINT):
try:
loop.add_signal_handler(sig, _shutdown, sig.name)
except (NotImplementedError, OSError):
# Windows / 某些嵌入式 Python 不支持 add_signal_handler
pass
await node.run()
if __name__ == "__main__":
try:
asyncio.run(amain())
except (KeyboardInterrupt, SystemExit):
pass


评论区