Files
MobileModels/tools/web_server.py
2026-03-20 13:38:00 +08:00

794 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Compose-facing web server for MobileModels static pages and maintenance APIs."""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import threading
import time
from datetime import datetime, timedelta
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from project_layout import PROJECT_ROOT, WORKSPACE_ROOT
from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL
SYNC_SCRIPT = PROJECT_ROOT / "tools/sync_upstream_mobilemodels.py"
INDEX_PATH = PROJECT_ROOT / "dist/device_index.json"
MYSQL_SEED_PATH = PROJECT_ROOT / "dist/mobilemodels_mysql_seed.sql"
MYSQL_LOADER = PROJECT_ROOT / "tools/load_mysql_seed.py"
DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data"))
SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json"
SCHEDULE_CONFIG_PATH = DATA_ROOT / "state/sync_schedule.json"
MYSQL_CONFIG_PATH = DATA_ROOT / "state/mysql_settings.json"
SYNC_LOCK = threading.Lock()
SCHEDULE_LOCK = threading.Lock()
MYSQL_CONFIG_LOCK = threading.Lock()
NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")
SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$")
SCHEDULER_POLL_SECONDS = 20
def truthy_env(name: str, default: str = "0") -> bool:
return os.environ.get(name, default).strip().lower() in {"1", "true", "yes", "on"}
def apply_timezone_from_env() -> None:
if not os.environ.get("TZ"):
return
try:
time.tzset()
except AttributeError:
return
def mysql_auto_load_enabled() -> bool:
return bool(read_mysql_config().get("auto_load"))
def mysql_probe_credentials() -> tuple[str, str]:
reader_user = os.environ.get("MYSQL_READER_USER", "").strip()
reader_password = os.environ.get("MYSQL_READER_PASSWORD", "")
if reader_user:
return reader_user, reader_password
return (
os.environ.get("MYSQL_ROOT_USER", "root").strip() or "root",
os.environ.get("MYSQL_ROOT_PASSWORD", ""),
)
def local_now() -> datetime:
return datetime.now().astimezone()
def normalize_schedule_time(value: str | None, *, fallback: str = "03:00") -> str:
text = str(value or "").strip()
if not text:
text = fallback
if not SCHEDULE_TIME_RE.match(text):
if fallback and text != fallback:
return normalize_schedule_time(fallback, fallback="")
raise RuntimeError("每日同步时间格式必须为 HH:MM例如 03:00。")
hour, minute = text.split(":", 1)
return f"{int(hour):02d}:{int(minute):02d}"
def normalize_github_proxy_prefix(value: str | None) -> str:
text = str(value or "").strip()
if not text:
return ""
if "://" not in text:
raise RuntimeError("GitHub 加速前缀必须包含协议,例如 https://ghfast.top/")
if not text.endswith("/"):
text = f"{text}/"
return text
def get_effective_repo_url(github_proxy_prefix: str | None = None) -> str:
prefix = normalize_github_proxy_prefix(
github_proxy_prefix if github_proxy_prefix is not None else os.environ.get("GITHUB_PROXY_PREFIX", "")
)
return f"{prefix}{DEFAULT_REPO_URL}" if prefix else DEFAULT_REPO_URL
def compute_next_run_at(daily_time: str, now: datetime | None = None) -> str:
current = now or local_now()
hour_text, minute_text = daily_time.split(":", 1)
candidate = current.replace(
hour=int(hour_text),
minute=int(minute_text),
second=0,
microsecond=0,
)
if candidate <= current:
candidate += timedelta(days=1)
return candidate.isoformat(timespec="seconds")
def default_schedule_config() -> dict[str, object]:
enabled = truthy_env("SYNC_SCHEDULE_ENABLED", "0")
daily_time = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00"))
timezone_name = os.environ.get("TZ", "UTC").strip() or "UTC"
github_proxy_prefix = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", ""))
return {
"enabled": enabled,
"daily_time": daily_time,
"timezone": timezone_name,
"github_proxy_prefix": github_proxy_prefix,
"next_run_at": compute_next_run_at(daily_time) if enabled else None,
"last_run_time": None,
"last_run_status": None,
"last_run_message": None,
"updated_at": None,
}
def default_mysql_config() -> dict[str, object]:
return {
"auto_load": truthy_env("MYSQL_AUTO_LOAD", "0"),
"updated_at": None,
}
def normalize_mysql_config(raw: dict[str, object] | None) -> dict[str, object]:
config = default_mysql_config()
if isinstance(raw, dict):
if "auto_load" in raw:
value = raw.get("auto_load")
config["auto_load"] = value if isinstance(value, bool) else str(value).strip().lower() in {"1", "true", "yes", "on"}
if raw.get("updated_at") is not None:
config["updated_at"] = raw.get("updated_at")
return config
def read_mysql_config() -> dict[str, object]:
with MYSQL_CONFIG_LOCK:
if not MYSQL_CONFIG_PATH.exists():
return normalize_mysql_config(None)
try:
payload = json.loads(MYSQL_CONFIG_PATH.read_text(encoding="utf-8"))
except Exception:
return normalize_mysql_config(None)
return normalize_mysql_config(payload if isinstance(payload, dict) else None)
def write_mysql_config(payload: dict[str, object]) -> dict[str, object]:
normalized = normalize_mysql_config(payload)
with MYSQL_CONFIG_LOCK:
MYSQL_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
MYSQL_CONFIG_PATH.write_text(
json.dumps(normalized, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return normalized
def update_mysql_config(payload: dict[str, object]) -> dict[str, object]:
current = read_mysql_config()
auto_load_raw = payload.get("auto_load", current.get("auto_load", False))
auto_load = auto_load_raw if isinstance(auto_load_raw, bool) else str(auto_load_raw).strip().lower() in {"1", "true", "yes", "on"}
updated = {
**current,
"auto_load": auto_load,
"updated_at": local_now().isoformat(timespec="seconds"),
}
return write_mysql_config(updated)
def normalize_schedule_config(raw: dict[str, object] | None) -> dict[str, object]:
config = default_schedule_config()
if isinstance(raw, dict):
if "enabled" in raw:
value = raw.get("enabled")
config["enabled"] = value if isinstance(value, bool) else str(value).strip().lower() in {"1", "true", "yes", "on"}
if "daily_time" in raw:
try:
config["daily_time"] = normalize_schedule_time(str(raw.get("daily_time") or ""))
except RuntimeError:
config["daily_time"] = normalize_schedule_time(os.environ.get("SYNC_SCHEDULE_TIME", "03:00"))
if raw.get("timezone"):
config["timezone"] = str(raw.get("timezone")).strip() or config["timezone"]
if "github_proxy_prefix" in raw:
try:
config["github_proxy_prefix"] = normalize_github_proxy_prefix(str(raw.get("github_proxy_prefix") or ""))
except RuntimeError:
config["github_proxy_prefix"] = normalize_github_proxy_prefix(os.environ.get("GITHUB_PROXY_PREFIX", ""))
for key in ("last_run_time", "last_run_status", "last_run_message", "updated_at"):
if raw.get(key) is not None:
config[key] = raw.get(key)
next_run_at = raw.get("next_run_at")
if config["enabled"] and isinstance(next_run_at, str) and next_run_at.strip():
try:
datetime.fromisoformat(next_run_at)
config["next_run_at"] = next_run_at
except ValueError:
config["next_run_at"] = compute_next_run_at(str(config["daily_time"]))
else:
config["next_run_at"] = compute_next_run_at(str(config["daily_time"])) if config["enabled"] else None
return config
def read_schedule_config() -> dict[str, object]:
with SCHEDULE_LOCK:
if not SCHEDULE_CONFIG_PATH.exists():
return normalize_schedule_config(None)
try:
payload = json.loads(SCHEDULE_CONFIG_PATH.read_text(encoding="utf-8"))
except Exception:
return normalize_schedule_config(None)
return normalize_schedule_config(payload if isinstance(payload, dict) else None)
def write_schedule_config(payload: dict[str, object]) -> dict[str, object]:
normalized = normalize_schedule_config(payload)
with SCHEDULE_LOCK:
SCHEDULE_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
SCHEDULE_CONFIG_PATH.write_text(
json.dumps(normalized, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return normalized
def update_schedule_config(payload: dict[str, object]) -> dict[str, object]:
current = read_schedule_config()
enabled_raw = payload.get("enabled", current.get("enabled", False))
enabled = enabled_raw if isinstance(enabled_raw, bool) else str(enabled_raw).strip().lower() in {"1", "true", "yes", "on"}
daily_time = normalize_schedule_time(str(payload.get("daily_time") or current.get("daily_time") or "03:00"))
github_proxy_prefix = normalize_github_proxy_prefix(
str(payload.get("github_proxy_prefix") if "github_proxy_prefix" in payload else current.get("github_proxy_prefix") or "")
)
updated = {
**current,
"enabled": enabled,
"daily_time": daily_time,
"timezone": os.environ.get("TZ", "UTC").strip() or "UTC",
"github_proxy_prefix": github_proxy_prefix,
"next_run_at": compute_next_run_at(daily_time) if enabled else None,
"updated_at": local_now().isoformat(timespec="seconds"),
}
return write_schedule_config(updated)
def mark_schedule_run(status: str, message: str) -> dict[str, object]:
current = read_schedule_config()
updated = {
**current,
"last_run_time": local_now().isoformat(timespec="seconds"),
"last_run_status": status,
"last_run_message": message,
"next_run_at": compute_next_run_at(str(current.get("daily_time") or "03:00")) if current.get("enabled") else None,
"updated_at": local_now().isoformat(timespec="seconds"),
}
return write_schedule_config(updated)
def run_command(args: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=PROJECT_ROOT,
text=True,
capture_output=True,
check=False,
)
def sanitize_mysql_message(message: str, host: str | None = None, port: str | None = None) -> str:
lines = [line.strip() for line in str(message or "").splitlines() if line.strip()]
filtered = [line for line in lines if not line.startswith("WARNING:")]
text = "\n".join(filtered or lines)
host_text = host or os.environ.get("MYSQL_HOST", "mysql")
port_text = port or os.environ.get("MYSQL_PORT", "3306")
if "Can't connect to server on" in text or "Can't connect to MySQL server on" in text:
return f"MySQL 当前无法连接: {host_text}:{port_text}"
if "Access denied" in text:
return f"MySQL 账号或密码无效: {host_text}:{port_text}"
return text or f"MySQL 当前无法连接: {host_text}:{port_text}"
def normalize_text(text: str) -> str:
return NORMALIZE_RE.sub("", (text or "").lower())
def sql_string(value: str) -> str:
return (value or "").replace("\\", "\\\\").replace("'", "''")
def mysql_command(database: str | None = None) -> list[str]:
command = [
"mysql",
f"--host={os.environ.get('MYSQL_HOST', 'mysql')}",
f"--port={os.environ.get('MYSQL_PORT', '3306')}",
f"--user={os.environ.get('MYSQL_READER_USER', '')}",
"--protocol=TCP",
"--default-character-set=utf8mb4",
"--batch",
"--raw",
]
if database:
command.append(database)
return command
def mysql_env() -> dict[str, str]:
env = os.environ.copy()
env["MYSQL_PWD"] = os.environ.get("MYSQL_READER_PASSWORD", "")
return env
def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str | None]]:
proc = subprocess.run(
mysql_command(database=database),
env=mysql_env(),
input=sql,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if proc.returncode != 0:
message = sanitize_mysql_message(
proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
)
raise RuntimeError(message)
lines = [line for line in proc.stdout.splitlines() if line.strip()]
if not lines:
return []
headers = lines[0].split("\t")
rows: list[dict[str, str | None]] = []
for line in lines[1:]:
values = line.split("\t")
row = {}
for idx, header in enumerate(headers):
value = values[idx] if idx < len(values) else ""
row[header] = None if value == "NULL" else value
rows.append(row)
return rows
def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip()
if not raw_value:
raise RuntimeError("请填写设备标识。")
alias_norm = normalize_text(raw_value)
if not alias_norm:
raise RuntimeError("设备标识无法归一化,请检查输入。")
limit_value = payload.get("limit", 20)
try:
limit = int(limit_value)
except Exception as err:
raise RuntimeError("limit 必须是数字。") from err
limit = max(1, min(limit, 100))
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE alias_norm = '{sql_string(alias_norm)}'
ORDER BY source_rank ASC, record_id ASC
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
return {
"query_mode": "sql",
"model_raw": raw_value,
"alias_norm": alias_norm,
"limit": limit,
"sql": sql,
"rows": rows,
"row_count": len(rows),
}
def read_sync_metadata() -> dict[str, object]:
if not SYNC_METADATA_PATH.exists():
return {}
try:
return json.loads(SYNC_METADATA_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
def write_sync_metadata(payload: dict[str, object]) -> None:
SYNC_METADATA_PATH.parent.mkdir(parents=True, exist_ok=True)
SYNC_METADATA_PATH.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def get_status_payload() -> dict[str, object]:
index_mtime = None
mysql_seed_mtime = None
if INDEX_PATH.exists():
index_mtime = datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds")
if MYSQL_SEED_PATH.exists():
mysql_seed_mtime = datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds")
mysql_host = os.environ.get("MYSQL_HOST", "mysql")
mysql_port = os.environ.get("MYSQL_PORT", "3306")
mysql_database = os.environ.get("MYSQL_DATABASE", "mobilemodels")
mysql_reader_user = os.environ.get("MYSQL_READER_USER", "")
mysql_reader_password = os.environ.get("MYSQL_READER_PASSWORD", "")
mysql_config = read_mysql_config()
mysql_auto_load = bool(mysql_config.get("auto_load"))
mysql_ready = False
mysql_status = ""
sync_metadata = read_sync_metadata()
schedule_config = read_schedule_config()
github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "")
effective_repo_url = get_effective_repo_url(github_proxy_prefix)
probe_user, probe_password = mysql_probe_credentials()
mysql_proc = run_command(
[
"python3",
str(MYSQL_LOADER),
"--check-only",
"--wait-timeout",
"5",
f"--user={probe_user}",
f"--password={probe_password}",
]
)
if mysql_proc.returncode == 0:
mysql_ready = True
mysql_status = mysql_proc.stdout.strip() or "MySQL ready"
if not mysql_auto_load:
mysql_status = f"{mysql_status}; auto load disabled"
else:
failure_message = sanitize_mysql_message(
mysql_proc.stderr.strip() or mysql_proc.stdout.strip() or "MySQL unavailable",
host=mysql_host,
port=mysql_port,
)
mysql_status = failure_message if mysql_auto_load else f"{failure_message}; auto load disabled"
return {
"supports_upstream_sync": True,
"storage_mode": "docker_volume",
"project_root": str(PROJECT_ROOT),
"workspace_root": str(WORKSPACE_ROOT),
"data_root": str(DATA_ROOT),
"mysql_auto_load": mysql_auto_load,
"mysql_config_file": str(MYSQL_CONFIG_PATH.relative_to(DATA_ROOT)),
"mysql_config_updated_at": mysql_config.get("updated_at"),
"upstream_repo_url": DEFAULT_REPO_URL,
"effective_upstream_repo_url": effective_repo_url,
"upstream_branch": DEFAULT_BRANCH,
"last_sync_time": sync_metadata.get("last_sync_time"),
"last_upstream_commit": sync_metadata.get("last_upstream_commit"),
"last_sync_trigger": sync_metadata.get("last_trigger_source"),
"index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)),
"index_mtime": index_mtime,
"mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)),
"mysql_seed_mtime": mysql_seed_mtime,
"sync_schedule_file": str(SCHEDULE_CONFIG_PATH.relative_to(DATA_ROOT)),
"sync_schedule_enabled": schedule_config.get("enabled"),
"sync_schedule_time": schedule_config.get("daily_time"),
"sync_schedule_timezone": schedule_config.get("timezone"),
"github_proxy_prefix": github_proxy_prefix,
"sync_schedule_next_run": schedule_config.get("next_run_at"),
"sync_schedule_last_run_time": schedule_config.get("last_run_time"),
"sync_schedule_last_run_status": schedule_config.get("last_run_status"),
"sync_schedule_last_run_message": schedule_config.get("last_run_message"),
"mysql_host": mysql_host,
"mysql_port": mysql_port,
"mysql_database": mysql_database,
"mysql_reader_user": mysql_reader_user,
"mysql_reader_password": mysql_reader_password,
"mysql_ready": mysql_ready,
"mysql_status": mysql_status,
}
def run_upstream_sync(trigger_source: str = "manual") -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步任务在执行,请稍后再试。")
try:
schedule_config = read_schedule_config()
github_proxy_prefix = str(schedule_config.get("github_proxy_prefix") or "")
effective_repo_url = get_effective_repo_url(github_proxy_prefix)
upstream_proc = run_command(
["git", "ls-remote", effective_repo_url, f"refs/heads/{DEFAULT_BRANCH}"]
)
upstream_commit = ""
if upstream_proc.returncode == 0 and upstream_proc.stdout.strip():
upstream_commit = upstream_proc.stdout.split()[0]
command = [
"python3",
str(SYNC_SCRIPT),
f"--repo-url={effective_repo_url}",
"--build-index",
"--export-mysql-seed",
]
if mysql_auto_load_enabled():
command.append("--load-mysql")
proc = run_command(command)
output = "\n".join(
part for part in [proc.stdout.strip(), proc.stderr.strip()] if part
).strip()
if proc.returncode != 0:
raise RuntimeError(output or f"sync script failed with exit code {proc.returncode}")
payload = {
"storage_mode": "docker_volume",
"project_root": str(PROJECT_ROOT),
"workspace_root": str(WORKSPACE_ROOT),
"data_root": str(DATA_ROOT),
"upstream_repo_url": DEFAULT_REPO_URL,
"effective_upstream_repo_url": effective_repo_url,
"github_proxy_prefix": github_proxy_prefix,
"upstream_branch": DEFAULT_BRANCH,
"upstream_commit": upstream_commit,
"trigger_source": trigger_source,
"last_sync_time": datetime.now().isoformat(timespec="seconds"),
"last_upstream_commit": upstream_commit,
"index_file": str(INDEX_PATH.relative_to(PROJECT_ROOT)),
"index_mtime": datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds")
if INDEX_PATH.exists()
else None,
"mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(PROJECT_ROOT)),
"mysql_seed_mtime": datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds")
if MYSQL_SEED_PATH.exists()
else None,
"output": output or "同步脚本执行完成。",
}
write_sync_metadata({
"last_sync_time": payload["last_sync_time"],
"last_upstream_commit": payload["last_upstream_commit"],
"last_trigger_source": trigger_source,
"upstream_repo_url": DEFAULT_REPO_URL,
"effective_upstream_repo_url": effective_repo_url,
"github_proxy_prefix": github_proxy_prefix,
"upstream_branch": DEFAULT_BRANCH,
})
return payload
finally:
SYNC_LOCK.release()
def run_mysql_init(trigger_source: str = "manual") -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步或 MySQL 初始化任务在执行,请稍后再试。")
try:
proc = run_command(["python3", str(MYSQL_LOADER)])
output = "\n".join(
part for part in [proc.stdout.strip(), proc.stderr.strip()] if part
).strip()
if proc.returncode != 0:
raise RuntimeError(output or f"mysql load failed with exit code {proc.returncode}")
payload = get_status_payload()
payload.update(
{
"trigger_source": trigger_source,
"output": output or "MySQL 初始化完成。",
}
)
return payload
finally:
SYNC_LOCK.release()
def run_scheduled_sync_if_due() -> None:
schedule_config = read_schedule_config()
if not schedule_config.get("enabled"):
return
next_run_at = str(schedule_config.get("next_run_at") or "").strip()
if not next_run_at:
write_schedule_config({
**schedule_config,
"next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")),
})
return
try:
next_run_dt = datetime.fromisoformat(next_run_at)
except ValueError:
write_schedule_config({
**schedule_config,
"next_run_at": compute_next_run_at(str(schedule_config.get("daily_time") or "03:00")),
})
return
if local_now() < next_run_dt:
return
try:
payload = run_upstream_sync(trigger_source="schedule")
message = str(payload.get("output") or "定时同步完成。")
mark_schedule_run("success", message)
print(f"[scheduler] upstream sync completed at {local_now().isoformat(timespec='seconds')}")
except RuntimeError as err:
status = "skipped" if "已有同步任务" in str(err) else "failed"
mark_schedule_run(status, str(err))
print(f"[scheduler] upstream sync {status}: {err}")
except Exception as err:
mark_schedule_run("failed", str(err))
print(f"[scheduler] upstream sync failed: {err}")
def scheduler_loop() -> None:
while True:
try:
run_scheduled_sync_if_due()
except Exception as err:
print(f"[scheduler] loop error: {err}")
time.sleep(SCHEDULER_POLL_SECONDS)
class MobileModelsHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(PROJECT_ROOT), **kwargs)
def guess_type(self, path: str) -> str:
content_type = super().guess_type(path)
lower_path = path.lower()
if lower_path.endswith(".md"):
return "text/markdown; charset=utf-8"
if lower_path.endswith(".txt"):
return "text/plain; charset=utf-8"
if content_type.startswith("text/") and "charset=" not in content_type:
return f"{content_type}; charset=utf-8"
return content_type
def _send_json(self, payload: dict[str, object], status: int = HTTPStatus.OK) -> None:
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self.wfile.write(data)
def do_GET(self) -> None:
if self.path == "/api/status":
try:
self._send_json(get_status_payload())
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
return super().do_GET()
def do_POST(self) -> None:
if self.path == "/api/sync-upstream":
try:
payload = run_upstream_sync()
self._send_json(payload)
except RuntimeError as err:
status = HTTPStatus.CONFLICT if "已有同步任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR
self._send_json({"error": str(err)}, status=status)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/init-mysql":
try:
payload = run_mysql_init()
self._send_json(payload)
except RuntimeError as err:
status = HTTPStatus.CONFLICT if "已有同步或 MySQL 初始化任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR
self._send_json({"error": str(err)}, status=status)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/query-sql":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
payload = build_sql_query_payload(req if isinstance(req, dict) else {})
self._send_json(payload)
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/sync-schedule":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
if not isinstance(req, dict):
raise RuntimeError("请求体必须是 JSON 对象。")
schedule_config = update_schedule_config(req)
self._send_json(
{
"message": "同步设置已保存。",
"sync_schedule_enabled": schedule_config.get("enabled"),
"sync_schedule_time": schedule_config.get("daily_time"),
"sync_schedule_timezone": schedule_config.get("timezone"),
"github_proxy_prefix": schedule_config.get("github_proxy_prefix"),
"effective_upstream_repo_url": get_effective_repo_url(
str(schedule_config.get("github_proxy_prefix") or "")
),
"sync_schedule_next_run": schedule_config.get("next_run_at"),
"sync_schedule_last_run_time": schedule_config.get("last_run_time"),
"sync_schedule_last_run_status": schedule_config.get("last_run_status"),
"sync_schedule_last_run_message": schedule_config.get("last_run_message"),
}
)
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/mysql-settings":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
if not isinstance(req, dict):
raise RuntimeError("请求体必须是 JSON 对象。")
mysql_config = update_mysql_config(req)
self._send_json(
{
"message": "MySQL 自动装载设置已保存。",
"mysql_auto_load": mysql_config.get("auto_load"),
"mysql_config_file": str(MYSQL_CONFIG_PATH.relative_to(DATA_ROOT)),
"mysql_config_updated_at": mysql_config.get("updated_at"),
}
)
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run the MobileModels web server inside Docker Compose.")
parser.add_argument("--host", default="127.0.0.1", help="Bind host")
parser.add_argument("--port", type=int, default=8123, help="Bind port")
return parser.parse_args()
def main() -> int:
apply_timezone_from_env()
write_schedule_config(read_schedule_config())
write_mysql_config(read_mysql_config())
args = parse_args()
scheduler = threading.Thread(target=scheduler_loop, name="sync-scheduler", daemon=True)
scheduler.start()
server = ThreadingHTTPServer((args.host, args.port), MobileModelsHandler)
print(f"Serving MobileModels on http://{args.host}:{args.port}")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())