#!/usr/bin/env python3 """Compose-facing web server for MobileModels static pages and maintenance APIs.""" from __future__ import annotations import argparse import json import os import re import subprocess import threading import time from datetime import datetime, timedelta from http import HTTPStatus from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from device_mapper import build_records from project_layout import PROJECT_ROOT, WORKSPACE_ROOT from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL SYNC_SCRIPT = PROJECT_ROOT / "tools/sync_upstream_mobilemodels.py" INDEX_PATH = PROJECT_ROOT / "dist/device_index.json" MYSQL_SEED_PATH = PROJECT_ROOT / "dist/mobilemodels_mysql_seed.sql" MYSQL_LOADER = PROJECT_ROOT / "tools/load_mysql_seed.py" DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data")) MANUAL_CATALOG_PATH = WORKSPACE_ROOT / "local/manual_catalog.json" SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json" SCHEDULE_CONFIG_PATH = DATA_ROOT / "state/sync_schedule.json" MYSQL_CONFIG_PATH = DATA_ROOT / "state/mysql_settings.json" SYNC_LOCK = threading.Lock() SCHEDULE_LOCK = threading.Lock() MYSQL_CONFIG_LOCK = threading.Lock() INDEX_ALIAS_LOCK = threading.Lock() NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+") SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$") SCHEDULER_POLL_SECONDS = 20 INDEX_DEVICE_NAME_ALIAS_MAP: dict[str, list[str]] | None = None DEVICE_TYPES = {"phone", "tablet", "wear", "tv", "computer", "other"} def truthy_env(name: str, default: str = "0") -> bool: return os.environ.get(name, default).strip().lower() in {"1", "true", "yes", "on"} def apply_timezone_from_env() -> None: if not os.environ.get("TZ"): return try: time.tzset() except AttributeError: return def mysql_auto_load_enabled() -> bool: return bool(read_mysql_config().get("auto_load")) def mysql_probe_credentials() -> tuple[str, str]: reader_user = os.environ.get("MYSQL_READER_USER", "").strip() reader_password = os.environ.get("MYSQL_READER_PASSWORD", "") if reader_user: return reader_user, reader_password return ( os.environ.get("MYSQL_ROOT_USER", "root").strip() or "root", os.environ.get("MYSQL_ROOT_PASSWORD", ""), ) def local_now() -> datetime: return datetime.now().astimezone() def normalize_schedule_time(value: str | None, *, fallback: str = "03:00") -> str: text = str(value or "").strip() if not text: text = fallback if not SCHEDULE_TIME_RE.match(text): if fallback and text != fallback: return normalize_schedule_time(fallback, fallback="") raise RuntimeError("每日同步时间格式必须为 HH:MM,例如 03:00。") hour, minute = text.split(":", 1) return f"{int(hour):02d}:{int(minute):02d}" def normalize_github_proxy_prefix(value: str | None) -> str: text = str(value or "").strip() if not text: return "" if "://" not in text: raise RuntimeError("GitHub 加速前缀必须包含协议,例如 https://ghfast.top/") if not text.endswith("/"): text = f"{text}/" return text def get_effective_repo_url(github_proxy_prefix: str | None = None) -> str: prefix = normalize_github_proxy_prefix( github_proxy_prefix if github_proxy_prefix is not None else os.environ.get("GITHUB_PROXY_PREFIX", "") ) return f"{prefix}{DEFAULT_REPO_URL}" if prefix else DEFAULT_REPO_URL def compute_next_run_at(daily_time: str, now: datetime | None = None) -> str: current = now or local_now() hour_text, minute_text = daily_time.split(":", 1) candidate = current.replace( hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0, ) if candidate <= current: candidate += timedelta(days=1) return candidate.isoformat(timespec="seconds") def default_schedule_config() -> dict[str, object]: enabled = truthy_env("SYNC_SCHEDULE_ENABLED", "0") daily_time = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00")) timezone_name = os.environ.get("TZ", "UTC").strip() or "UTC" github_proxy_prefix = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", "")) return { "enabled": enabled, "daily_time": daily_time, "timezone": timezone_name, "github_proxy_prefix": github_proxy_prefix, "next_run_at": compute_next_run_at(daily_time) if enabled else None, "last_run_time": None, "last_run_status": None, "last_run_message": None, "updated_at": None, } def default_mysql_config() -> dict[str, object]: return { "auto_load": truthy_env("MYSQL_AUTO_LOAD", "0"), "updated_at": None, } def normalize_mysql_config(raw: dict[str, object] | None) -> dict[str, object]: config = default_mysql_config() if isinstance(raw, dict): if "auto_load" in raw: value = raw.get("auto_load") config["auto_load"] = value if isinstance(value, bool) else str(value).strip().lower() in {"1", "true", "yes", "on"} if raw.get("updated_at") is not None: config["updated_at"] = raw.get("updated_at") return config def read_mysql_config() -> dict[str, object]: with MYSQL_CONFIG_LOCK: if not MYSQL_CONFIG_PATH.exists(): return normalize_mysql_config(None) try: payload = json.loads(MYSQL_CONFIG_PATH.read_text(encoding="utf-8")) except Exception: return normalize_mysql_config(None) return normalize_mysql_config(payload if isinstance(payload, dict) else None) def write_mysql_config(payload: dict[str, object]) -> dict[str, object]: normalized = normalize_mysql_config(payload) with MYSQL_CONFIG_LOCK: MYSQL_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) MYSQL_CONFIG_PATH.write_text( json.dumps(normalized, ensure_ascii=False, indent=2), encoding="utf-8", ) return normalized def update_mysql_config(payload: dict[str, object]) -> dict[str, object]: current = read_mysql_config() auto_load_raw = payload.get("auto_load", current.get("auto_load", False)) auto_load = auto_load_raw if isinstance(auto_load_raw, bool) else str(auto_load_raw).strip().lower() in {"1", "true", "yes", "on"} updated = { **current, "auto_load": auto_load, "updated_at": local_now().isoformat(timespec="seconds"), } return write_mysql_config(updated) def normalize_schedule_config(raw: dict[str, object] | None) -> dict[str, object]: config = default_schedule_config() if isinstance(raw, dict): if "enabled" in raw: value = raw.get("enabled") config["enabled"] = value if isinstance(value, bool) else str(value).strip().lower() in {"1", "true", "yes", "on"} if "daily_time" in raw: try: config["daily_time"] = normalize_schedule_time(str(raw.get("daily_time") or "")) except RuntimeError: config["daily_time"] = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00")) if raw.get("timezone"): config["timezone"] = str(raw.get("timezone")).strip() or config["timezone"] if "github_proxy_prefix" in raw: try: config["github_proxy_prefix"] = normalize_github_proxy_prefix(str(raw.get("github_proxy_prefix") or "")) except RuntimeError: config["github_proxy_prefix"] = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", "")) for key in ("last_run_time", "last_run_status", "last_run_message", "updated_at"): if raw.get(key) is not None: config[key] = raw.get(key) next_run_at = raw.get("next_run_at") if config["enabled"] and isinstance(next_run_at, str) and next_run_at.strip(): try: datetime.fromisoformat(next_run_at) config["next_run_at"] = next_run_at except ValueError: config["next_run_at"] = compute_next_run_at(str(config["daily_time"])) else: config["next_run_at"] = compute_next_run_at(str(config["daily_time"])) if config["enabled"] else None return config def read_schedule_config() -> dict[str, object]: with SCHEDULE_LOCK: if not SCHEDULE_CONFIG_PATH.exists(): return normalize_schedule_config(None) try: payload = json.loads(SCHEDULE_CONFIG_PATH.read_text(encoding="utf-8")) except Exception: return normalize_schedule_config(None) return normalize_schedule_config(payload if isinstance(payload, dict) else None) def write_schedule_config(payload: dict[str, object]) -> dict[str, object]: normalized = normalize_schedule_config(payload) with SCHEDULE_LOCK: SCHEDULE_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) SCHEDULE_CONFIG_PATH.write_text( json.dumps(normalized, ensure_ascii=False, indent=2), encoding="utf-8", ) return normalized def update_schedule_config(payload: dict[str, object]) -> dict[str, object]: current = read_schedule_config() enabled_raw = payload.get("enabled", current.get("enabled", False)) enabled = enabled_raw if isinstance(enabled_raw, bool) else str(enabled_raw).strip().lower() in {"1", "true", "yes", "on"} daily_time = normalize_schedule_time(str(payload.get("daily_time") or current.get("daily_time") or "03:00")) github_proxy_prefix = normalize_github_proxy_prefix( str(payload.get("github_proxy_prefix") if "github_proxy_prefix" in payload else current.get("github_proxy_prefix") or "") ) updated = { **current, "enabled": enabled, "daily_time": daily_time, "timezone": os.environ.get("TZ", "UTC").strip() or "UTC", "github_proxy_prefix": github_proxy_prefix, "next_run_at": compute_next_run_at(daily_time) if enabled else None, "updated_at": local_now().isoformat(timespec="seconds"), } return write_schedule_config(updated) def mark_schedule_run(status: str, message: str) -> dict[str, object]: current = read_schedule_config() updated = { **current, "last_run_time": local_now().isoformat(timespec="seconds"), "last_run_status": status, "last_run_message": message, "next_run_at": compute_next_run_at(str(current.get("daily_time") or "03:00")) if current.get("enabled") else None, "updated_at": local_now().isoformat(timespec="seconds"), } return write_schedule_config(updated) def run_command(args: list[str]) -> subprocess.CompletedProcess[str]: return subprocess.run( args, cwd=PROJECT_ROOT, text=True, capture_output=True, check=False, ) def sanitize_mysql_message(message: str, host: str | None = None, port: str | None = None) -> str: lines = [line.strip() for line in str(message or "").splitlines() if line.strip()] filtered = [line for line in lines if not line.startswith("WARNING:")] text = "\n".join(filtered or lines) host_text = host or os.environ.get("MYSQL_HOST", "mysql") port_text = port or os.environ.get("MYSQL_PORT", "3306") if "Can't connect to server on" in text or "Can't connect to MySQL server on" in text: return f"MySQL 当前无法连接: {host_text}:{port_text}" if "Access denied" in text: return f"MySQL 账号或密码无效: {host_text}:{port_text}" return text or f"MySQL 当前无法连接: {host_text}:{port_text}" def normalize_text(text: str) -> str: return NORMALIZE_RE.sub("", (text or "").lower()) def sql_string(value: str) -> str: return (value or "").replace("\\", "\\\\").replace("'", "''") def normalize_alias_list(*groups: object) -> list[str]: aliases: list[str] = [] seen: set[str] = set() for group in groups: if group is None: continue items = group if isinstance(group, list) else [group] for item in items: text = str(item or "").strip() key = normalize_text(text) if not text or not key or key in seen: continue seen.add(key) aliases.append(text) return aliases def default_manual_catalog() -> dict[str, object]: return {"brands": [], "devices": []} def read_manual_catalog() -> dict[str, object]: if not MANUAL_CATALOG_PATH.exists(): return default_manual_catalog() try: payload = json.loads(MANUAL_CATALOG_PATH.read_text(encoding="utf-8")) except Exception: return default_manual_catalog() if not isinstance(payload, dict): return default_manual_catalog() brands = payload.get("brands") devices = payload.get("devices") return { "brands": brands if isinstance(brands, list) else [], "devices": devices if isinstance(devices, list) else [], } def write_manual_catalog(payload: dict[str, object]) -> dict[str, object]: catalog = { "brands": payload.get("brands") if isinstance(payload.get("brands"), list) else [], "devices": payload.get("devices") if isinstance(payload.get("devices"), list) else [], } MANUAL_CATALOG_PATH.parent.mkdir(parents=True, exist_ok=True) MANUAL_CATALOG_PATH.write_text(json.dumps(catalog, ensure_ascii=False, indent=2), encoding="utf-8") return catalog def canonical_manual_brand(raw: object) -> dict[str, object]: if not isinstance(raw, dict): raise RuntimeError("品牌数据格式无效。") name = str(raw.get("name") or "").strip() if not name: raise RuntimeError("品牌名称不能为空。") parent_brand = str(raw.get("parent_brand") or name).strip() or name aliases = normalize_alias_list(name, raw.get("aliases")) now = local_now().isoformat(timespec="seconds") return { "name": name, "parent_brand": parent_brand, "aliases": aliases, "updated_at": str(raw.get("updated_at") or now), "created_at": str(raw.get("created_at") or now), } def canonical_manual_device(raw: object, allowed_brands: set[str]) -> dict[str, object]: if not isinstance(raw, dict): raise RuntimeError("设备数据格式无效。") brand = str(raw.get("brand") or "").strip() if not brand: raise RuntimeError("所属品牌不能为空。") if brand not in allowed_brands: raise RuntimeError(f"所属品牌不存在: {brand}") device_name = str(raw.get("device_name") or "").strip() if not device_name: raise RuntimeError("设备名称不能为空。") device_type = str(raw.get("device_type") or "").strip().lower() or "other" if device_type not in DEVICE_TYPES: raise RuntimeError("设备类型无效。") models = normalize_alias_list(raw.get("models")) if not models: raise RuntimeError("设备标识至少保留一个。") aliases = normalize_alias_list(device_name, raw.get("aliases")) section = str(raw.get("section") or "手动补录").strip() or "手动补录" stable_id = f"manual:{normalize_text(brand)}:{normalize_text(device_name)}" now = local_now().isoformat(timespec="seconds") return { "id": str(raw.get("id") or stable_id).strip() or stable_id, "brand": brand, "device_name": device_name, "device_type": device_type, "models": models, "aliases": aliases, "section": section, "updated_at": str(raw.get("updated_at") or now), "created_at": str(raw.get("created_at") or now), } def validate_manual_catalog(payload: dict[str, object]) -> dict[str, object]: brands_raw = payload.get("brands") if isinstance(payload.get("brands"), list) else [] devices_raw = payload.get("devices") if isinstance(payload.get("devices"), list) else [] brands: list[dict[str, object]] = [] seen_brand_keys: set[str] = set() for raw_brand in brands_raw: brand = canonical_manual_brand(raw_brand) brand_key = normalize_text(str(brand["name"])) if brand_key in seen_brand_keys: raise RuntimeError(f"品牌重复: {brand['name']}") seen_brand_keys.add(brand_key) brands.append(brand) builtin_brands = load_builtin_brand_names() allowed_brands = builtin_brands | {str(item["name"]) for item in brands} devices: list[dict[str, object]] = [] seen_device_ids: set[str] = set() for raw_device in devices_raw: device = canonical_manual_device(raw_device, allowed_brands) if device["id"] in seen_device_ids: raise RuntimeError(f"设备 ID 重复: {device['id']}") seen_device_ids.add(str(device["id"])) devices.append(device) return {"brands": brands, "devices": devices} def load_builtin_brand_names() -> set[str]: records = build_records(WORKSPACE_ROOT) names = { str(record.brand).strip() for record in records if str(record.source_file) != "local/manual_catalog.json" } names.update( str(record.market_brand).strip() for record in records if str(record.source_file) != "local/manual_catalog.json" ) names.update( str(record.parent_brand).strip() for record in records if str(record.source_file) != "local/manual_catalog.json" ) return {name for name in names if name} def count_manual_alias_conflicts(device: dict[str, object]) -> int: records = build_records(WORKSPACE_ROOT) alias_set = { normalize_text(alias) for alias in normalize_alias_list(device.get("models"), device.get("aliases"), device.get("device_name")) } conflicts: set[str] = set() for record in records: if str(record.source_file) == "local/manual_catalog.json": continue for alias in getattr(record, "aliases", []): alias_norm = normalize_text(str(alias)) if alias_norm and alias_norm in alias_set: conflicts.add(alias_norm) return len(conflicts) def rebuild_generated_outputs() -> dict[str, object]: build_proc = run_command( [ "python3", str(PROJECT_ROOT / "tools/device_mapper.py"), "--repo-root", str(WORKSPACE_ROOT), "build", "--output", str(INDEX_PATH), ] ) build_output = "\n".join(part for part in [build_proc.stdout.strip(), build_proc.stderr.strip()] if part).strip() if build_proc.returncode != 0: raise RuntimeError(build_output or "重建设备索引失败。") seed_proc = run_command( [ "python3", str(PROJECT_ROOT / "tools/export_mysql_seed.py"), "--repo-root", str(WORKSPACE_ROOT), "--output", str(MYSQL_SEED_PATH), ] ) seed_output = "\n".join(part for part in [seed_proc.stdout.strip(), seed_proc.stderr.strip()] if part).strip() if seed_proc.returncode != 0: raise RuntimeError(seed_output or "导出 MySQL seed 失败。") mysql_loaded = False mysql_message = "MySQL 未刷新。" if mysql_auto_load_enabled(): load_proc = run_command(["python3", str(MYSQL_LOADER)]) mysql_message = "\n".join(part for part in [load_proc.stdout.strip(), load_proc.stderr.strip()] if part).strip() or "MySQL 已刷新。" if load_proc.returncode != 0: raise RuntimeError(mysql_message) mysql_loaded = True return { "index_updated": True, "mysql_seed_updated": True, "mysql_loaded": mysql_loaded, "message": "本地覆盖库已保存,索引与 MySQL seed 已刷新。" if mysql_loaded else "本地覆盖库已保存,索引与 MySQL seed 已刷新,MySQL 未自动装载。", "build_output": build_output, "mysql_seed_output": seed_output, "mysql_message": mysql_message, } def manual_catalog_payload() -> dict[str, object]: catalog = validate_manual_catalog(read_manual_catalog()) brands = sorted(catalog["brands"], key=lambda item: str(item["name"]).lower()) devices = sorted(catalog["devices"], key=lambda item: (str(item["brand"]).lower(), str(item["device_name"]).lower())) return { "brands": brands, "devices": devices, "stats": { "brand_count": len(brands), "device_count": len(devices), }, "catalog_file": str(MANUAL_CATALOG_PATH.relative_to(PROJECT_ROOT)), } def upsert_manual_brand(payload: dict[str, object]) -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。") try: catalog = validate_manual_catalog(read_manual_catalog()) incoming = canonical_manual_brand(payload) brand_key = normalize_text(str(incoming["name"])) existing_by_key = {normalize_text(str(item["name"])): item for item in catalog["brands"]} existing = existing_by_key.get(brand_key) if existing: incoming["created_at"] = existing.get("created_at") or incoming["created_at"] catalog["brands"] = [ item for item in catalog["brands"] if normalize_text(str(item["name"])) != brand_key ] catalog["brands"].append(incoming) validated = validate_manual_catalog(catalog) write_manual_catalog(validated) rebuild_result = rebuild_generated_outputs() return { "saved_brand": incoming, "catalog": manual_catalog_payload(), **rebuild_result, } finally: SYNC_LOCK.release() def upsert_manual_device(payload: dict[str, object]) -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。") try: catalog = validate_manual_catalog(read_manual_catalog()) allowed_brands = load_builtin_brand_names() | {str(item["name"]) for item in catalog["brands"]} device_payload = canonical_manual_device(payload, allowed_brands) existing = next((item for item in catalog["devices"] if str(item.get("id")) == str(device_payload["id"])), None) if existing: device_payload["created_at"] = existing.get("created_at") or device_payload["created_at"] catalog["devices"] = [item for item in catalog["devices"] if str(item.get("id")) != str(device_payload["id"])] catalog["devices"].append(device_payload) validated = validate_manual_catalog(catalog) write_manual_catalog(validated) rebuild_result = rebuild_generated_outputs() return { "saved_device": device_payload, "alias_conflict_count": count_manual_alias_conflicts(device_payload), "catalog": manual_catalog_payload(), **rebuild_result, } finally: SYNC_LOCK.release() def delete_manual_brand(payload: dict[str, object]) -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。") try: brand_name = str(payload.get("name") or "").strip() if not brand_name: raise RuntimeError("品牌名称不能为空。") catalog = validate_manual_catalog(read_manual_catalog()) active_devices = [item for item in catalog["devices"] if str(item.get("brand") or "").strip() == brand_name] if active_devices: raise RuntimeError("该品牌下仍有关联设备,请先删除设备后再删除品牌。") next_brands = [item for item in catalog["brands"] if str(item.get("name") or "").strip() != brand_name] if len(next_brands) == len(catalog["brands"]): raise RuntimeError(f"未找到品牌: {brand_name}") write_manual_catalog({"brands": next_brands, "devices": catalog["devices"]}) rebuild_result = rebuild_generated_outputs() return { "deleted_brand": brand_name, "catalog": manual_catalog_payload(), **rebuild_result, } finally: SYNC_LOCK.release() def delete_manual_device(payload: dict[str, object]) -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。") try: device_id = str(payload.get("id") or "").strip() if not device_id: raise RuntimeError("设备 ID 不能为空。") catalog = validate_manual_catalog(read_manual_catalog()) next_devices = [item for item in catalog["devices"] if str(item.get("id") or "").strip() != device_id] if len(next_devices) == len(catalog["devices"]): raise RuntimeError(f"未找到设备: {device_id}") write_manual_catalog({"brands": catalog["brands"], "devices": next_devices}) rebuild_result = rebuild_generated_outputs() return { "deleted_device": device_id, "catalog": manual_catalog_payload(), **rebuild_result, } finally: SYNC_LOCK.release() def parse_apple_series_generation(name: str) -> dict[str, object] | None: text = re.sub(r"\s+", " ", str(name or "").strip()) if not text: return None ordinal_match = re.match(r"^(.*?)(?:\s*\((\d+)(?:st|nd|rd|th)\s+generation\)|\s+(\d+))$", text, re.IGNORECASE) if ordinal_match: base_label = re.sub(r"\s+", " ", str(ordinal_match.group(1) or "").strip()) generation = int(ordinal_match.group(2) or ordinal_match.group(3) or "0") if base_label and generation > 0: return { "base_label": base_label, "base_norm": normalize_text(base_label), "generation": generation, "chip_like": False, } chip_like = bool(re.search(r"\((?:[^)]*\b(?:a\d{1,2}|m\d{1,2})\b[^)]*)\)$", text, re.IGNORECASE)) base_label = re.sub(r"\s*\([^)]*\)\s*$", "", text).strip() if not base_label: return None return { "base_label": base_label, "base_norm": normalize_text(base_label), "generation": None, "chip_like": chip_like, } def build_index_device_name_alias_map() -> dict[str, list[str]]: if not INDEX_PATH.exists(): return {} try: payload = json.loads(INDEX_PATH.read_text(encoding="utf-8")) except Exception: return {} records = payload.get("records") if isinstance(payload, dict) else None if not isinstance(records, list): return {} grouped: dict[str, dict[str, object]] = {} for record in records: if not isinstance(record, dict): continue brand = str(record.get("market_brand") or record.get("manufacturer_brand") or record.get("brand") or "").strip() if brand != "Apple": continue device_name = str(record.get("device_name") or "").strip() parsed = parse_apple_series_generation(device_name) if not parsed or not parsed.get("base_norm"): continue base_norm = str(parsed["base_norm"]) group = grouped.setdefault( base_norm, { "base_label": str(parsed["base_label"]), "items": [], }, ) items = group["items"] if isinstance(items, list): items.append( { "device_name": device_name, "generation": parsed.get("generation"), "chip_like": bool(parsed.get("chip_like")), } ) alias_map: dict[str, set[str]] = {} for group in grouped.values(): items = group.get("items") if not isinstance(items, list): continue explicit_generations = sorted( { int(item["generation"]) for item in items if isinstance(item, dict) and isinstance(item.get("generation"), int) and int(item["generation"]) > 0 } ) max_explicit_generation = explicit_generations[-1] if explicit_generations else 0 base_label = str(group.get("base_label") or "").strip() if not base_label: continue for item in items: if not isinstance(item, dict): continue generation = item.get("generation") device_name = str(item.get("device_name") or "").strip() if not isinstance(generation, int): if device_name == base_label and max_explicit_generation >= 2: generation = 1 elif item.get("chip_like") and max_explicit_generation >= 1: generation = max_explicit_generation + 1 if not isinstance(generation, int) or generation <= 0: continue alias_key = normalize_text(f"{base_label} {generation}") if not alias_key: continue alias_map.setdefault(alias_key, set()).add(device_name) return {key: sorted(values) for key, values in alias_map.items()} def resolve_index_device_names(alias_norm: str) -> list[str]: global INDEX_DEVICE_NAME_ALIAS_MAP if INDEX_DEVICE_NAME_ALIAS_MAP is None: with INDEX_ALIAS_LOCK: if INDEX_DEVICE_NAME_ALIAS_MAP is None: INDEX_DEVICE_NAME_ALIAS_MAP = build_index_device_name_alias_map() return list((INDEX_DEVICE_NAME_ALIAS_MAP or {}).get(alias_norm, [])) def mysql_command(database: str | None = None) -> list[str]: command = [ "mysql", f"--host={os.environ.get('MYSQL_HOST', 'mysql')}", f"--port={os.environ.get('MYSQL_PORT', '3306')}", f"--user={os.environ.get('MYSQL_READER_USER', '')}", "--protocol=TCP", "--default-character-set=utf8mb4", "--batch", "--raw", ] if database: command.append(database) return command def mysql_env() -> dict[str, str]: env = os.environ.copy() env["MYSQL_PWD"] = os.environ.get("MYSQL_READER_PASSWORD", "") return env def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str | None]]: proc = subprocess.run( mysql_command(database=database), env=mysql_env(), input=sql, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if proc.returncode != 0: message = sanitize_mysql_message( proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}" ) raise RuntimeError(message) lines = [line for line in proc.stdout.splitlines() if line.strip()] if not lines: return [] headers = lines[0].split("\t") rows: list[dict[str, str | None]] = [] for line in lines[1:]: values = line.split("\t") row = {} for idx, header in enumerate(headers): value = values[idx] if idx < len(values) else "" row[header] = None if value == "NULL" else value rows.append(row) return rows def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]: raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip() if not raw_value: raise RuntimeError("请填写设备标识或设备名称。") alias_norm = normalize_text(raw_value) if not alias_norm: raise RuntimeError("设备标识无法归一化,请检查输入。") limit_value = payload.get("limit", 20) try: limit = int(limit_value) except Exception as err: raise RuntimeError("limit 必须是数字。") from err limit = max(1, min(limit, 100)) exact_sql = f""" SELECT model, record_id, alias_norm, device_name, brand, manufacturer_brand, parent_brand, market_brand, device_type, source_file, section, source_rank, source_weight, code, code_alias, ver_name FROM mobilemodels.mm_device_catalog WHERE alias_norm = '{sql_string(alias_norm)}' ORDER BY source_rank ASC, record_id ASC LIMIT {limit}; """.strip() rows = run_mysql_query(exact_sql) sql = exact_sql match_strategy = "alias_norm_exact" match_strategy_label = "alias_norm 精确匹配" resolved_device_names: list[str] = [] if not rows: resolved_device_names = resolve_index_device_names(alias_norm) if resolved_device_names: name_conditions = " OR ".join( f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names ) sql = f""" SELECT model, record_id, alias_norm, device_name, brand, manufacturer_brand, parent_brand, market_brand, device_type, source_file, section, source_rank, source_weight, code, code_alias, ver_name FROM mobilemodels.mm_device_catalog WHERE {name_conditions} ORDER BY source_rank ASC, record_id ASC LIMIT {limit}; """.strip() rows = run_mysql_query(sql) if rows: match_strategy = "device_name_alias" match_strategy_label = "设备名称别名映射" return { "query_mode": "sql", "model_raw": raw_value, "alias_norm": alias_norm, "limit": limit, "sql": sql, "match_strategy": match_strategy, "match_strategy_label": match_strategy_label, "resolved_device_names": resolved_device_names, "rows": rows, "row_count": len(rows), } def build_sql_device_name_query_payload(payload: dict[str, object]) -> dict[str, object]: raw_value = str(payload.get("device_name") or payload.get("name") or "").strip() if not raw_value: raise RuntimeError("请填写设备名称。") limit_value = payload.get("limit", 20) try: limit = int(limit_value) except Exception as err: raise RuntimeError("limit 必须是数字。") from err limit = max(1, min(limit, 100)) alias_norm = normalize_text(raw_value) resolved_device_names = resolve_index_device_names(alias_norm) if alias_norm else [] rows: list[dict[str, str | None]] = [] match_strategy = "device_name_like" match_strategy_label = "device_name 模糊匹配" if resolved_device_names: name_conditions = " OR ".join( f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names ) sql = f""" SELECT model, record_id, alias_norm, device_name, brand, manufacturer_brand, parent_brand, market_brand, device_type, source_file, section, source_rank, source_weight, code, code_alias, ver_name FROM mobilemodels.mm_device_catalog WHERE {name_conditions} ORDER BY source_rank ASC, record_id ASC LIMIT {limit}; """.strip() rows = run_mysql_query(sql) if rows: match_strategy = "device_name_alias" match_strategy_label = "设备名称别名映射" else: sql = "" if not rows: sql = f""" SELECT model, record_id, alias_norm, device_name, brand, manufacturer_brand, parent_brand, market_brand, device_type, source_file, section, source_rank, source_weight, code, code_alias, ver_name FROM mobilemodels.mm_device_catalog WHERE device_name LIKE '%{sql_string(raw_value)}%' OR ver_name LIKE '%{sql_string(raw_value)}%' ORDER BY source_rank ASC, record_id ASC LIMIT {limit}; """.strip() rows = run_mysql_query(sql) return { "query_mode": "sql_device_name", "device_name": raw_value, "alias_norm": alias_norm, "limit": limit, "sql": sql, "match_strategy": match_strategy, "match_strategy_label": match_strategy_label, "resolved_device_names": resolved_device_names, "rows": rows, "row_count": len(rows), } def read_sync_metadata() -> dict[str, object]: if not SYNC_METADATA_PATH.exists(): return {} try: return json.loads(SYNC_METADATA_PATH.read_text(encoding="utf-8")) except Exception: return {} def write_sync_metadata(payload: dict[str, object]) -> None: SYNC_METADATA_PATH.parent.mkdir(parents=True, exist_ok=True) SYNC_METADATA_PATH.write_text( json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8", ) def get_status_payload() -> dict[str, object]: index_mtime = None mysql_seed_mtime = None if INDEX_PATH.exists(): index_mtime = datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds") if MYSQL_SEED_PATH.exists(): mysql_seed_mtime = datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds") mysql_host = os.environ.get("MYSQL_HOST", "mysql") mysql_port = os.environ.get("MYSQL_PORT", "3306") mysql_database = os.environ.get("MYSQL_DATABASE", "mobilemodels") mysql_reader_user = os.environ.get("MYSQL_READER_USER", "") mysql_reader_password = os.environ.get("MYSQL_READER_PASSWORD", "") mysql_config = read_mysql_config() mysql_auto_load = bool(mysql_config.get("auto_load")) mysql_ready = False mysql_status = "" sync_metadata = read_sync_metadata() schedule_config = read_schedule_config() github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "") effective_repo_url = get_effective_repo_url(github_proxy_prefix) probe_user, probe_password = mysql_probe_credentials() mysql_proc = run_command( [ "python3", str(MYSQL_LOADER), "--check-only", "--wait-timeout", "5", f"--user={probe_user}", f"--password={probe_password}", ] ) if mysql_proc.returncode == 0: mysql_ready = True mysql_status = mysql_proc.stdout.strip() or "MySQL ready" if not mysql_auto_load: mysql_status = f"{mysql_status}; auto load disabled" else: failure_message = sanitize_mysql_message( mysql_proc.stderr.strip() or mysql_proc.stdout.strip() or "MySQL unavailable", host=mysql_host, port=mysql_port, ) mysql_status = failure_message if mysql_auto_load else f"{failure_message}; auto load disabled" return { "supports_upstream_sync": True, "storage_mode": "docker_volume", "project_root": str(PROJECT_ROOT), "workspace_root": str(WORKSPACE_ROOT), "data_root": str(DATA_ROOT), "mysql_auto_load": mysql_auto_load, "mysql_config_file": str(MYSQL_CONFIG_PATH.relative_to(DATA_ROOT)), "mysql_config_updated_at": mysql_config.get("updated_at"), "upstream_repo_url": DEFAULT_REPO_URL, "effective_upstream_repo_url": effective_repo_url, "upstream_branch": DEFAULT_BRANCH, "last_sync_time": sync_metadata.get("last_sync_time"), "last_upstream_commit": sync_metadata.get("last_upstream_commit"), "last_sync_trigger": sync_metadata.get("last_trigger_source"), "index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)), "index_mtime": index_mtime, "mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)), "mysql_seed_mtime": mysql_seed_mtime, "sync_schedule_file": str(SCHEDULE_CONFIG_PATH.relative_to(DATA_ROOT)), "sync_schedule_enabled": schedule_config.get("enabled"), "sync_schedule_time": schedule_config.get("daily_time"), "sync_schedule_timezone": schedule_config.get("timezone"), "github_proxy_prefix": github_proxy_prefix, "sync_schedule_next_run": schedule_config.get("next_run_at"), "sync_schedule_last_run_time": schedule_config.get("last_run_time"), "sync_schedule_last_run_status": schedule_config.get("last_run_status"), "sync_schedule_last_run_message": schedule_config.get("last_run_message"), "mysql_host": mysql_host, "mysql_port": mysql_port, "mysql_database": mysql_database, "mysql_reader_user": mysql_reader_user, "mysql_reader_password": mysql_reader_password, "mysql_ready": mysql_ready, "mysql_status": mysql_status, } def run_upstream_sync(trigger_source: str = "manual") -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步任务在执行,请稍后再试。") try: schedule_config = read_schedule_config() github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "") effective_repo_url = get_effective_repo_url(github_proxy_prefix) upstream_proc = run_command( ["git", "ls-remote", effective_repo_url, f"refs/heads/{DEFAULT_BRANCH}"] ) upstream_commit = "" if upstream_proc.returncode == 0 and upstream_proc.stdout.strip(): upstream_commit = upstream_proc.stdout.split()[0] command = [ "python3", str(SYNC_SCRIPT), f"--repo-url={effective_repo_url}", "--build-index", "--export-mysql-seed", ] if mysql_auto_load_enabled(): command.append("--load-mysql") proc = run_command(command) output = "\n".join( part for part in [proc.stdout.strip(), proc.stderr.strip()] if part ).strip() if proc.returncode != 0: raise RuntimeError(output or f"sync script failed with exit code {proc.returncode}") payload = { "storage_mode": "docker_volume", "project_root": str(PROJECT_ROOT), "workspace_root": str(WORKSPACE_ROOT), "data_root": str(DATA_ROOT), "upstream_repo_url": DEFAULT_REPO_URL, "effective_upstream_repo_url": effective_repo_url, "github_proxy_prefix": github_proxy_prefix, "upstream_branch": DEFAULT_BRANCH, "upstream_commit": upstream_commit, "trigger_source": trigger_source, "last_sync_time": datetime.now().isoformat(timespec="seconds"), "last_upstream_commit": upstream_commit, "index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)), "index_mtime": datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds") if INDEX_PATH.exists() else None, "mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)), "mysql_seed_mtime": datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds") if MYSQL_SEED_PATH.exists() else None, "output": output or "同步脚本执行完成。", } write_sync_metadata({ "last_sync_time": payload["last_sync_time"], "last_upstream_commit": payload["last_upstream_commit"], "last_trigger_source": trigger_source, "upstream_repo_url": DEFAULT_REPO_URL, "effective_upstream_repo_url": effective_repo_url, "github_proxy_prefix": github_proxy_prefix, "upstream_branch": DEFAULT_BRANCH, }) return payload finally: SYNC_LOCK.release() def run_mysql_init(trigger_source: str = "manual") -> dict[str, object]: if not SYNC_LOCK.acquire(blocking=False): raise RuntimeError("已有同步或 MySQL 初始化任务在执行,请稍后再试。") try: proc = run_command(["python3", str(MYSQL_LOADER)]) output = "\n".join( part for part in [proc.stdout.strip(), proc.stderr.strip()] if part ).strip() if proc.returncode != 0: raise RuntimeError(output or f"mysql load failed with exit code {proc.returncode}") payload = get_status_payload() payload.update( { "trigger_source": trigger_source, "output": output or "MySQL 初始化完成。", } ) return payload finally: SYNC_LOCK.release() def run_scheduled_sync_if_due() -> None: schedule_config = read_schedule_config() if not schedule_config.get("enabled"): return next_run_at = str(schedule_config.get("next_run_at") or "").strip() if not next_run_at: write_schedule_config({ **schedule_config, "next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")), }) return try: next_run_dt = datetime.fromisoformat(next_run_at) except ValueError: write_schedule_config({ **schedule_config, "next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")), }) return if local_now() < next_run_dt: return try: payload = run_upstream_sync(trigger_source="schedule") message = str(payload.get("output") or "定时同步完成。") mark_schedule_run("success", message) print(f"[scheduler] upstream sync completed at {local_now().isoformat(timespec='seconds')}") except RuntimeError as err: status = "skipped" if "已有同步任务" in str(err) else "failed" mark_schedule_run(status, str(err)) print(f"[scheduler] upstream sync {status}: {err}") except Exception as err: mark_schedule_run("failed", str(err)) print(f"[scheduler] upstream sync failed: {err}") def scheduler_loop() -> None: while True: try: run_scheduled_sync_if_due() except Exception as err: print(f"[scheduler] loop error: {err}") time.sleep(SCHEDULER_POLL_SECONDS) class MobileModelsHandler(SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, directory=str(PROJECT_ROOT), **kwargs) def guess_type(self, path: str) -> str: content_type = super().guess_type(path) lower_path = path.lower() if lower_path.endswith(".md"): return "text/markdown; charset=utf-8" if lower_path.endswith(".txt"): return "text/plain; charset=utf-8" if content_type.startswith("text/") and "charset=" not in content_type: return f"{content_type}; charset=utf-8" return content_type def _send_json(self, payload: dict[str, object], status: int = HTTPStatus.OK) -> None: data = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(data) def do_GET(self) -> None: if self.path == "/api/status": try: self._send_json(get_status_payload()) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/manual-catalog": try: self._send_json(manual_catalog_payload()) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return return super().do_GET() def do_POST(self) -> None: if self.path == "/api/sync-upstream": try: payload = run_upstream_sync() self._send_json(payload) except RuntimeError as err: status = HTTPStatus.CONFLICT if "已有同步任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR self._send_json({"error": str(err)}, status=status) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/init-mysql": try: payload = run_mysql_init() self._send_json(payload) except RuntimeError as err: status = HTTPStatus.CONFLICT if "已有同步或 MySQL 初始化任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR self._send_json({"error": str(err)}, status=status) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/query-sql": try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") payload = build_sql_query_payload(req if isinstance(req, dict) else {}) self._send_json(payload) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/query-sql-device-name": try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") payload = build_sql_device_name_query_payload(req if isinstance(req, dict) else {}) self._send_json(payload) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/sync-schedule": try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") if not isinstance(req, dict): raise RuntimeError("请求体必须是 JSON 对象。") schedule_config = update_schedule_config(req) self._send_json( { "message": "同步设置已保存。", "sync_schedule_enabled": schedule_config.get("enabled"), "sync_schedule_time": schedule_config.get("daily_time"), "sync_schedule_timezone": schedule_config.get("timezone"), "github_proxy_prefix": schedule_config.get("github_proxy_prefix"), "effective_upstream_repo_url": get_effective_repo_url( str(schedule_config.get("github_proxy_prefix") or "") ), "sync_schedule_next_run": schedule_config.get("next_run_at"), "sync_schedule_last_run_time": schedule_config.get("last_run_time"), "sync_schedule_last_run_status": schedule_config.get("last_run_status"), "sync_schedule_last_run_message": schedule_config.get("last_run_message"), } ) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path == "/api/mysql-settings": try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") if not isinstance(req, dict): raise RuntimeError("请求体必须是 JSON 对象。") mysql_config = update_mysql_config(req) self._send_json( { "message": "MySQL 自动装载设置已保存。", "mysql_auto_load": mysql_config.get("auto_load"), "mysql_config_file": str(MYSQL_CONFIG_PATH.relative_to(DATA_ROOT)), "mysql_config_updated_at": mysql_config.get("updated_at"), } ) except RuntimeError as err: self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return if self.path in {"/api/manual-brand", "/api/manual-device", "/api/manual-brand-delete", "/api/manual-device-delete"}: try: content_length = int(self.headers.get("Content-Length", "0") or "0") raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}" req = json.loads(raw_body.decode("utf-8") or "{}") if not isinstance(req, dict): raise RuntimeError("请求体必须是 JSON 对象。") if self.path == "/api/manual-brand": payload = upsert_manual_brand(req) elif self.path == "/api/manual-device": payload = upsert_manual_device(req) elif self.path == "/api/manual-brand-delete": payload = delete_manual_brand(req) else: payload = delete_manual_device(req) self._send_json(payload) except RuntimeError as err: status = HTTPStatus.CONFLICT if "已有同步或数据重建任务" in str(err) else HTTPStatus.BAD_REQUEST self._send_json({"error": str(err)}, status=status) except Exception as err: self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) return self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run the MobileModels web server inside Docker Compose.") parser.add_argument("--host", default="127.0.0.1", help="Bind host") parser.add_argument("--port", type=int, default=8123, help="Bind port") return parser.parse_args() def main() -> int: apply_timezone_from_env() write_schedule_config(read_schedule_config()) write_mysql_config(read_mysql_config()) args = parse_args() scheduler = threading.Thread(target=scheduler_loop, name="sync-scheduler", daemon=True) scheduler.start() server = ThreadingHTTPServer((args.host, args.port), MobileModelsHandler) print(f"Serving MobileModels on http://{args.host}:{args.port}") server.serve_forever() return 0 if __name__ == "__main__": raise SystemExit(main())