1474 lines
57 KiB
Python
1474 lines
57 KiB
Python
#!/usr/bin/env python3
|
|
"""Compose-facing web server for MobileModels static pages and maintenance APIs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from http import HTTPStatus
|
|
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
|
|
from device_mapper import build_records
|
|
from project_layout import PROJECT_ROOT, WORKSPACE_ROOT
|
|
from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL
|
|
|
|
|
|
# --- Generated artifacts and helper scripts --------------------------------
SYNC_SCRIPT = PROJECT_ROOT / "tools/sync_upstream_mobilemodels.py"
INDEX_PATH = PROJECT_ROOT / "dist/device_index.json"
MYSQL_SEED_PATH = PROJECT_ROOT / "dist/mobilemodels_mysql_seed.sql"
MYSQL_LOADER = PROJECT_ROOT / "tools/load_mysql_seed.py"

# --- Mutable state files (data volume overridable via env) -----------------
DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data"))
MANUAL_CATALOG_PATH = WORKSPACE_ROOT / "local/manual_catalog.json"
SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json"
SCHEDULE_CONFIG_PATH = DATA_ROOT / "state/sync_schedule.json"
MYSQL_CONFIG_PATH = DATA_ROOT / "state/mysql_settings.json"

# --- Locks -----------------------------------------------------------------
SYNC_LOCK = threading.Lock()            # serializes sync / catalog-rebuild operations
SCHEDULE_LOCK = threading.Lock()        # guards SCHEDULE_CONFIG_PATH reads/writes
MYSQL_CONFIG_LOCK = threading.Lock()    # guards MYSQL_CONFIG_PATH reads/writes
INDEX_ALIAS_LOCK = threading.Lock()     # guards lazy init of INDEX_DEVICE_NAME_ALIAS_MAP
MANUAL_REBUILD_LOCK = threading.Lock()  # serializes background MySQL loader runs

# Strips everything that is not an ASCII digit, lowercase letter, or CJK
# unified ideograph (used after .lower() in normalize_text).
NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")
# Accepts H:MM / HH:MM in 24-hour time.
SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$")
SCHEDULER_POLL_SECONDS = 20

# Lazily built map of normalized alias -> device names; None until first use.
INDEX_DEVICE_NAME_ALIAS_MAP: dict[str, list[str]] | None = None

# Valid values for a manual device's device_type field.
DEVICE_TYPES = {"phone", "tablet", "wear", "tv", "computer", "other"}

# In-process status of the most recent background MySQL seed load.
LAST_MANUAL_MYSQL_LOAD_STATUS: dict[str, object] = {
    "running": False,
    "last_started_at": None,
    "last_finished_at": None,
    "last_status": None,
    "last_message": None,
}
|
|
|
|
|
|
def truthy_env(name: str, default: str = "0") -> bool:
    """Return True when the environment variable *name* holds a truthy flag."""
    raw = os.environ.get(name, default)
    return raw.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def apply_timezone_from_env() -> None:
    """Re-apply the TZ environment variable to the C runtime, when possible.

    Does nothing if TZ is unset/empty, or on platforms (e.g. Windows) where
    time.tzset does not exist.
    """
    if not os.environ.get("TZ"):
        return
    tzset = getattr(time, "tzset", None)
    if tzset is not None:
        tzset()
|
|
|
|
|
|
def mysql_auto_load_enabled() -> bool:
    """Report whether the persisted MySQL settings request automatic seed loading."""
    config = read_mysql_config()
    return bool(config.get("auto_load"))
|
|
|
|
|
|
def mysql_probe_credentials() -> tuple[str, str]:
    """Pick the (user, password) pair used for MySQL connectivity probes.

    Prefers the dedicated reader account; falls back to the root account
    when no reader user is configured.
    """
    user = os.environ.get("MYSQL_READER_USER", "").strip()
    if user:
        return user, os.environ.get("MYSQL_READER_PASSWORD", "")
    root_user = os.environ.get("MYSQL_ROOT_USER", "root").strip() or "root"
    return root_user, os.environ.get("MYSQL_ROOT_PASSWORD", "")
|
|
|
|
|
|
def local_now() -> datetime:
    """Return the current time as a timezone-aware datetime in the local zone."""
    naive = datetime.now()
    return naive.astimezone()
|
|
|
|
|
|
def normalize_schedule_time(value: str | None, *, fallback: str = "03:00") -> str:
    """Canonicalize an HH:MM schedule time, retrying once with *fallback*.

    Blank input takes the fallback directly. Invalid input is retried with
    the fallback (passing an empty fallback on the retry so a second failure
    raises instead of recursing again).

    Raises:
        RuntimeError: when neither the value nor the fallback is valid HH:MM.
    """
    candidate = str(value or "").strip() or fallback
    if SCHEDULE_TIME_RE.match(candidate):
        hour_text, minute_text = candidate.split(":", 1)
        return f"{int(hour_text):02d}:{int(minute_text):02d}"
    if fallback and candidate != fallback:
        return normalize_schedule_time(fallback, fallback="")
    raise RuntimeError("每日同步时间格式必须为 HH:MM,例如 03:00。")
|
|
|
|
|
|
def normalize_github_proxy_prefix(value: str | None) -> str:
|
|
text = str(value or "").strip()
|
|
if not text:
|
|
return ""
|
|
if "://" not in text:
|
|
raise RuntimeError("GitHub 加速前缀必须包含协议,例如 https://ghfast.top/")
|
|
if not text.endswith("/"):
|
|
text = f"{text}/"
|
|
return text
|
|
|
|
|
|
def get_effective_repo_url(github_proxy_prefix: str | None = None) -> str:
    """Return the upstream repo URL, optionally routed through a GitHub mirror.

    When *github_proxy_prefix* is None the GITHUB_PROXY_PREFIX environment
    variable supplies the prefix.
    """
    if github_proxy_prefix is None:
        github_proxy_prefix = os.environ.get("GITHUB_PROXY_PREFIX", "")
    prefix = normalize_github_proxy_prefix(github_proxy_prefix)
    if prefix:
        return f"{prefix}{DEFAULT_REPO_URL}"
    return DEFAULT_REPO_URL
|
|
|
|
|
|
def compute_next_run_at(daily_time: str, now: datetime | None = None) -> str:
|
|
current = now or local_now()
|
|
hour_text, minute_text = daily_time.split(":", 1)
|
|
candidate = current.replace(
|
|
hour=int(hour_text),
|
|
minute=int(minute_text),
|
|
second=0,
|
|
microsecond=0,
|
|
)
|
|
if candidate <= current:
|
|
candidate += timedelta(days=1)
|
|
return candidate.isoformat(timespec="seconds")
|
|
|
|
|
|
def default_schedule_config() -> dict[str, object]:
    """Build the sync-schedule config seeded from environment variables."""
    enabled = truthy_env("SYNC_SCHEDULE_ENABLED", "0")
    daily_time = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00"))
    config: dict[str, object] = {
        "enabled": enabled,
        "daily_time": daily_time,
        "timezone": os.environ.get("TZ", "UTC").strip() or "UTC",
        "github_proxy_prefix": normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", "")),
        "next_run_at": None,
        "last_run_time": None,
        "last_run_status": None,
        "last_run_message": None,
        "updated_at": None,
    }
    if enabled:
        config["next_run_at"] = compute_next_run_at(daily_time)
    return config
|
|
|
|
|
|
def default_mysql_config() -> dict[str, object]:
    """Return the MySQL settings defaults, seeded from the environment."""
    return {
        "auto_load": truthy_env("MYSQL_AUTO_LOAD", "0"),
        "updated_at": None,
    }
|
|
|
|
|
|
def normalize_mysql_config(raw: dict[str, object] | None) -> dict[str, object]:
    """Merge a possibly-partial raw payload onto the default MySQL settings."""
    config = default_mysql_config()
    if not isinstance(raw, dict):
        return config
    if "auto_load" in raw:
        value = raw.get("auto_load")
        if isinstance(value, bool):
            config["auto_load"] = value
        else:
            # Accept flag strings such as "1" / "yes" / "on".
            config["auto_load"] = str(value).strip().lower() in {"1", "true", "yes", "on"}
    updated_at = raw.get("updated_at")
    if updated_at is not None:
        config["updated_at"] = updated_at
    return config
|
|
|
|
|
|
def read_mysql_config() -> dict[str, object]:
    """Load the persisted MySQL settings, falling back to defaults on any error."""
    with MYSQL_CONFIG_LOCK:
        if not MYSQL_CONFIG_PATH.exists():
            return normalize_mysql_config(None)
        try:
            payload = json.loads(MYSQL_CONFIG_PATH.read_text(encoding="utf-8"))
        except Exception:
            # Corrupt or unreadable state file: treat as absent rather than crash.
            return normalize_mysql_config(None)
        return normalize_mysql_config(payload if isinstance(payload, dict) else None)
|
|
|
|
|
|
def write_mysql_config(payload: dict[str, object]) -> dict[str, object]:
    """Normalize *payload* and persist it as the MySQL settings file."""
    normalized = normalize_mysql_config(payload)
    serialized = json.dumps(normalized, ensure_ascii=False, indent=2)
    with MYSQL_CONFIG_LOCK:
        MYSQL_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
        MYSQL_CONFIG_PATH.write_text(serialized, encoding="utf-8")
    return normalized
|
|
|
|
|
|
def update_mysql_config(payload: dict[str, object]) -> dict[str, object]:
    """Apply a partial update (currently only ``auto_load``) to the MySQL settings.

    NOTE(review): the read-modify-write is not atomic — MYSQL_CONFIG_LOCK is
    released between read_mysql_config() and write_mysql_config(), so two
    concurrent updates could interleave.
    """
    current = read_mysql_config()
    auto_load_raw = payload.get("auto_load", current.get("auto_load", False))
    # Accept either a real bool or a truthy flag string.
    auto_load = auto_load_raw if isinstance(auto_load_raw, bool) else str(auto_load_raw).strip().lower() in {"1", "true", "yes", "on"}
    updated = {
        **current,
        "auto_load": auto_load,
        "updated_at": local_now().isoformat(timespec="seconds"),
    }
    return write_mysql_config(updated)
|
|
|
|
|
|
def normalize_schedule_config(raw: dict[str, object] | None) -> dict[str, object]:
    """Merge a possibly-partial raw payload onto the default schedule config.

    Invalid daily_time / proxy values fall back to the environment-derived
    defaults instead of raising; next_run_at is recomputed whenever it is
    missing, unparsable, or the schedule is disabled.
    """
    config = default_schedule_config()
    if isinstance(raw, dict):
        if "enabled" in raw:
            value = raw.get("enabled")
            # Accept either a real bool or a truthy flag string.
            config["enabled"] = value if isinstance(value, bool) else str(value).strip().lower() in {"1", "true", "yes", "on"}
        if "daily_time" in raw:
            try:
                config["daily_time"] = normalize_schedule_time(str(raw.get("daily_time") or ""))
            except RuntimeError:
                # Unusable stored value: fall back to the env-configured time.
                config["daily_time"] = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00"))
        if raw.get("timezone"):
            config["timezone"] = str(raw.get("timezone")).strip() or config["timezone"]
        if "github_proxy_prefix" in raw:
            try:
                config["github_proxy_prefix"] = normalize_github_proxy_prefix(str(raw.get("github_proxy_prefix") or ""))
            except RuntimeError:
                config["github_proxy_prefix"] = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", ""))
        # Pass-through fields kept only when present and non-null.
        for key in ("last_run_time", "last_run_status", "last_run_message", "updated_at"):
            if raw.get(key) is not None:
                config[key] = raw.get(key)
        next_run_at = raw.get("next_run_at")
        if config["enabled"] and isinstance(next_run_at, str) and next_run_at.strip():
            try:
                # Keep the stored timestamp only if it parses as ISO-8601.
                datetime.fromisoformat(next_run_at)
                config["next_run_at"] = next_run_at
            except ValueError:
                config["next_run_at"] = compute_next_run_at(str(config["daily_time"]))
        else:
            config["next_run_at"] = compute_next_run_at(str(config["daily_time"])) if config["enabled"] else None
    return config
|
|
|
|
|
|
def read_schedule_config() -> dict[str, object]:
    """Load the persisted sync-schedule config; fall back to defaults on error."""
    with SCHEDULE_LOCK:
        if not SCHEDULE_CONFIG_PATH.exists():
            return normalize_schedule_config(None)
        try:
            raw = json.loads(SCHEDULE_CONFIG_PATH.read_text(encoding="utf-8"))
        except Exception:
            # Corrupt or unreadable state file: treat as absent.
            return normalize_schedule_config(None)
        if isinstance(raw, dict):
            return normalize_schedule_config(raw)
        return normalize_schedule_config(None)
|
|
|
|
|
|
def write_schedule_config(payload: dict[str, object]) -> dict[str, object]:
    """Normalize *payload* and persist it as the schedule config file."""
    normalized = normalize_schedule_config(payload)
    serialized = json.dumps(normalized, ensure_ascii=False, indent=2)
    with SCHEDULE_LOCK:
        SCHEDULE_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
        SCHEDULE_CONFIG_PATH.write_text(serialized, encoding="utf-8")
    return normalized
|
|
|
|
|
|
def update_schedule_config(payload: dict[str, object]) -> dict[str, object]:
    """Apply a partial schedule update and persist the merged config.

    Raises:
        RuntimeError: when daily_time or github_proxy_prefix is malformed.

    NOTE(review): read-modify-write is not atomic; SCHEDULE_LOCK is released
    between the read and the write.
    """
    current = read_schedule_config()
    enabled_raw = payload.get("enabled", current.get("enabled", False))
    enabled = enabled_raw if isinstance(enabled_raw, bool) else str(enabled_raw).strip().lower() in {"1", "true", "yes", "on"}
    daily_time = normalize_schedule_time(str(payload.get("daily_time") or current.get("daily_time") or "03:00"))
    # An explicit key in the payload wins even when its value is empty,
    # allowing callers to clear the proxy prefix.
    github_proxy_prefix = normalize_github_proxy_prefix(
        str(payload.get("github_proxy_prefix") if "github_proxy_prefix" in payload else current.get("github_proxy_prefix") or "")
    )
    updated = {
        **current,
        "enabled": enabled,
        "daily_time": daily_time,
        "timezone": os.environ.get("TZ", "UTC").strip() or "UTC",
        "github_proxy_prefix": github_proxy_prefix,
        "next_run_at": compute_next_run_at(daily_time) if enabled else None,
        "updated_at": local_now().isoformat(timespec="seconds"),
    }
    return write_schedule_config(updated)
|
|
|
|
|
|
def mark_schedule_run(status: str, message: str) -> dict[str, object]:
    """Record the outcome of a scheduled sync run and roll the next-run time."""
    current = read_schedule_config()
    next_run_at = None
    if current.get("enabled"):
        next_run_at = compute_next_run_at(str(current.get("daily_time") or "03:00"))
    updated = {
        **current,
        "last_run_time": local_now().isoformat(timespec="seconds"),
        "last_run_status": status,
        "last_run_message": message,
        "next_run_at": next_run_at,
        "updated_at": local_now().isoformat(timespec="seconds"),
    }
    return write_schedule_config(updated)
|
|
|
|
|
|
def run_command(args: list[str]) -> subprocess.CompletedProcess[str]:
    """Run *args* from the project root, capturing text output without raising."""
    return subprocess.run(
        args,
        cwd=PROJECT_ROOT,
        capture_output=True,
        text=True,
        check=False,
    )
|
|
|
|
|
|
def sanitize_mysql_message(message: str, host: str | None = None, port: str | None = None) -> str:
|
|
lines = [line.strip() for line in str(message or "").splitlines() if line.strip()]
|
|
filtered = [line for line in lines if not line.startswith("WARNING:")]
|
|
text = "\n".join(filtered or lines)
|
|
host_text = host or os.environ.get("MYSQL_HOST", "mysql")
|
|
port_text = port or os.environ.get("MYSQL_PORT", "3306")
|
|
|
|
if "Can't connect to server on" in text or "Can't connect to MySQL server on" in text:
|
|
return f"MySQL 当前无法连接: {host_text}:{port_text}"
|
|
if "Access denied" in text:
|
|
return f"MySQL 账号或密码无效: {host_text}:{port_text}"
|
|
return text or f"MySQL 当前无法连接: {host_text}:{port_text}"
|
|
|
|
|
|
def normalize_text(text: str) -> str:
    """Lower-case *text* and strip everything but digits, a-z, and CJK chars."""
    lowered = (text or "").lower()
    return NORMALIZE_RE.sub("", lowered)
|
|
|
|
|
|
def sql_string(value: str) -> str:
    """Escape backslashes and single quotes for embedding in a SQL literal."""
    text = value or ""
    text = text.replace("\\", "\\\\")
    return text.replace("'", "''")
|
|
|
|
|
|
def normalize_alias_list(*groups: object) -> list[str]:
    """Flatten alias groups into a de-duplicated list of trimmed strings.

    Deduplication is keyed on the normalized form; first appearance wins.
    Items that are blank or normalize to an empty key are dropped.
    """
    result: list[str] = []
    seen_keys: set[str] = set()
    for group in groups:
        if group is None:
            continue
        candidates = group if isinstance(group, list) else [group]
        for candidate in candidates:
            text = str(candidate or "").strip()
            if not text:
                continue
            key = normalize_text(text)
            if not key or key in seen_keys:
                continue
            seen_keys.add(key)
            result.append(text)
    return result
|
|
|
|
|
|
def default_manual_catalog() -> dict[str, object]:
    """Return a fresh, empty manual-catalog structure."""
    return {
        "brands": [],
        "devices": [],
    }
|
|
|
|
|
|
def read_manual_catalog() -> dict[str, object]:
    """Load the manual catalog from disk, tolerating missing or corrupt files."""
    if not MANUAL_CATALOG_PATH.exists():
        return default_manual_catalog()
    try:
        raw = json.loads(MANUAL_CATALOG_PATH.read_text(encoding="utf-8"))
    except Exception:
        # Unreadable / invalid JSON: behave as if the file were absent.
        return default_manual_catalog()
    if not isinstance(raw, dict):
        return default_manual_catalog()
    brands = raw.get("brands")
    devices = raw.get("devices")
    if not isinstance(brands, list):
        brands = []
    if not isinstance(devices, list):
        devices = []
    return {"brands": brands, "devices": devices}
|
|
|
|
|
|
def write_manual_catalog(payload: dict[str, object]) -> dict[str, object]:
    """Persist the manual catalog, coercing malformed sections to empty lists."""
    brands = payload.get("brands")
    devices = payload.get("devices")
    catalog = {
        "brands": brands if isinstance(brands, list) else [],
        "devices": devices if isinstance(devices, list) else [],
    }
    MANUAL_CATALOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(catalog, ensure_ascii=False, indent=2)
    MANUAL_CATALOG_PATH.write_text(serialized, encoding="utf-8")
    return catalog
|
|
|
|
|
|
def canonical_manual_brand(raw: object) -> dict[str, object]:
    """Validate and canonicalize one manual brand entry.

    Raises:
        RuntimeError: with a user-facing message on malformed input.
    """
    if not isinstance(raw, dict):
        raise RuntimeError("品牌数据格式无效。")
    name = str(raw.get("name") or "").strip()
    if not name:
        raise RuntimeError("品牌名称不能为空。")
    # Default the parent brand to the brand itself.
    parent = str(raw.get("parent_brand") or name).strip() or name
    timestamp = local_now().isoformat(timespec="seconds")
    return {
        "name": name,
        "parent_brand": parent,
        "aliases": normalize_alias_list(name, raw.get("aliases")),
        "updated_at": str(raw.get("updated_at") or timestamp),
        "created_at": str(raw.get("created_at") or timestamp),
    }
|
|
|
|
|
|
def canonical_manual_device(raw: object, allowed_brands: set[str]) -> dict[str, object]:
    """Validate and canonicalize one manual device entry.

    Args:
        raw: incoming device dict from the API or the catalog file.
        allowed_brands: exact brand names the device may reference.

    Raises:
        RuntimeError: with a user-facing message on any invalid field.
    """
    if not isinstance(raw, dict):
        raise RuntimeError("设备数据格式无效。")
    brand = str(raw.get("brand") or "").strip()
    if not brand:
        raise RuntimeError("所属品牌不能为空。")
    if brand not in allowed_brands:
        raise RuntimeError(f"所属品牌不存在: {brand}")
    device_name = str(raw.get("device_name") or "").strip()
    if not device_name:
        raise RuntimeError("设备名称不能为空。")
    device_type = str(raw.get("device_type") or "").strip().lower() or "other"
    if device_type not in DEVICE_TYPES:
        raise RuntimeError("设备类型无效。")
    models = normalize_alias_list(raw.get("models"))
    if not models:
        raise RuntimeError("设备标识至少保留一个。")
    # The device name itself always counts as an alias.
    aliases = normalize_alias_list(device_name, raw.get("aliases"))
    section = str(raw.get("section") or "手动补录").strip() or "手动补录"
    # Deterministic fallback id from normalized brand + name, so the same
    # logical device keeps its id across saves when none was supplied.
    stable_id = f"manual:{normalize_text(brand)}:{normalize_text(device_name)}"
    now = local_now().isoformat(timespec="seconds")
    return {
        "id": str(raw.get("id") or stable_id).strip() or stable_id,
        "brand": brand,
        "device_name": device_name,
        "device_type": device_type,
        "models": models,
        "aliases": aliases,
        "section": section,
        "updated_at": str(raw.get("updated_at") or now),
        "created_at": str(raw.get("created_at") or now),
    }
|
|
|
|
|
|
def validate_manual_catalog(payload: dict[str, object]) -> dict[str, object]:
    """Canonicalize and cross-check a whole manual catalog payload.

    Ensures brand names are unique (by normalized key), device ids are
    unique, and every device references a known manual or built-in brand.

    Raises:
        RuntimeError: on any duplicate or invalid entry.
    """
    brands_raw = payload.get("brands") if isinstance(payload.get("brands"), list) else []
    devices_raw = payload.get("devices") if isinstance(payload.get("devices"), list) else []

    brands: list[dict[str, object]] = []
    seen_brand_keys: set[str] = set()
    for raw_brand in brands_raw:
        brand = canonical_manual_brand(raw_brand)
        brand_key = normalize_text(str(brand["name"]))
        if brand_key in seen_brand_keys:
            raise RuntimeError(f"品牌重复: {brand['name']}")
        seen_brand_keys.add(brand_key)
        brands.append(brand)

    # Devices may reference built-in brands as well as manual ones.
    builtin_brands = load_builtin_brand_names()
    allowed_brands = builtin_brands | {str(item["name"]) for item in brands}
    devices: list[dict[str, object]] = []
    seen_device_ids: set[str] = set()
    for raw_device in devices_raw:
        device = canonical_manual_device(raw_device, allowed_brands)
        if device["id"] in seen_device_ids:
            raise RuntimeError(f"设备 ID 重复: {device['id']}")
        seen_device_ids.add(str(device["id"]))
        devices.append(device)

    return {"brands": brands, "devices": devices}
|
|
|
|
|
|
def load_builtin_brand_names() -> set[str]:
    """Collect every non-manual brand name known to the built-in device records.

    Gathers ``brand``, ``market_brand`` and ``parent_brand`` from records that
    did not originate from the manual catalog. Done in a single pass over the
    record list (the previous implementation filtered the same list three
    times). Empty names are excluded.
    """
    names: set[str] = set()
    for record in build_records(WORKSPACE_ROOT):
        if str(record.source_file) == "local/manual_catalog.json":
            continue
        names.add(str(record.brand).strip())
        names.add(str(record.market_brand).strip())
        names.add(str(record.parent_brand).strip())
    names.discard("")
    return names
|
|
|
|
|
|
def count_manual_alias_conflicts(device: dict[str, object]) -> int:
    """Count distinct normalized aliases of *device* that collide with built-in records."""
    candidate_aliases = normalize_alias_list(
        device.get("models"), device.get("aliases"), device.get("device_name")
    )
    alias_keys = {normalize_text(alias) for alias in candidate_aliases}
    conflicting: set[str] = set()
    for record in build_records(WORKSPACE_ROOT):
        # Manual-catalog records cannot conflict with themselves.
        if str(record.source_file) == "local/manual_catalog.json":
            continue
        for alias in getattr(record, "aliases", []):
            key = normalize_text(str(alias))
            if key and key in alias_keys:
                conflicting.add(key)
    return len(conflicting)
|
|
|
|
|
|
def _manual_mysql_loader_task() -> None:
    """Worker body for the background MySQL refresh thread.

    Serialized by MANUAL_REBUILD_LOCK; progress and outcome are reported via
    the module-level LAST_MANUAL_MYSQL_LOAD_STATUS dict.
    """
    with MANUAL_REBUILD_LOCK:
        LAST_MANUAL_MYSQL_LOAD_STATUS["running"] = True
        LAST_MANUAL_MYSQL_LOAD_STATUS["last_started_at"] = local_now().isoformat(timespec="seconds")
        LAST_MANUAL_MYSQL_LOAD_STATUS["last_status"] = "running"
        LAST_MANUAL_MYSQL_LOAD_STATUS["last_message"] = "手动补录触发的 MySQL 刷新进行中。"
        try:
            load_proc = run_command(["python3", str(MYSQL_LOADER)])
            # Combine stdout and stderr into one operator-visible message.
            message = "\n".join(part for part in [load_proc.stdout.strip(), load_proc.stderr.strip()] if part).strip() or "MySQL 已刷新。"
            LAST_MANUAL_MYSQL_LOAD_STATUS["last_status"] = "success" if load_proc.returncode == 0 else "failed"
            LAST_MANUAL_MYSQL_LOAD_STATUS["last_message"] = message
        except Exception as err:
            LAST_MANUAL_MYSQL_LOAD_STATUS["last_status"] = "failed"
            LAST_MANUAL_MYSQL_LOAD_STATUS["last_message"] = str(err)
        finally:
            LAST_MANUAL_MYSQL_LOAD_STATUS["running"] = False
            LAST_MANUAL_MYSQL_LOAD_STATUS["last_finished_at"] = local_now().isoformat(timespec="seconds")
|
|
|
|
|
|
def start_manual_mysql_loader() -> bool:
    """Kick off the background MySQL refresh thread.

    Returns False when a refresh is already flagged as running.

    NOTE(review): the flag check and thread start are not atomic, so two
    near-simultaneous callers could both start a thread; the worker's
    MANUAL_REBUILD_LOCK then serializes the actual work.
    """
    if LAST_MANUAL_MYSQL_LOAD_STATUS.get("running"):
        return False
    thread = threading.Thread(target=_manual_mysql_loader_task, name="manual-mysql-loader", daemon=True)
    thread.start()
    return True
|
|
|
|
|
|
def _merged_process_output(proc: subprocess.CompletedProcess[str]) -> str:
    """Join a subprocess's stdout and stderr into one trimmed message."""
    return "\n".join(part for part in [proc.stdout.strip(), proc.stderr.strip()] if part).strip()


def rebuild_generated_outputs(*, defer_mysql_load: bool = False) -> dict[str, object]:
    """Rebuild the device index and MySQL seed, optionally refreshing MySQL.

    Args:
        defer_mysql_load: when True and auto-load is enabled, the MySQL load
            runs in a background thread instead of blocking this call.

    Returns:
        A result dict with update flags, an operator-facing message, and the
        captured tool outputs.

    Raises:
        RuntimeError: when the index build, seed export, or a synchronous
            MySQL load exits non-zero.
    """
    build_proc = run_command(
        [
            "python3",
            str(PROJECT_ROOT / "tools/device_mapper.py"),
            "--repo-root",
            str(WORKSPACE_ROOT),
            "build",
            "--output",
            str(INDEX_PATH),
        ]
    )
    build_output = _merged_process_output(build_proc)
    if build_proc.returncode != 0:
        raise RuntimeError(build_output or "重建设备索引失败。")

    seed_proc = run_command(
        [
            "python3",
            str(PROJECT_ROOT / "tools/export_mysql_seed.py"),
            "--repo-root",
            str(WORKSPACE_ROOT),
            "--output",
            str(MYSQL_SEED_PATH),
        ]
    )
    seed_output = _merged_process_output(seed_proc)
    if seed_proc.returncode != 0:
        raise RuntimeError(seed_output or "导出 MySQL seed 失败。")

    # Read the auto-load setting once so the branch taken and the summary
    # message below cannot disagree if the config file changes mid-call.
    auto_load = mysql_auto_load_enabled()
    mysql_loaded = False
    mysql_message = "MySQL 未刷新。"
    if auto_load:
        if defer_mysql_load:
            started = start_manual_mysql_loader()
            mysql_message = "MySQL 后台刷新中。" if started else "MySQL 后台刷新已在进行中。"
        else:
            load_proc = run_command(["python3", str(MYSQL_LOADER)])
            mysql_message = _merged_process_output(load_proc) or "MySQL 已刷新。"
            if load_proc.returncode != 0:
                raise RuntimeError(mysql_message)
            mysql_loaded = True

    if defer_mysql_load and auto_load:
        message = "本地覆盖库已保存,索引与 MySQL seed 已刷新,MySQL 正在后台刷新。"
    elif mysql_loaded:
        message = "本地覆盖库已保存,索引与 MySQL seed 已刷新。"
    else:
        message = "本地覆盖库已保存,索引与 MySQL seed 已刷新,MySQL 未自动装载。"

    return {
        "index_updated": True,
        "mysql_seed_updated": True,
        "mysql_loaded": mysql_loaded,
        "message": message,
        "build_output": build_output,
        "mysql_seed_output": seed_output,
        "mysql_message": mysql_message,
    }
|
|
|
|
|
|
def manual_catalog_payload() -> dict[str, object]:
    """Assemble the API payload describing the manual catalog and refresh state."""
    catalog = validate_manual_catalog(read_manual_catalog())
    # Stable case-insensitive ordering for the UI.
    brands = sorted(catalog["brands"], key=lambda item: str(item["name"]).lower())
    devices = sorted(catalog["devices"], key=lambda item: (str(item["brand"]).lower(), str(item["device_name"]).lower()))
    return {
        "brands": brands,
        "devices": devices,
        "stats": {
            "brand_count": len(brands),
            "device_count": len(devices),
        },
        # NOTE(review): raises ValueError if MANUAL_CATALOG_PATH (under
        # WORKSPACE_ROOT) is not inside PROJECT_ROOT — confirm the layout
        # guarantees that containment.
        "catalog_file": str(MANUAL_CATALOG_PATH.relative_to(PROJECT_ROOT)),
        "mysql_refresh": {
            "running": bool(LAST_MANUAL_MYSQL_LOAD_STATUS.get("running")),
            "last_started_at": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_started_at"),
            "last_finished_at": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_finished_at"),
            "last_status": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_status"),
            "last_message": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_message"),
        },
    }
|
|
|
|
|
|
def upsert_manual_brand(payload: dict[str, object]) -> dict[str, object]:
    """Create or replace a manual brand, then rebuild the derived outputs.

    Raises:
        RuntimeError: when another sync/rebuild holds SYNC_LOCK, or when the
            payload fails catalog validation.
    """
    if not SYNC_LOCK.acquire(blocking=False):
        raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
    try:
        catalog = validate_manual_catalog(read_manual_catalog())
        incoming = canonical_manual_brand(payload)
        brand_key = normalize_text(str(incoming["name"]))
        existing_by_key = {normalize_text(str(item["name"])): item for item in catalog["brands"]}
        existing = existing_by_key.get(brand_key)
        if existing:
            # Preserve the original creation timestamp on update.
            incoming["created_at"] = existing.get("created_at") or incoming["created_at"]
        # Replace any entry with the same normalized name.
        catalog["brands"] = [
            item for item in catalog["brands"] if normalize_text(str(item["name"])) != brand_key
        ]
        catalog["brands"].append(incoming)
        validated = validate_manual_catalog(catalog)
        write_manual_catalog(validated)
        rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
        return {
            "saved_brand": incoming,
            "catalog": manual_catalog_payload(),
            **rebuild_result,
        }
    finally:
        SYNC_LOCK.release()
|
|
|
|
|
|
def upsert_manual_device(payload: dict[str, object]) -> dict[str, object]:
    """Create or replace a manual device (keyed by id), then rebuild outputs.

    Raises:
        RuntimeError: when another sync/rebuild holds SYNC_LOCK, or when the
            payload fails catalog validation.
    """
    if not SYNC_LOCK.acquire(blocking=False):
        raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
    try:
        catalog = validate_manual_catalog(read_manual_catalog())
        allowed_brands = load_builtin_brand_names() | {str(item["name"]) for item in catalog["brands"]}
        device_payload = canonical_manual_device(payload, allowed_brands)
        existing = next((item for item in catalog["devices"] if str(item.get("id")) == str(device_payload["id"])), None)
        if existing:
            # Preserve the original creation timestamp on update.
            device_payload["created_at"] = existing.get("created_at") or device_payload["created_at"]
        catalog["devices"] = [item for item in catalog["devices"] if str(item.get("id")) != str(device_payload["id"])]
        catalog["devices"].append(device_payload)
        validated = validate_manual_catalog(catalog)
        write_manual_catalog(validated)
        rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
        return {
            "saved_device": device_payload,
            "alias_conflict_count": count_manual_alias_conflicts(device_payload),
            "catalog": manual_catalog_payload(),
            **rebuild_result,
        }
    finally:
        SYNC_LOCK.release()
|
|
|
|
|
|
def delete_manual_brand(payload: dict[str, object]) -> dict[str, object]:
    """Delete a manual brand by exact name, then rebuild the derived outputs.

    Raises:
        RuntimeError: when a rebuild is already in progress, the name is
            blank or unknown, or devices still reference the brand.
    """
    if not SYNC_LOCK.acquire(blocking=False):
        raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
    try:
        brand_name = str(payload.get("name") or "").strip()
        if not brand_name:
            raise RuntimeError("品牌名称不能为空。")
        catalog = validate_manual_catalog(read_manual_catalog())
        # Refuse to orphan devices that still point at this brand.
        active_devices = [item for item in catalog["devices"] if str(item.get("brand") or "").strip() == brand_name]
        if active_devices:
            raise RuntimeError("该品牌下仍有关联设备,请先删除设备后再删除品牌。")
        next_brands = [item for item in catalog["brands"] if str(item.get("name") or "").strip() != brand_name]
        if len(next_brands) == len(catalog["brands"]):
            raise RuntimeError(f"未找到品牌: {brand_name}")
        write_manual_catalog({"brands": next_brands, "devices": catalog["devices"]})
        rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
        return {
            "deleted_brand": brand_name,
            "catalog": manual_catalog_payload(),
            **rebuild_result,
        }
    finally:
        SYNC_LOCK.release()
|
|
|
|
|
|
def delete_manual_device(payload: dict[str, object]) -> dict[str, object]:
    """Delete a manual device by id, then rebuild the derived outputs.

    Raises:
        RuntimeError: when a rebuild is already in progress, or the id is
            blank or unknown.
    """
    if not SYNC_LOCK.acquire(blocking=False):
        raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
    try:
        device_id = str(payload.get("id") or "").strip()
        if not device_id:
            raise RuntimeError("设备 ID 不能为空。")
        catalog = validate_manual_catalog(read_manual_catalog())
        next_devices = [item for item in catalog["devices"] if str(item.get("id") or "").strip() != device_id]
        if len(next_devices) == len(catalog["devices"]):
            raise RuntimeError(f"未找到设备: {device_id}")
        write_manual_catalog({"brands": catalog["brands"], "devices": next_devices})
        rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
        return {
            "deleted_device": device_id,
            "catalog": manual_catalog_payload(),
            **rebuild_result,
        }
    finally:
        SYNC_LOCK.release()
|
|
|
|
|
|
def parse_apple_series_generation(name: str) -> dict[str, object] | None:
    """Split an Apple device name into a base label and generation hint.

    Recognizes two trailing forms: an explicit "(Nth generation)" / bare
    trailing number, and a parenthesized suffix that mentions an A- or
    M-series chip (e.g. "(M2)").

    Returns a dict with base_label, base_norm, generation (int or None) and
    chip_like (bool), or None when no usable base label remains.
    """
    text = re.sub(r"\s+", " ", str(name or "").strip())
    if not text:
        return None

    # "iPad Pro (3rd generation)" or trailing-number forms like "AirPods 2".
    ordinal_match = re.match(r"^(.*?)(?:\s*\((\d+)(?:st|nd|rd|th)\s+generation\)|\s+(\d+))$", text, re.IGNORECASE)
    if ordinal_match:
        base_label = re.sub(r"\s+", " ", str(ordinal_match.group(1) or "").strip())
        generation = int(ordinal_match.group(2) or ordinal_match.group(3) or "0")
        if base_label and generation > 0:
            return {
                "base_label": base_label,
                "base_norm": normalize_text(base_label),
                "generation": generation,
                "chip_like": False,
            }

    # Trailing parenthetical mentioning an Apple chip (a1..a99 / m1..m99).
    chip_like = bool(re.search(r"\((?:[^)]*\b(?:a\d{1,2}|m\d{1,2})\b[^)]*)\)$", text, re.IGNORECASE))
    # Strip any trailing parenthetical to recover the base label.
    base_label = re.sub(r"\s*\([^)]*\)\s*$", "", text).strip()
    if not base_label:
        return None

    return {
        "base_label": base_label,
        "base_norm": normalize_text(base_label),
        "generation": None,
        "chip_like": chip_like,
    }
|
|
|
|
|
|
def build_index_device_name_alias_map() -> dict[str, list[str]]:
    """Derive "<base> <generation>" aliases for Apple devices in the index.

    Reads dist/device_index.json, groups Apple records by normalized base
    label, infers generation numbers where absent (the bare base name becomes
    generation 1 when later generations exist; a chip-suffixed name becomes
    max+1), and returns normalized alias -> sorted device-name lists. Any
    read/parse failure yields an empty map.
    """
    if not INDEX_PATH.exists():
        return {}
    try:
        payload = json.loads(INDEX_PATH.read_text(encoding="utf-8"))
    except Exception:
        return {}

    records = payload.get("records") if isinstance(payload, dict) else None
    if not isinstance(records, list):
        return {}

    # Phase 1: bucket Apple device names by normalized base label.
    grouped: dict[str, dict[str, object]] = {}
    for record in records:
        if not isinstance(record, dict):
            continue
        brand = str(record.get("market_brand") or record.get("manufacturer_brand") or record.get("brand") or "").strip()
        if brand != "Apple":
            continue
        device_name = str(record.get("device_name") or "").strip()
        parsed = parse_apple_series_generation(device_name)
        if not parsed or not parsed.get("base_norm"):
            continue
        base_norm = str(parsed["base_norm"])
        group = grouped.setdefault(
            base_norm,
            {
                "base_label": str(parsed["base_label"]),
                "items": [],
            },
        )
        items = group["items"]
        if isinstance(items, list):
            items.append(
                {
                    "device_name": device_name,
                    "generation": parsed.get("generation"),
                    "chip_like": bool(parsed.get("chip_like")),
                }
            )

    # Phase 2: infer missing generations per group and emit alias keys.
    alias_map: dict[str, set[str]] = {}
    for group in grouped.values():
        items = group.get("items")
        if not isinstance(items, list):
            continue
        explicit_generations = sorted(
            {
                int(item["generation"])
                for item in items
                if isinstance(item, dict) and isinstance(item.get("generation"), int) and int(item["generation"]) > 0
            }
        )
        max_explicit_generation = explicit_generations[-1] if explicit_generations else 0
        base_label = str(group.get("base_label") or "").strip()
        if not base_label:
            continue

        for item in items:
            if not isinstance(item, dict):
                continue
            generation = item.get("generation")
            device_name = str(item.get("device_name") or "").strip()
            if not isinstance(generation, int):
                if device_name == base_label and max_explicit_generation >= 2:
                    # Bare base name: treat as the 1st generation.
                    generation = 1
                elif item.get("chip_like") and max_explicit_generation >= 1:
                    # Chip-suffixed name: assume it is the newest generation.
                    generation = max_explicit_generation + 1
            if not isinstance(generation, int) or generation <= 0:
                continue
            alias_key = normalize_text(f"{base_label} {generation}")
            if not alias_key:
                continue
            alias_map.setdefault(alias_key, set()).add(device_name)

    return {key: sorted(values) for key, values in alias_map.items()}
|
|
|
|
|
|
def resolve_index_device_names(alias_norm: str) -> list[str]:
    """Look up device names for a normalized alias, building the map lazily.

    Uses double-checked locking around INDEX_ALIAS_LOCK so the index file is
    parsed at most once per process.
    """
    global INDEX_DEVICE_NAME_ALIAS_MAP
    if INDEX_DEVICE_NAME_ALIAS_MAP is None:
        with INDEX_ALIAS_LOCK:
            if INDEX_DEVICE_NAME_ALIAS_MAP is None:
                INDEX_DEVICE_NAME_ALIAS_MAP = build_index_device_name_alias_map()
    # Return a fresh list so callers cannot mutate the cached map.
    return list((INDEX_DEVICE_NAME_ALIAS_MAP or {}).get(alias_norm, []))
|
|
|
|
|
|
def mysql_command(database: str | None = None) -> list[str]:
|
|
command = [
|
|
"mysql",
|
|
f"--host={os.environ.get('MYSQL_HOST', 'mysql')}",
|
|
f"--port={os.environ.get('MYSQL_PORT', '3306')}",
|
|
f"--user={os.environ.get('MYSQL_READER_USER', '')}",
|
|
"--protocol=TCP",
|
|
"--default-character-set=utf8mb4",
|
|
"--batch",
|
|
"--raw",
|
|
]
|
|
if database:
|
|
command.append(database)
|
|
return command
|
|
|
|
|
|
def mysql_env() -> dict[str, str]:
    """Return an environment copy with MYSQL_PWD set for the mysql CLI.

    Passing the password via MYSQL_PWD keeps it off the command line.
    """
    environment = dict(os.environ)
    environment["MYSQL_PWD"] = os.environ.get("MYSQL_READER_PASSWORD", "")
    return environment
|
|
|
|
|
|
def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str | None]]:
    """Execute *sql* via the mysql CLI and parse its tab-separated batch output.

    Returns one dict per data row, keyed by the header line; the literal
    string "NULL" is mapped to None, and missing trailing columns become "".

    Raises:
        RuntimeError: with a sanitized message when the client exits non-zero.

    NOTE(review): --batch/--raw output is split on tabs and newlines, so
    column values containing literal tabs or newlines would be mis-parsed —
    confirm the queried columns cannot contain them.
    """
    proc = subprocess.run(
        mysql_command(database=database),
        env=mysql_env(),
        input=sql,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        message = sanitize_mysql_message(
            proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
        )
        raise RuntimeError(message)

    lines = [line for line in proc.stdout.splitlines() if line.strip()]
    if not lines:
        return []

    # First non-blank line is the column header row.
    headers = lines[0].split("\t")
    rows: list[dict[str, str | None]] = []
    for line in lines[1:]:
        values = line.split("\t")
        row = {}
        for idx, header in enumerate(headers):
            value = values[idx] if idx < len(values) else ""
            row[header] = None if value == "NULL" else value
        rows.append(row)
    return rows
|
|
|
|
|
|
def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
    """Resolve a device model/alias query into SQL rows plus query metadata.

    Strategy: first try an exact match on ``alias_norm``; when that yields no
    rows, resolve the alias to known device names via the on-disk index alias
    map and retry matching on ``device_name``.  The executed SQL text is
    echoed back in the payload so the UI can display it.

    Raises RuntimeError for invalid input (empty model, non-numeric limit).
    """
    # Accept either "model_raw" or "model" as the query key.
    raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip()
    if not raw_value:
        raise RuntimeError("请填写设备标识或设备名称。")

    alias_norm = normalize_text(raw_value)
    if not alias_norm:
        raise RuntimeError("设备标识无法归一化,请检查输入。")

    limit_value = payload.get("limit", 20)
    try:
        limit = int(limit_value)
    except Exception as err:
        raise RuntimeError("limit 必须是数字。") from err
    # Clamp to 1..100 so clients cannot request unbounded result sets.
    limit = max(1, min(limit, 100))

    # NOTE(review): values are escaped via sql_string() instead of bound
    # parameters because the query is shipped through the mysql CLI; confirm
    # sql_string covers all metacharacters for untrusted input.
    exact_sql = f"""
        SELECT
            model,
            record_id,
            alias_norm,
            device_name,
            brand,
            manufacturer_brand,
            parent_brand,
            market_brand,
            device_type,
            source_file,
            section,
            source_rank,
            source_weight,
            code,
            code_alias,
            ver_name
        FROM mobilemodels.mm_device_catalog
        WHERE alias_norm = '{sql_string(alias_norm)}'
        ORDER BY source_rank ASC, record_id ASC
        LIMIT {limit};
    """.strip()

    rows = run_mysql_query(exact_sql)
    sql = exact_sql
    match_strategy = "alias_norm_exact"
    match_strategy_label = "alias_norm 精确匹配"
    resolved_device_names: list[str] = []

    if not rows:
        # Fallback: map the normalized alias to device names via the JSON index.
        resolved_device_names = resolve_index_device_names(alias_norm)
        if resolved_device_names:
            name_conditions = " OR ".join(
                f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names
            )
            sql = f"""
                SELECT
                    model,
                    record_id,
                    alias_norm,
                    device_name,
                    brand,
                    manufacturer_brand,
                    parent_brand,
                    market_brand,
                    device_type,
                    source_file,
                    section,
                    source_rank,
                    source_weight,
                    code,
                    code_alias,
                    ver_name
                FROM mobilemodels.mm_device_catalog
                WHERE {name_conditions}
                ORDER BY source_rank ASC, record_id ASC
                LIMIT {limit};
            """.strip()
            rows = run_mysql_query(sql)
            if rows:
                match_strategy = "device_name_alias"
                match_strategy_label = "设备名称别名映射"

    return {
        "query_mode": "sql",
        "model_raw": raw_value,
        "alias_norm": alias_norm,
        "limit": limit,
        "sql": sql,
        "match_strategy": match_strategy,
        "match_strategy_label": match_strategy_label,
        "resolved_device_names": resolved_device_names,
        "rows": rows,
        "row_count": len(rows),
    }
|
|
|
|
|
|
def build_sql_device_name_query_payload(payload: dict[str, object]) -> dict[str, object]:
    """Resolve a device-name search into SQL rows plus query metadata.

    Strategy: when the normalized name maps to known device names in the
    index alias map, match those exactly; otherwise (or when that yields no
    rows) fall back to a LIKE search over ``device_name`` and ``ver_name``.

    Raises RuntimeError for invalid input (empty name, non-numeric limit).
    """
    # Accept either "device_name" or "name" as the query key.
    raw_value = str(payload.get("device_name") or payload.get("name") or "").strip()
    if not raw_value:
        raise RuntimeError("请填写设备名称。")

    limit_value = payload.get("limit", 20)
    try:
        limit = int(limit_value)
    except Exception as err:
        raise RuntimeError("limit 必须是数字。") from err
    # Clamp to 1..100 so clients cannot request unbounded result sets.
    limit = max(1, min(limit, 100))

    alias_norm = normalize_text(raw_value)
    resolved_device_names = resolve_index_device_names(alias_norm) if alias_norm else []

    rows: list[dict[str, str | None]] = []
    match_strategy = "device_name_like"
    match_strategy_label = "device_name 模糊匹配"

    if resolved_device_names:
        # NOTE(review): escaping relies on sql_string(); see build_sql_query_payload.
        name_conditions = " OR ".join(
            f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names
        )
        sql = f"""
            SELECT
                model,
                record_id,
                alias_norm,
                device_name,
                brand,
                manufacturer_brand,
                parent_brand,
                market_brand,
                device_type,
                source_file,
                section,
                source_rank,
                source_weight,
                code,
                code_alias,
                ver_name
            FROM mobilemodels.mm_device_catalog
            WHERE {name_conditions}
            ORDER BY source_rank ASC, record_id ASC
            LIMIT {limit};
        """.strip()
        rows = run_mysql_query(sql)
        if rows:
            match_strategy = "device_name_alias"
            match_strategy_label = "设备名称别名映射"
        else:
            # Discard the alias SQL so the LIKE fallback's SQL is reported instead.
            sql = ""

    if not rows:
        sql = f"""
            SELECT
                model,
                record_id,
                alias_norm,
                device_name,
                brand,
                manufacturer_brand,
                parent_brand,
                market_brand,
                device_type,
                source_file,
                section,
                source_rank,
                source_weight,
                code,
                code_alias,
                ver_name
            FROM mobilemodels.mm_device_catalog
            WHERE device_name LIKE '%{sql_string(raw_value)}%'
               OR ver_name LIKE '%{sql_string(raw_value)}%'
            ORDER BY source_rank ASC, record_id ASC
            LIMIT {limit};
        """.strip()
        rows = run_mysql_query(sql)

    return {
        "query_mode": "sql_device_name",
        "device_name": raw_value,
        "alias_norm": alias_norm,
        "limit": limit,
        "sql": sql,
        "match_strategy": match_strategy,
        "match_strategy_label": match_strategy_label,
        "resolved_device_names": resolved_device_names,
        "rows": rows,
        "row_count": len(rows),
    }
|
|
|
|
|
|
def read_sync_metadata() -> dict[str, object]:
    """Load the persisted sync-status JSON, returning {} when absent or unreadable."""
    try:
        raw = SYNC_METADATA_PATH.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception:
        # Missing file, bad encoding, or corrupt JSON all degrade to "no metadata".
        return {}
|
|
|
|
|
|
def write_sync_metadata(payload: dict[str, object]) -> None:
    """Persist sync-status metadata as pretty-printed UTF-8 JSON, creating parent dirs."""
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    SYNC_METADATA_PATH.parent.mkdir(parents=True, exist_ok=True)
    SYNC_METADATA_PATH.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def get_status_payload() -> dict[str, object]:
    """Collect the status snapshot served by GET /api/status.

    Includes artifact mtimes, upstream-sync metadata, schedule and MySQL
    settings, and a live MySQL reachability probe performed via the seed
    loader's --check-only mode.
    """
    index_mtime = None
    mysql_seed_mtime = None
    if INDEX_PATH.exists():
        index_mtime = datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds")
    if MYSQL_SEED_PATH.exists():
        mysql_seed_mtime = datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds")

    mysql_host = os.environ.get("MYSQL_HOST", "mysql")
    mysql_port = os.environ.get("MYSQL_PORT", "3306")
    mysql_database = os.environ.get("MYSQL_DATABASE", "mobilemodels")
    mysql_reader_user = os.environ.get("MYSQL_READER_USER", "")
    mysql_reader_password = os.environ.get("MYSQL_READER_PASSWORD", "")
    mysql_config = read_mysql_config()
    mysql_auto_load = bool(mysql_config.get("auto_load"))
    mysql_ready = False
    mysql_status = ""
    sync_metadata = read_sync_metadata()
    schedule_config = read_schedule_config()
    github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "")
    effective_repo_url = get_effective_repo_url(github_proxy_prefix)
    probe_user, probe_password = mysql_probe_credentials()
    # Probe MySQL with a short wait timeout so /api/status stays responsive
    # even when the database is down.
    mysql_proc = run_command(
        [
            "python3",
            str(MYSQL_LOADER),
            "--check-only",
            "--wait-timeout",
            "5",
            f"--user={probe_user}",
            f"--password={probe_password}",
        ]
    )
    if mysql_proc.returncode == 0:
        mysql_ready = True
        mysql_status = mysql_proc.stdout.strip() or "MySQL ready"
        if not mysql_auto_load:
            mysql_status = f"{mysql_status}; auto load disabled"
    else:
        # Scrub host/port/credentials from the failure text before exposing it.
        failure_message = sanitize_mysql_message(
            mysql_proc.stderr.strip() or mysql_proc.stdout.strip() or "MySQL unavailable",
            host=mysql_host,
            port=mysql_port,
        )
        mysql_status = failure_message if mysql_auto_load else f"{failure_message}; auto load disabled"

    # NOTE(review): mysql_reader_password is returned verbatim in this payload;
    # confirm /api/status is only reachable by trusted operators.
    return {
        "supports_upstream_sync": True,
        "storage_mode": "docker_volume",
        "project_root": str(PROJECT_ROOT),
        "workspace_root": str(WORKSPACE_ROOT),
        "data_root": str(DATA_ROOT),
        "mysql_auto_load": mysql_auto_load,
        "mysql_config_file": str(MYSQL_CONFIG_PATH.relative_to(DATA_ROOT)),
        "mysql_config_updated_at": mysql_config.get("updated_at"),
        "upstream_repo_url": DEFAULT_REPO_URL,
        "effective_upstream_repo_url": effective_repo_url,
        "upstream_branch": DEFAULT_BRANCH,
        "last_sync_time": sync_metadata.get("last_sync_time"),
        "last_upstream_commit": sync_metadata.get("last_upstream_commit"),
        "last_sync_trigger": sync_metadata.get("last_trigger_source"),
        "index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)),
        "index_mtime": index_mtime,
        "mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)),
        "mysql_seed_mtime": mysql_seed_mtime,
        "sync_schedule_file": str(SCHEDULE_CONFIG_PATH.relative_to(DATA_ROOT)),
        "sync_schedule_enabled": schedule_config.get("enabled"),
        "sync_schedule_time": schedule_config.get("daily_time"),
        "sync_schedule_timezone": schedule_config.get("timezone"),
        "github_proxy_prefix": github_proxy_prefix,
        "sync_schedule_next_run": schedule_config.get("next_run_at"),
        "sync_schedule_last_run_time": schedule_config.get("last_run_time"),
        "sync_schedule_last_run_status": schedule_config.get("last_run_status"),
        "sync_schedule_last_run_message": schedule_config.get("last_run_message"),
        "mysql_host": mysql_host,
        "mysql_port": mysql_port,
        "mysql_database": mysql_database,
        "mysql_reader_user": mysql_reader_user,
        "mysql_reader_password": mysql_reader_password,
        "mysql_ready": mysql_ready,
        "mysql_status": mysql_status,
    }
|
|
|
|
|
|
def run_upstream_sync(trigger_source: str = "manual") -> dict[str, object]:
    """Run the upstream sync script once, guarded by the shared sync lock.

    Records the upstream HEAD commit (best effort via ``git ls-remote``),
    executes the sync script with index/seed export (and optional MySQL
    load), persists sync metadata, and returns a result payload.

    Raises RuntimeError when another sync holds the lock or the script fails.
    """
    # Non-blocking acquire: a concurrent sync yields an immediate error
    # instead of queueing a second run.
    if not SYNC_LOCK.acquire(blocking=False):
        raise RuntimeError("已有同步任务在执行,请稍后再试。")

    try:
        schedule_config = read_schedule_config()
        github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "")
        effective_repo_url = get_effective_repo_url(github_proxy_prefix)
        # Best-effort lookup of the upstream branch head; failures leave the
        # commit empty rather than aborting the sync.
        upstream_proc = run_command(
            ["git", "ls-remote", effective_repo_url, f"refs/heads/{DEFAULT_BRANCH}"]
        )
        upstream_commit = ""
        if upstream_proc.returncode == 0 and upstream_proc.stdout.strip():
            upstream_commit = upstream_proc.stdout.split()[0]

        command = [
            "python3",
            str(SYNC_SCRIPT),
            f"--repo-url={effective_repo_url}",
            "--build-index",
            "--export-mysql-seed",
        ]
        # Only push the data into MySQL when auto-load is enabled in settings.
        if mysql_auto_load_enabled():
            command.append("--load-mysql")
        proc = run_command(command)
        output = "\n".join(
            part for part in [proc.stdout.strip(), proc.stderr.strip()] if part
        ).strip()

        if proc.returncode != 0:
            raise RuntimeError(output or f"sync script failed with exit code {proc.returncode}")

        payload = {
            "storage_mode": "docker_volume",
            "project_root": str(PROJECT_ROOT),
            "workspace_root": str(WORKSPACE_ROOT),
            "data_root": str(DATA_ROOT),
            "upstream_repo_url": DEFAULT_REPO_URL,
            "effective_upstream_repo_url": effective_repo_url,
            "github_proxy_prefix": github_proxy_prefix,
            "upstream_branch": DEFAULT_BRANCH,
            "upstream_commit": upstream_commit,
            "trigger_source": trigger_source,
            "last_sync_time": datetime.now().isoformat(timespec="seconds"),
            "last_upstream_commit": upstream_commit,
            "index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)),
            "index_mtime": datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds")
            if INDEX_PATH.exists()
            else None,
            "mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)),
            "mysql_seed_mtime": datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds")
            if MYSQL_SEED_PATH.exists()
            else None,
            "output": output or "同步脚本执行完成。",
        }
        # Persist a subset of the payload so /api/status can report the last sync.
        write_sync_metadata({
            "last_sync_time": payload["last_sync_time"],
            "last_upstream_commit": payload["last_upstream_commit"],
            "last_trigger_source": trigger_source,
            "upstream_repo_url": DEFAULT_REPO_URL,
            "effective_upstream_repo_url": effective_repo_url,
            "github_proxy_prefix": github_proxy_prefix,
            "upstream_branch": DEFAULT_BRANCH,
        })
        return payload
    finally:
        SYNC_LOCK.release()
|
|
|
|
|
|
def run_mysql_init(trigger_source: str = "manual") -> dict[str, object]:
    """Run the MySQL seed loader once, guarded by the shared sync lock.

    Raises RuntimeError when another sync/load task holds the lock or when
    the loader exits non-zero.  On success returns the current status payload
    augmented with the trigger source and loader output.
    """
    if not SYNC_LOCK.acquire(blocking=False):
        raise RuntimeError("已有同步或 MySQL 初始化任务在执行,请稍后再试。")

    try:
        proc = run_command(["python3", str(MYSQL_LOADER)])
        chunks = [proc.stdout.strip(), proc.stderr.strip()]
        output = "\n".join(chunk for chunk in chunks if chunk).strip()
        if proc.returncode != 0:
            raise RuntimeError(output or f"mysql load failed with exit code {proc.returncode}")

        payload = get_status_payload()
        payload["trigger_source"] = trigger_source
        payload["output"] = output or "MySQL 初始化完成。"
        return payload
    finally:
        SYNC_LOCK.release()
|
|
|
|
|
|
def run_scheduled_sync_if_due() -> None:
    """Execute the scheduled upstream sync when the configured next-run time has passed.

    Polled by scheduler_loop().  A missing or malformed ``next_run_at`` is
    repaired by recomputing it from the configured daily time; in that case
    the function returns and the sync runs on a later poll.  Outcomes are
    recorded via mark_schedule_run().
    """
    schedule_config = read_schedule_config()
    if not schedule_config.get("enabled"):
        return

    def _reset_next_run() -> None:
        # Recompute next_run_at from the daily schedule (default 03:00).
        write_schedule_config({
            **schedule_config,
            "next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")),
        })

    next_run_at = str(schedule_config.get("next_run_at") or "").strip()
    if not next_run_at:
        _reset_next_run()
        return

    try:
        next_run_dt = datetime.fromisoformat(next_run_at)
    except ValueError:
        # Stored value is not ISO-8601; repair it rather than crash the loop.
        _reset_next_run()
        return

    if local_now() < next_run_dt:
        return

    try:
        payload = run_upstream_sync(trigger_source="schedule")
        message = str(payload.get("output") or "定时同步完成。")
        mark_schedule_run("success", message)
        print(f"[scheduler] upstream sync completed at {local_now().isoformat(timespec='seconds')}")
    except RuntimeError as err:
        # A concurrent manual sync holds the lock: record the run as skipped,
        # not failed.  (Detection is by message substring — fragile if the
        # lock error text ever changes.)
        status = "skipped" if "已有同步任务" in str(err) else "failed"
        mark_schedule_run(status, str(err))
        print(f"[scheduler] upstream sync {status}: {err}")
    except Exception as err:
        mark_schedule_run("failed", str(err))
        print(f"[scheduler] upstream sync failed: {err}")
|
|
|
|
|
|
def scheduler_loop() -> None:
    """Daemon-thread body: poll for due scheduled syncs forever.

    Swallows and logs every exception so the scheduler thread never dies,
    then sleeps SCHEDULER_POLL_SECONDS between checks.
    """
    while True:
        try:
            run_scheduled_sync_if_due()
        except Exception as err:
            # Log and keep looping — a single bad poll must not stop scheduling.
            print(f"[scheduler] loop error: {err}")
        time.sleep(SCHEDULER_POLL_SECONDS)
|
|
|
|
|
|
class MobileModelsHandler(SimpleHTTPRequestHandler):
    """Static-file server for PROJECT_ROOT plus JSON maintenance APIs.

    GET  endpoints: /api/status, /api/manual-catalog (other paths serve files).
    POST endpoints: /api/sync-upstream, /api/init-mysql, /api/query-sql,
    /api/query-sql-device-name, /api/sync-schedule, /api/mysql-settings, and
    the manual brand/device upsert/delete routes.  All API responses are
    JSON; RuntimeError from helpers maps to 4xx, anything else to 500.
    """

    def __init__(self, *args, **kwargs):
        # Serve static assets rooted at the project directory.
        super().__init__(*args, directory=str(PROJECT_ROOT), **kwargs)

    def guess_type(self, path: str) -> str:
        """Content-type detection that forces explicit UTF-8 charsets for text."""
        content_type = super().guess_type(path)
        lower_path = path.lower()
        if lower_path.endswith(".md"):
            return "text/markdown; charset=utf-8"
        if lower_path.endswith(".txt"):
            return "text/plain; charset=utf-8"
        # Append charset to any other text/* type that lacks one.
        if content_type.startswith("text/") and "charset=" not in content_type:
            return f"{content_type}; charset=utf-8"
        return content_type

    def _send_json(self, payload: dict[str, object], status: int = HTTPStatus.OK) -> None:
        """Serialize *payload* as UTF-8 JSON and send it with no-store caching."""
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        # Status/config responses must never be cached by browsers or proxies.
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self) -> None:
        if self.path == "/api/status":
            try:
                self._send_json(get_status_payload())
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path == "/api/manual-catalog":
            try:
                self._send_json(manual_catalog_payload())
            except RuntimeError as err:
                # Helpers raise RuntimeError for user-correctable problems.
                self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        # Anything else is a static-file request.
        return super().do_GET()

    def do_POST(self) -> None:
        # NOTE(review): the Content-Length/JSON-body parsing below is repeated
        # for every body-carrying route — a candidate for a shared helper.
        if self.path == "/api/sync-upstream":
            try:
                payload = run_upstream_sync()
                self._send_json(payload)
            except RuntimeError as err:
                # 409 when the sync lock is held (detected by message text).
                status = HTTPStatus.CONFLICT if "已有同步任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR
                self._send_json({"error": str(err)}, status=status)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path == "/api/init-mysql":
            try:
                payload = run_mysql_init()
                self._send_json(payload)
            except RuntimeError as err:
                status = HTTPStatus.CONFLICT if "已有同步或 MySQL 初始化任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR
                self._send_json({"error": str(err)}, status=status)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path == "/api/query-sql":
            try:
                content_length = int(self.headers.get("Content-Length", "0") or "0")
                raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
                req = json.loads(raw_body.decode("utf-8") or "{}")
                # Non-dict JSON bodies degrade to an empty query payload.
                payload = build_sql_query_payload(req if isinstance(req, dict) else {})
                self._send_json(payload)
            except RuntimeError as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path == "/api/query-sql-device-name":
            try:
                content_length = int(self.headers.get("Content-Length", "0") or "0")
                raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
                req = json.loads(raw_body.decode("utf-8") or "{}")
                payload = build_sql_device_name_query_payload(req if isinstance(req, dict) else {})
                self._send_json(payload)
            except RuntimeError as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path == "/api/sync-schedule":
            try:
                content_length = int(self.headers.get("Content-Length", "0") or "0")
                raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
                req = json.loads(raw_body.decode("utf-8") or "{}")
                if not isinstance(req, dict):
                    raise RuntimeError("请求体必须是 JSON 对象。")
                schedule_config = update_schedule_config(req)
                # Echo the saved schedule back so the UI can refresh in place.
                self._send_json(
                    {
                        "message": "同步设置已保存。",
                        "sync_schedule_enabled": schedule_config.get("enabled"),
                        "sync_schedule_time": schedule_config.get("daily_time"),
                        "sync_schedule_timezone": schedule_config.get("timezone"),
                        "github_proxy_prefix": schedule_config.get("github_proxy_prefix"),
                        "effective_upstream_repo_url": get_effective_repo_url(
                            str(schedule_config.get("github_proxy_prefix") or "")
                        ),
                        "sync_schedule_next_run": schedule_config.get("next_run_at"),
                        "sync_schedule_last_run_time": schedule_config.get("last_run_time"),
                        "sync_schedule_last_run_status": schedule_config.get("last_run_status"),
                        "sync_schedule_last_run_message": schedule_config.get("last_run_message"),
                    }
                )
            except RuntimeError as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path == "/api/mysql-settings":
            try:
                content_length = int(self.headers.get("Content-Length", "0") or "0")
                raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
                req = json.loads(raw_body.decode("utf-8") or "{}")
                if not isinstance(req, dict):
                    raise RuntimeError("请求体必须是 JSON 对象。")
                mysql_config = update_mysql_config(req)
                self._send_json(
                    {
                        "message": "MySQL 自动装载设置已保存。",
                        "mysql_auto_load": mysql_config.get("auto_load"),
                        "mysql_config_file": str(MYSQL_CONFIG_PATH.relative_to(DATA_ROOT)),
                        "mysql_config_updated_at": mysql_config.get("updated_at"),
                    }
                )
            except RuntimeError as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return
        if self.path in {"/api/manual-brand", "/api/manual-device", "/api/manual-brand-delete", "/api/manual-device-delete"}:
            try:
                content_length = int(self.headers.get("Content-Length", "0") or "0")
                raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
                req = json.loads(raw_body.decode("utf-8") or "{}")
                if not isinstance(req, dict):
                    raise RuntimeError("请求体必须是 JSON 对象。")
                # Dispatch the four manual-catalog mutation routes.
                if self.path == "/api/manual-brand":
                    payload = upsert_manual_brand(req)
                elif self.path == "/api/manual-device":
                    payload = upsert_manual_device(req)
                elif self.path == "/api/manual-brand-delete":
                    payload = delete_manual_brand(req)
                else:
                    payload = delete_manual_device(req)
                self._send_json(payload)
            except RuntimeError as err:
                # 409 when a rebuild/sync task is already running, else 400.
                status = HTTPStatus.CONFLICT if "已有同步或数据重建任务" in str(err) else HTTPStatus.BAD_REQUEST
                self._send_json({"error": str(err)}, status=status)
            except Exception as err:
                self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
            return

        # Unknown POST path.
        self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the server's command-line options (--host, --port)."""
    parser = argparse.ArgumentParser(
        description="Run the MobileModels web server inside Docker Compose."
    )
    parser.add_argument("--host", default="127.0.0.1", help="Bind host")
    parser.add_argument("--port", type=int, default=8123, help="Bind port")
    namespace = parser.parse_args()
    return namespace
|
|
|
|
|
|
def main() -> int:
    """Entry point: normalize persisted config, start the scheduler, serve HTTP."""
    apply_timezone_from_env()
    # Round-trip the config files so defaults / newly added keys get written
    # back to disk on startup.
    write_schedule_config(read_schedule_config())
    write_mysql_config(read_mysql_config())
    args = parse_args()
    # Daemon thread: dies with the process, no explicit shutdown needed.
    scheduler = threading.Thread(target=scheduler_loop, name="sync-scheduler", daemon=True)
    scheduler.start()
    # NOTE(review): the default bind host is 127.0.0.1, which is unreachable
    # from outside a container — confirm Compose passes --host 0.0.0.0.
    server = ThreadingHTTPServer((args.host, args.port), MobileModelsHandler)
    print(f"Serving MobileModels on http://{args.host}:{args.port}")
    # serve_forever() blocks; the return below is reached only on shutdown.
    server.serve_forever()
    return 0
|
|
|
|
|
|
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())
|