目 录CONTENT

文章目录

dnode-py客户端

qaiu
2026-03-08 / 0 评论 / 0 点赞 / 23 阅读 / 4,290 字
温馨提示:
C4droid交流群: 1026766509(验证问题自行百度)
Pydroid交流群: 1026766509
netdisk-fast-download开源交流群:1017480890
有什么疑问可以评论区留言
提问之前建议阅读下提问的智慧
尽量不要屏蔽广告😘 条件允许的话 可以请作者喝杯咖啡不胜感激

下载pydroid 参考 https://blog.qaiu.top/archives/pydroid3v72

pydroid运行

#pylint:disable=W0122
# Pydroid dnode客户端
from urllib.request import urlopen
exec(urlopen('https://qaiu.top/src/py/dnode_client_v4.py').read())

dnode_client_v4完整代码

#!/usr/bin/env python3
"""
NFD 客户端节点  ── Android APK / 跨平台
════════════════════════════════════════════════════════════════

服务端地址优先级(高→低):
  1. 命令行参数    python node_client.py wss://host:9000/ws/node
  2. 环境变量      NFD_SERVER=wss://host:9000/ws/node
  3. 配置文件      node_config.json  中的 server_url 字段
  4. 内置默认值    ws://proxy.nfd.com:9000/ws/node

其他可选参数:
  --secret  <token>     接入密钥(也可在配置文件或 NFD_SECRET 环境变量中设置)
  --id      <node_id>   指定设备 ID(默认自动生成并持久化)
  --debug               等同于 LOG_LEVEL=DEBUG

Android 配置文件路径优先级:
  $NFD_CONFIG_DIR/node_config.json
  /sdcard/nfd/node_config.json
  <脚本同级目录>/node_config.json

用法示例:
  python node_client.py wss://proxy.example.com:9000/ws/node
  python node_client.py wss://proxy.example.com:9000/ws/node --secret abc123
  NFD_SERVER=wss://... python node_client.py
"""

import argparse
import asyncio
import hashlib
import json
import logging
import os
import platform
import signal
import socket
import ssl
import struct
import sys
import time
import uuid
from typing import Dict, Optional

import aiohttp

# ══════════════════════════════════════════════════════════
#  Android 检测
# ══════════════════════════════════════════════════════════

def _is_android() -> bool:
    return (
        os.path.exists("/data/data") or
        os.environ.get("ANDROID_ROOT") is not None or
        os.environ.get("ANDROID_DATA") is not None
    )

IS_ANDROID = _is_android()
PLATFORM   = "Android" if IS_ANDROID else platform.system()


# ══════════════════════════════════════════════════════════
#  配置文件路径(Android 友好)
# ══════════════════════════════════════════════════════════

def _config_path() -> str:
    # 1. 环境变量指定目录
    env_dir = os.environ.get("NFD_CONFIG_DIR", "")
    if env_dir:
        return os.path.join(env_dir, "node_config.json")
    # 2. Android:优先 SD 卡可读目录
    if IS_ANDROID:
        sdcard = "/sdcard/nfd"
        try:
            os.makedirs(sdcard, exist_ok=True)
            return os.path.join(sdcard, "node_config.json")
        except OSError:
            pass
    # 3. 脚本同级目录
    return os.path.join(os.path.dirname(os.path.abspath(__file__)), "node_config.json")

CONFIG_FILE = _config_path()


# ══════════════════════════════════════════════════════════
#  日志
# ══════════════════════════════════════════════════════════

def _setup_logging(debug: bool = False) -> None:
    level = logging.DEBUG if debug else getattr(
        logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO
    )
    fmt = "%(asctime)s.%(msecs)03d [%(levelname)-5s] %(name)s - %(message)s"
    # Android 下 basicConfig 可能已被 chaquopy 初始化,安全地重设
    root = logging.getLogger()
    root.setLevel(level)
    if not root.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
        root.addHandler(handler)
    else:
        for h in root.handlers:
            h.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))

logger   = logging.getLogger("nfd.node")
flow_log = logging.getLogger("nfd.flow")


# ══════════════════════════════════════════════════════════
#  工具函数
# ══════════════════════════════════════════════════════════

def _fmt_bytes(n: int) -> str:
    n = int(n or 0)
    for unit in ("B", "KB", "MB", "GB"):
        if n < 1024:
            return f"{n:.1f}{unit}"
        n /= 1024
    return f"{n:.1f}TB"


def _local_ip() -> str:
    """获取本机出口 IP(Android/Linux/Windows 均可用)"""
    targets = [("8.8.8.8", 80), ("1.1.1.1", 80), ("114.114.114.114", 80)]
    for host, port in targets:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.settimeout(1)
                s.connect((host, port))
                return s.getsockname()[0]
        except OSError:
            continue
    return "?"


def _device_fp() -> str:
    """设备指纹:Android 优先读 Android ID,回退到通用方案"""
    if IS_ANDROID:
        # 尝试读取 Android ID(需 READ_PHONE_STATE 或已知路径)
        try:
            import subprocess
            aid = subprocess.check_output(
                ["settings", "get", "secure", "android_id"],
                timeout=2, stderr=subprocess.DEVNULL
            ).decode().strip()
            if aid and aid != "null":
                return hashlib.sha256(aid.encode()).hexdigest()[:16]
        except Exception:
            pass
    raw = f"{platform.node()}-{PLATFORM}-{uuid.getnode()}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


def _make_ssl_ctx(url: str) -> Optional[ssl.SSLContext]:
    """wss:// 创建 SSL context;Android 证书链有时需宽松验证"""
    if not url.startswith("wss://"):
        return None
    ctx = ssl.create_default_context()
    if IS_ANDROID:
        # Android 系统根证书路径不标准,允许加载系统证书
        for ca_path in (
            "/system/etc/security/cacerts",
            "/data/misc/keychain/certs-added",
        ):
            if os.path.isdir(ca_path):
                for f in os.listdir(ca_path):
                    try:
                        ctx.load_verify_locations(os.path.join(ca_path, f))
                    except Exception:
                        pass
        # 如果仍有问题,可通过环境变量 NFD_SSL_NOVERIFY=1 跳过验证(不推荐生产)
        if os.environ.get("NFD_SSL_NOVERIFY", "0") == "1":
            ctx.check_hostname = False
            ctx.verify_mode    = ssl.CERT_NONE
            logger.warning("SSL 证书验证已禁用(NFD_SSL_NOVERIFY=1)")
    return ctx


# ══════════════════════════════════════════════════════════
#  帧协议
# ══════════════════════════════════════════════════════════

TYPE_CONNECT_REQ  = 0x01
TYPE_CONNECT_OK   = 0x02
TYPE_CONNECT_FAIL = 0x03
TYPE_DATA         = 0x04
TYPE_CLOSE        = 0x05
MAX_CONCURRENCY   = 50


def pack_frame(tunnel_id: int, ftype: int, payload: bytes = b"") -> bytes:
    return struct.pack(">IB", tunnel_id, ftype) + payload


def unpack_frame(data: bytes):
    tunnel_id, ftype = struct.unpack(">IB", data[:5])
    return tunnel_id, ftype, data[5:]


# ══════════════════════════════════════════════════════════
#  单条隧道
# ══════════════════════════════════════════════════════════

class TunnelWorker:
    def __init__(self, tid: int, host: str, port: int,
                 ws_send_queue: asyncio.Queue, local_ip: str):
        self.tid          = tid
        self.host         = host
        self.port         = port
        self._ws_queue    = ws_send_queue
        self._local_ip    = local_ip
        self._tcp_writer: Optional[asyncio.StreamWriter] = None
        self._data_queue: asyncio.Queue = asyncio.Queue()
        self._closed      = False
        self._t_start     = time.monotonic()
        self._tx_bytes    = 0
        self._rx_bytes    = 0

    async def run(self):
        flow_log.debug(f"[node] tid={self.tid:08x} | {self._local_ip} → {self.host}:{self.port} | TCP_CONNECTING")
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port), timeout=10
            )
            self._tcp_writer = writer
            local = writer.get_extra_info("sockname")
            local_addr = f"{local[0]}:{local[1]}" if local else self._local_ip
        except Exception as e:
            flow_log.debug(f"[node] tid={self.tid:08x} | TCP_FAIL {self.host}:{self.port} err={e}")
            await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_FAIL, str(e).encode()))
            return

        conn_ms = f"{(time.monotonic()-self._t_start)*1000:.0f}ms"
        await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_OK))
        flow_log.debug(f"[node] tid={self.tid:08x} | {local_addr} → {self.host}:{self.port} | TCP_CONNECTED conn={conn_ms}")

        async def tcp_to_ws():
            try:
                while not self._closed:
                    chunk = await reader.read(65536)
                    if not chunk:
                        break
                    self._rx_bytes += len(chunk)
                    flow_log.debug(
                        f"[node] tid={self.tid:08x} | "
                        f"target→node chunk={_fmt_bytes(len(chunk))} ↓{_fmt_bytes(self._rx_bytes)}"
                    )
                    await self._ws_queue.put(pack_frame(self.tid, TYPE_DATA, chunk))
            except Exception as e:
                flow_log.debug(f"[node] tid={self.tid:08x} | tcp_to_ws err={e}")
            finally:
                await self._ws_queue.put(pack_frame(self.tid, TYPE_CLOSE))
                self._closed = True

        async def ws_to_tcp():
            try:
                while not self._closed:
                    chunk = await self._data_queue.get()
                    if chunk is None:
                        break
                    self._tx_bytes += len(chunk)
                    flow_log.debug(
                        f"[node] tid={self.tid:08x} | "
                        f"node→target chunk={_fmt_bytes(len(chunk))} ↑{_fmt_bytes(self._tx_bytes)}"
                    )
                    writer.write(chunk)
                    await writer.drain()
            except Exception as e:
                flow_log.debug(f"[node] tid={self.tid:08x} | ws_to_tcp err={e}")
            finally:
                try:
                    writer.close()
                except Exception:
                    pass
                self._closed = True

        await asyncio.gather(tcp_to_ws(), ws_to_tcp(), return_exceptions=True)

        logger.info(
            f"[node] tid={self.tid:08x} | "
            f"{self._local_ip} ⇄ {self.host}:{self.port} | "
            f"CLOSED ↑{_fmt_bytes(self._tx_bytes)} ↓{_fmt_bytes(self._rx_bytes)} "
            f"dur={time.monotonic()-self._t_start:.2f}s"
        )

    def feed_data(self, data: bytes):
        if not self._closed:
            self._data_queue.put_nowait(data)

    def close(self):
        self._closed = True
        self._data_queue.put_nowait(None)
        if self._tcp_writer:
            try:
                self._tcp_writer.close()
            except Exception:
                pass


# ══════════════════════════════════════════════════════════
#  节点主体
# ══════════════════════════════════════════════════════════

class ProxyNode:
    RECONNECT_DELAYS = [2, 4, 8, 16, 30, 60]

    def __init__(self, server_url: str, node_id: str, secret: str = "", is_default: bool = False):
        self.server_url  = server_url
        self.node_id     = node_id
        self.secret      = secret
        self.is_default  = is_default
        self._local_ip   = _local_ip()
        self._ssl_ctx    = _make_ssl_ctx(server_url)
        self._tunnels:   Dict[int, TunnelWorker] = {}
        self._ws_queue:  asyncio.Queue = asyncio.Queue()
        self._sem        = asyncio.Semaphore(MAX_CONCURRENCY)
        self._running    = False
        self._total_tunnels = 0
        self._total_tx      = 0
        self._total_rx      = 0

    async def run(self):
        self._running = True
        attempt = 0
        while self._running:
            try:
                await self._connect_loop()
                attempt = 0
            except Exception as e:
                delay = self.RECONNECT_DELAYS[min(attempt, len(self.RECONNECT_DELAYS)-1)]
                logger.warning(
                    f"连接断开 ({type(e).__name__}: {e}),"
                    f"{delay}s 后重连... (第{attempt+1}次)"
                )
                await asyncio.sleep(delay)
                attempt += 1

    def stop(self):
        self._running = False
        for t in list(self._tunnels.values()):
            t.close()

    async def _connect_loop(self):
        headers = {
            "X-Node-ID":      self.node_id,
            "X-Platform":     PLATFORM,
            "X-Version":      "3.0",
            "X-Device-FP":    _device_fp(),
            "X-Local-IP":     self._local_ip,
            "X-Default-Node": "true" if self.is_default else "false",
        }
        if self.secret:
            headers["Authorization"] = f"Bearer {self.secret}"

        logger.info(
            f"连接服务端  url={self.server_url}  "
            f"platform={PLATFORM}  local_ip={self._local_ip}  "
            f"node_id={self.node_id[:8]}"
        )

        # Android 上 aiohttp 需要明确传入 ssl context 或 False
        ws_kwargs: dict = dict(
            headers=headers,
            heartbeat=30,
            max_msg_size=0,
        )
        if self._ssl_ctx is not None:
            ws_kwargs["ssl"] = self._ssl_ctx
        elif self.server_url.startswith("wss://"):
            ws_kwargs["ssl"] = True   # fallback: 使用系统默认

        async with aiohttp.ClientSession() as sess:
            async with sess.ws_connect(self.server_url, **ws_kwargs) as ws:
                logger.info(
                    f"✅ 已连接  url={self.server_url}  "
                    f"local_ip={self._local_ip}  node_id={self.node_id[:8]}"
                )
                send_task = asyncio.create_task(self._ws_sender(ws))
                try:
                    await self._ws_receiver(ws)
                finally:
                    send_task.cancel()
                    for t in list(self._tunnels.values()):
                        t.close()
                    self._tunnels.clear()
                    logger.info(
                        f"连接断开  tunnels={self._total_tunnels}  "
                        f"↑{_fmt_bytes(self._total_tx)} ↓{_fmt_bytes(self._total_rx)}"
                    )

    async def _ws_sender(self, ws):
        while True:
            frame = await self._ws_queue.get()
            try:
                await ws.send_bytes(frame)
            except Exception as e:
                logger.warning(f"WS 发送失败: {e}")
                break

    async def _ws_receiver(self, ws):
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.BINARY:
                await self._dispatch(msg.data)
            elif msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    d = json.loads(msg.data)
                    if d.get("type") == "pong":
                        flow_log.debug("heartbeat pong")
                except Exception:
                    pass
            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE):
                break

    async def _dispatch(self, raw: bytes):
        if len(raw) < 5:
            return
        tid, ftype, payload = unpack_frame(raw)
        if ftype == TYPE_CONNECT_REQ:
            host_len = payload[0]
            host     = payload[1:1+host_len].decode()
            port     = struct.unpack(">H", payload[1+host_len:3+host_len])[0]
            self._total_tunnels += 1
            logger.info(
                f"[node] tid={tid:08x} | "
                f"新隧道 → {host}:{port}  "
                f"active={len(self._tunnels)+1}/{MAX_CONCURRENCY}"
            )
            asyncio.create_task(self._run_tunnel(tid, host, port))
        elif ftype == TYPE_DATA:
            t = self._tunnels.get(tid)
            if t:
                t.feed_data(payload)
            else:
                flow_log.debug(f"DATA tid={tid:08x} 无隧道")
        elif ftype == TYPE_CLOSE:
            t = self._tunnels.pop(tid, None)
            if t:
                t.close()

    async def _run_tunnel(self, tid: int, host: str, port: int):
        async with self._sem:
            worker = TunnelWorker(tid, host, port, self._ws_queue, self._local_ip)
            self._tunnels[tid] = worker
            await worker.run()
            self._tunnels.pop(tid, None)
            self._total_tx += worker._tx_bytes
            self._total_rx += worker._rx_bytes


# ══════════════════════════════════════════════════════════
#  配置加载 & CLI
# ══════════════════════════════════════════════════════════

DEFAULT_SERVER = "wss://dnode.qaiu.top/ws/node"


def load_file_config() -> dict:
    """从配置文件读取(文件不存在则创建模板)"""
    defaults = {"server_url": DEFAULT_SERVER, "node_id": "", "secret": ""}
    if os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, encoding="utf-8") as f:
                return {**defaults, **json.load(f)}
        except Exception as e:
            logger.warning(f"配置文件读取失败: {e},使用默认值")
            return defaults
    # 首次运行:写入模板(含新生成 node_id)
    defaults["node_id"] = uuid.uuid4().hex
    try:
        os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
        with open(CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(defaults, f, indent=2, ensure_ascii=False)
        logger.info(f"已生成配置文件: {CONFIG_FILE}")
    except OSError as e:
        logger.warning(f"配置文件写入失败: {e}")
    return defaults


def save_node_id(node_id: str):
    """持久化自动生成的 node_id"""
    try:
        cfg = {}
        if os.path.exists(CONFIG_FILE):
            with open(CONFIG_FILE, encoding="utf-8") as f:
                cfg = json.load(f)
        cfg["node_id"] = node_id
        with open(CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(cfg, f, indent=2, ensure_ascii=False)
    except OSError:
        pass


def parse_args():
    parser = argparse.ArgumentParser(
        description="NFD 代理客户端节点",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python node_client.py wss://proxy.example.com:9000/ws/node
  python node_client.py wss://proxy.example.com:9000/ws/node --secret mytoken
  python node_client.py --debug
  NFD_SERVER=wss://... python node_client.py
        """,
    )
    parser.add_argument(
        "server_url",
        nargs="?",
        default=None,
        metavar="SERVER_URL",
        help="服务端 ws:// 或 wss:// 地址(最高优先级)",
    )
    parser.add_argument("--secret", default=None, help="接入密钥")
    parser.add_argument("--id",     default=None, help="指定 node_id(设备标识)")
    parser.add_argument("--debug",  action="store_true", help="开启 DEBUG 日志")
    parser.add_argument(
        "--default", action="store_true",
        help="标记为默认节点(兜底分发,服务器本地部署时使用)"
    )
    return parser.parse_args()


# ══════════════════════════════════════════════════════════
#  入口
# ══════════════════════════════════════════════════════════

async def amain():
    args    = parse_args()
    _setup_logging(args.debug)

    file_cfg = load_file_config()

    # 服务端地址优先级: CLI arg > 环境变量 > 配置文件 > 默认
    server_url = (
        args.server_url
        or os.environ.get("NFD_SERVER", "")
        or file_cfg.get("server_url", "")
        or DEFAULT_SERVER
    ).strip()

    # 密钥优先级: CLI > 环境变量 > 配置文件
    secret = (
        args.secret
        or os.environ.get("NFD_SECRET", "")
        or file_cfg.get("secret", "")
    )

    # node_id 优先级: CLI > 配置文件 > 自动生成
    node_id = (
        args.id
        or file_cfg.get("node_id", "")
        or uuid.uuid4().hex
    )
    if not file_cfg.get("node_id"):
        save_node_id(node_id)

    # 校验地址格式
    if not server_url.startswith(("ws://", "wss://")):
        logger.error(
            f"服务端地址格式错误: {server_url!r}\n"
            "    应为 ws://host:port/ws/node 或 wss://host:port/ws/node"
        )
        sys.exit(1)

    logger.info(
        f"NFD 节点启动  platform={PLATFORM}  "
        f"node_id={node_id[:8]}  server={server_url}  "
        f"default={'✓' if args.default else '✗'}  "
        f"config={CONFIG_FILE}"
    )

    node = ProxyNode(server_url=server_url, node_id=node_id, secret=secret,
                     is_default=args.default)

    # 注册退出信号(Android SIGTERM / 桌面 SIGINT)
    loop = asyncio.get_running_loop()

    def _shutdown(sig_name: str):
        logger.info(f"收到 {sig_name},正在退出...")
        node.stop()
        loop.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, _shutdown, sig.name)
        except (NotImplementedError, OSError):
            # Windows / 某些嵌入式 Python 不支持 add_signal_handler
            pass

    await node.run()


if __name__ == "__main__":
    try:
        asyncio.run(amain())
    except (KeyboardInterrupt, SystemExit):
        pass
0

评论区