#!/usr/bin/env python3 """Compose-facing web server for MobileModels static pages and maintenance APIs.""" from __future__ import annotations import argparse import json import os import re import subprocess import threading import time from datetime import datetime, timedelta from http import HTTPStatus from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from project_layout import PROJECT_ROOT, WORKSPACE_ROOT from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL SYNC_SCRIPT = PROJECT_ROOT / "tools/sync_upstream_mobilemodels.py" INDEX_PATH = PROJECT_ROOT / "dist/device_index.json" MYSQL_SEED_PATH = PROJECT_ROOT / "dist/mobilemodels_mysql_seed.sql" MYSQL_LOADER = PROJECT_ROOT / "tools/load_mysql_seed.py" DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data")) SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json" SCHEDULE_CONFIG_PATH = DATA_ROOT / "state/sync_schedule.json" SYNC_LOCK = threading.Lock() SCHEDULE_LOCK = threading.Lock() NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+") SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$") SCHEDULER_POLL_SECONDS = 20 def truthy_env(name: str, default: str = "0") -> bool: return os.environ.get(name, default).strip().lower() in {"1", "true", "yes", "on"} def apply_timezone_from_env() -> None: if not os.environ.get("TZ"): return try: time.tzset() except AttributeError: return def mysql_auto_load_enabled() -> bool: return truthy_env("MYSQL_AUTO_LOAD", "0") def local_now() -> datetime: return datetime.now().astimezone() def normalize_schedule_time(value: str | None, *, fallback: str = "03:00") -> str: text = str(value or "").strip() if not text: text = fallback if not SCHEDULE_TIME_RE.match(text): if fallback and text != fallback: return normalize_schedule_time(fallback, fallback="") raise RuntimeError("每日同步时间格式必须为 HH:MM,例如 03:00。") hour, minute = text.split(":", 1) return f"{int(hour):02d}:{int(minute):02d}" def normalize_github_proxy_prefix(value: str | None) -> str: text = str(value or "").strip() if not text: return "" if "://" not in text: raise RuntimeError("GitHub 加速前缀必须包含协议,例如 https://ghfast.top/") if not text.endswith("/"): text = f"{text}/" return text def get_effective_repo_url(github_proxy_prefix: str | None = None) -> str: prefix = normalize_github_proxy_prefix( github_proxy_prefix if github_proxy_prefix is not None else os.environ.get("GITHUB_PROXY_PREFIX", "") ) return f"{prefix}{DEFAULT_REPO_URL}" if prefix else DEFAULT_REPO_URL def compute_next_run_at(daily_time: str, now: datetime | None = None) -> str: current = now or local_now() hour_text, minute_text = daily_time.split(":", 1) candidate = current.replace( hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0, ) if candidate <= current: candidate += timedelta(days=1) return candidate.isoformat(timespec="seconds") def default_schedule_config() -> dict[str, object]: enabled = truthy_env("SYNC_SCHEDULE_ENABLED", "0") daily_time = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00")) timezone_name = os.environ.get("TZ", "UTC").strip() or "UTC" github_proxy_prefix = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", "")) return { "enabled": enabled, "daily_time": daily_time, "timezone": timezone_name, "github_proxy_prefix": github_proxy_prefix, "next_run_at": compute_next_run_at(daily_time) if enabled else None, "last_run_time": None, "last_run_status": None, "last_run_message": None, "updated_at": None, } def normalize_schedule_config(raw: dict[str, object] | None) -> dict[str, object]: config = default_schedule_config() if isinstance(raw, dict): if "enabled" in raw: value = raw.get("enabled") config["enabled"] = value if isinstance(value, bool) else str(value).strip().lower() in {"1", "true", "yes", "on"} if "daily_time" in raw: try: config["daily_time"] = normalize_schedule_time(str(raw.get("daily_time") or "")) except RuntimeError: config["daily_time"] = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00")) if raw.get("timezone"): config["timezone"] = str(raw.get("timezone")).strip() or config["timezone"] if "github_proxy_prefix" in raw: try: config["github_proxy_prefix"] = normalize_github_proxy_prefix(str(raw.get("github_proxy_prefix") or "")) except RuntimeError: config["github_proxy_prefix"] = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", "")) for key in ("last_run_time", "last_run_status", "last_run_message", "updated_at"): if raw.get(key) is not None: config[key] = raw.get(key) next_run_at = raw.get("next_run_at") if config["enabled"] and isinstance(next_run_at, str) and next_run_at.strip(): try: datetime.fromisoformat(next_run_at) config["next_run_at"] = next_run_at except ValueError: config["next_run_at"] = compute_next_run_at(str(config["daily_time"])) else: config["next_run_at"] = compute_next_run_at(str(config["daily_time"])) if config["enabled"] else None return config def read_schedule_config() -> dict[str, object]: with SCHEDULE_LOCK: if not SCHEDULE_CONFIG_PATH.exists(): return normalize_schedule_config(None) try: payload = json.loads(SCHEDULE_CONFIG_PATH.read_text(encoding="utf-8")) except Exception: return normalize_schedule_config(None) return normalize_schedule_config(payload if isinstance(payload, dict) else None) def write_schedule_config(payload: dict[str, object]) -> dict[str, object]: normalized = normalize_schedule_config(payload) with SCHEDULE_LOCK: SCHEDULE_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) SCHEDULE_CONFIG_PATH.write_text( json.dumps(normalized, ensure_ascii=False, indent=2), encoding="utf-8", ) return normalized def update_schedule_config(payload: dict[str, object]) -> dict[str, object]: current = read_schedule_config() enabled_raw = payload.get("enabled", current.get("enabled", False)) enabled = enabled_raw if isinstance(enabled_raw, bool) else str(enabled_raw).strip().lower() in {"1", "true", "yes", "on"} daily_time = normalize_schedule_time(str(payload.get("daily_time") or current.get("daily_time") or "03:00")) github_proxy_prefix = normalize_github_proxy_prefix( str(payload.get("github_proxy_prefix") if "github_proxy_prefix" in payload else current.get("github_proxy_prefix") or "") ) updated = { **current, "enabled": enabled, "daily_time": daily_time, "timezone": os.environ.get("TZ", "UTC").strip() or "UTC", "github_proxy_prefix": github_proxy_prefix, "next_run_at": compute_next_run_at(daily_time) if enabled else None, "updated_at": local_now().isoformat(timespec="seconds"), } return write_schedule_config(updated) def mark_schedule_run(status: str, message: str) -> dict[str, object]: current = read_schedule_config() updated = { **current, "last_run_time": local_now().isoformat(timespec="seconds"), "last_run_status": status, "last_run_message": message, "next_run_at": compute_next_run_at(str(current.get("daily_time") or "03:00")) if current.get("enabled") else None, "updated_at": local_now().isoformat(timespec="seconds"), } return write_schedule_config(updated) def run_command(args: list[str]) -> subprocess.CompletedProcess[str]: return subprocess.run( args, cwd=PROJECT_ROOT, text=True, capture_output=True, check=False, ) def normalize_text(text: str) -> str: return NORMALIZE_RE.sub("", (text or "").lower()) def sql_string(value: str) -> str: return (value or "").replace("\\", "\\\\").replace("'", "''") def mysql_command(database: str | None = None) -> list[str]: command = [ "mysql", f"--host={os.environ.get('MYSQL_HOST', 'mysql')}", f"--port={os.environ.get('MYSQL_PORT', '3306')}", f"--user={os.environ.get('MYSQL_READER_USER', '')}", "--protocol=TCP", "--default-character-set=utf8mb4", "--batch", "--raw", ] if database: command.append(database) return command def mysql_env() -> dict[str, str]: env = os.environ.copy() env["MYSQL_PWD"] = os.environ.get("MYSQL_READER_PASSWORD", "") return env def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str | None]]: proc = subprocess.run( mysql_command(database=database), env=mysql_env(), input=sql, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if proc.returncode != 0: message = proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}" raise RuntimeError(message) lines = [line for line in proc.stdout.splitlines() if line.strip()] if not lines: return [] headers = lines[0].split("\t") rows: list[dict[str, str | None]] = [] for line in lines[1:]: values = line.split("\t") row = {} for idx, header in enumerate(headers): value = values[idx] if idx < len(values) else "" row[header] = None if value == "NULL" else value rows.append(row) return rows def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]: raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip() if not raw_value: raise RuntimeError("请填写设备标识。") alias_norm = normalize_text(raw_value) if not alias_norm: raise RuntimeError("设备标识无法归一化,请检查输入。") limit_value = payload.get("limit", 20) try: limit = int(limit_value) except Exception as err: raise RuntimeError("limit 必须是数字。") from err limit = max(1, min(limit, 100)) sql = f""" SELECT model, record_id, alias_norm, device_name, brand, manufacturer_brand, parent_brand, market_brand, device_type, source_file, section, source_rank, source_weight, code, code_alias, ver_name FROM mobilemodels.mm_device_catalog WHERE alias_norm = '{sql_string(alias_norm)}' ORDER BY source_rank ASC, record_id ASC LIMIT {limit}; """.strip() rows = run_mysql_query(sql) return { "query_mode": "sql", "model_raw": raw_value, "alias_norm": alias_norm, "limit": limit, "sql": sql, "rows": rows, "row_count": len(rows), } def read_sync_metadata() -> dict[str, object]: if not SYNC_METADATA_PATH.exists(): return {} try: return json.loads(SYNC_METADATA_PATH.read_text(encoding="utf-8")) except Exception: return {} def write_sync_metadata(payload: dict[str, object]) -> None: SYNC_METADATA_PATH.parent.mkdir(parents=True, exist_ok=True) SYNC_METADATA_PATH.write_text( json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8", ) def get_status_payload() -> dict[str, object]: index_mtime = None mysql_seed_mtime = None if INDEX_PATH.exists(): index_mtime = datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds") if MYSQL_SEED_PATH.exists(): mysql_seed_mtime = datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds") mysql_host = os.environ.get("MYSQL_HOST", "mysql") mysql_port = os.environ.get("MYSQL_PORT", "3306") mysql_database = os.environ.get("MYSQL_DATABASE", "mobilemodels") mysql_reader_user = os.environ.get("MYSQL_READER_USER", "") mysql_reader_password = os.environ.get("MYSQL_READER_PASSWORD", "") mysql_auto_load = mysql_auto_load_enabled() mysql_ready = False mysql_status = "" sync_metadata = read_sync_metadata() schedule_config = read_schedule_config() github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "") effective_repo_url = get_effective_repo_url(github_proxy_prefix) if mysql_auto_load: mysql_proc = run_command(["python3", str(MYSQL_LOADER), "--check-only", "--wait-timeout", "5"]) if mysql_proc.returncode == 0: mysql_ready = True mysql_status = mysql_proc.stdout.strip() or "MySQL ready" else: mysql_status = mysql_proc.stderr.strip() or mysql_proc.stdout.strip() or "MySQL unavailable" else: mysql_status = "MySQL auto load disabled" return { "supports_upstream_sync": True, "storage_mode": "docker_volume", "project_root": str(PROJECT_ROOT), "workspace_root": str(WORKSPACE_ROOT), "data_root": str(DATA_ROOT), "mysql_auto_load": mysql_auto_load, "upstream_repo_url": DEFAULT_REPO_URL, "effective_upstream_repo_url": effective_repo_url, "upstream_branch": DEFAULT_BRANCH, "last_sync_time": sync_metadata.get("last_sync_time"), "last_upstream_commit": sync_metadata.get("last_upstream_commit"), "last_sync_trigger": sync_metadata.get("last_trigger_source"), "index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)), "index_mtime": index_mtime, "mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)), "mysql_seed_mtime": mysql_seed_mtime, "sync_schedule_file": str(SCHEDULE_CONFIG_PATH.relative_to(DATA_ROOT)), "sync_schedule_enabled": schedule_config.get("enabled"), "sync_schedule_time": schedule_config.get("daily_time"), "sync_schedule_timezone": schedule_config.get("timezone"), "github_proxy_prefix": github_proxy_prefix, "sync_schedule_next_run": schedule_config.get("next_run_at"), "sync_schedule_last_run_time": schedule_config.get("last_run_time"), "sync_schedule_last_run_status": schedule_config.get("last_run_status"), "sync_schedule_last_run_message": schedule_config.get("last_run_message"), "mysql_host": mysql_host, "mysql_port": mysql_port, "mysql_database": mysql_database, "mysql_reader_user": mysql_reader_user, "mysql_reader_password": mysql_reader_password, "mysql_ready": mysql_ready, "mysql_status": mysql_status, } def run_upstream_sync(trigger_source: str = "manual") -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步任务在执行,请稍后再试。") try: schedule_config = read_schedule_config() github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "") effective_repo_url = get_effective_repo_url(github_proxy_prefix) upstream_proc = run_command( ["git", "ls-remote", effective_repo_url, f"refs/heads/{DEFAULT_BRANCH}"] ) upstream_commit = "" if upstream_proc.returncode == 0 and upstream_proc.stdout.strip(): upstream_commit = upstream_proc.stdout.split()[0] command = [ "python3", str(SYNC_SCRIPT), f"--repo-url={effective_repo_url}", "--build-index", "--export-mysql-seed", ] if mysql_auto_load_enabled(): command.append("--load-mysql") proc = run_command(command) output = "\n".join( part for part in [proc.stdout.strip(), proc.stderr.strip()] if part ).strip() if proc.returncode != 0: raise RuntimeError(output or f"sync script failed with exit code {proc.returncode}") payload = { "storage_mode": "docker_volume", "project_root": str(PROJECT_ROOT), "workspace_root": str(WORKSPACE_ROOT), "data_root": str(DATA_ROOT), "upstream_repo_url": DEFAULT_REPO_URL, "effective_upstream_repo_url": effective_repo_url, "github_proxy_prefix": github_proxy_prefix, "upstream_branch": DEFAULT_BRANCH, "upstream_commit": upstream_commit, "trigger_source": trigger_source, "last_sync_time": datetime.now().isoformat(timespec="seconds"), "last_upstream_commit": upstream_commit, "index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)), "index_mtime": datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds") if INDEX_PATH.exists() else None, "mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)), "mysql_seed_mtime": datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds") if MYSQL_SEED_PATH.exists() else None, "output": output or "同步脚本执行完成。", } write_sync_metadata({ "last_sync_time": payload["last_sync_time"], "last_upstream_commit": payload["last_upstream_commit"], "last_trigger_source": trigger_source, "upstream_repo_url": DEFAULT_REPO_URL, "effective_upstream_repo_url": effective_repo_url, "github_proxy_prefix": github_proxy_prefix, "upstream_branch": DEFAULT_BRANCH, }) return payload finally: SYNC_LOCK.release() def run_scheduled_sync_if_due() -> None: schedule_config = read_schedule_config() if not schedule_config.get("enabled"): return next_run_at = str(schedule_config.get("next_run_at") or "").strip() if not next_run_at: write_schedule_config({ **schedule_config, "next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")), }) return try: next_run_dt = datetime.fromisoformat(next_run_at) except ValueError: write_schedule_config({ **schedule_config, "next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")), }) return if local_now() < next_run_dt: return try: payload = run_upstream_sync(trigger_source="schedule") message = str(payload.get("output") or "定时同步完成。") mark_schedule_run("success", message) print(f"[scheduler] upstream sync completed at {local_now().isoformat(timespec='seconds')}") except RuntimeError as err: status = "skipped" if "已有同步任务" in str(err) else "failed" mark_schedule_run(status, str(err)) print(f"[scheduler] upstream sync {status}: {err}") except Exception as err: mark_schedule_run("failed", str(err)) print(f"[scheduler] upstream sync failed: {err}") def scheduler_loop() -> None: while True: try: run_scheduled_sync_if_due() except Exception as err: print(f"[scheduler] loop error: {err}") time.sleep(SCHEDULER_POLL_SECONDS) class MobileModelsHandler(SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, directory=str(PROJECT_ROOT), **kwargs) def guess_type(self, path: str) -> str: content_type = super().guess_type(path) lower_path = path.lower() if lower_path.endswith(".md"): return "text/markdown; charset=utf-8" if lower_path.endswith(".txt"): return "text/plain; charset=utf-8" if content_type.startswith("text/") and "charset=" not in content_type: return f"{content_type}; charset=utf-8" return content_type def _send_json(self, payload: dict[str, object], status: int = HTTPStatus.OK) -> None: data = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(data) def do_GET(self) -> None: if self.path == "/api/status": try: self._send_json(get_status_payload()) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return return super().do_GET() def do_POST(self) -> None: if self.path == "/api/sync-upstream": try: payload = run_upstream_sync() self._send_json(payload) except RuntimeError as err: status = HTTPStatus.CONFLICT if "已有同步任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR self._send_json({"error": str(err)}, status=status) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/query-sql": try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") payload = build_sql_query_payload(req if isinstance(req, dict) else {}) self._send_json(payload) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/sync-schedule": try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") if not isinstance(req, dict): raise RuntimeError("请求体必须是 JSON 对象。") schedule_config = update_schedule_config(req) self._send_json( { "message": "同步设置已保存。", "sync_schedule_enabled": schedule_config.get("enabled"), "sync_schedule_time": schedule_config.get("daily_time"), "sync_schedule_timezone": schedule_config.get("timezone"), "github_proxy_prefix": schedule_config.get("github_proxy_prefix"), "effective_upstream_repo_url": get_effective_repo_url( str(schedule_config.get("github_proxy_prefix") or "") ), "sync_schedule_next_run": schedule_config.get("next_run_at"), "sync_schedule_last_run_time": schedule_config.get("last_run_time"), "sync_schedule_last_run_status": schedule_config.get("last_run_status"), "sync_schedule_last_run_message": schedule_config.get("last_run_message"), } ) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run the MobileModels web server inside Docker Compose.") parser.add_argument("--host", default="127.0.0.1", help="Bind host") parser.add_argument("--port", type=int, default=8123, help="Bind port") return parser.parse_args() def main() -> int: apply_timezone_from_env() write_schedule_config(read_schedule_config()) args = parse_args() scheduler = threading.Thread(target=scheduler_loop, name="sync-scheduler", daemon=True) scheduler.start() server = ThreadingHTTPServer((args.host, args.port), MobileModelsHandler) print(f"Serving MobileModels on http://{args.host}:{args.port}") server.serve_forever() return 0 if __name__ == "__main__": raise SystemExit(main())