Refine web UI and usage docs

This commit is contained in:
2026-04-14 13:54:02 +08:00
parent 0ba4fef55e
commit 0c01b91fd7
13 changed files with 21990 additions and 18993 deletions
+270 -3
View File
@@ -30,9 +30,11 @@ MYSQL_CONFIG_PATH = DATA_ROOT / "state/mysql_settings.json"
SYNC_LOCK = threading.Lock()
SCHEDULE_LOCK = threading.Lock()
MYSQL_CONFIG_LOCK = threading.Lock()
INDEX_ALIAS_LOCK = threading.Lock()
NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")
SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$")
SCHEDULER_POLL_SECONDS = 20
INDEX_DEVICE_NAME_ALIAS_MAP: dict[str, list[str]] | None = None
def truthy_env(name: str, default: str = "0") -> bool:
@@ -301,6 +303,123 @@ def sql_string(value: str) -> str:
return (value or "").replace("\\", "\\\\").replace("'", "''")
def parse_apple_series_generation(name: str) -> dict[str, object] | None:
text = re.sub(r"\s+", " ", str(name or "").strip())
if not text:
return None
ordinal_match = re.match(r"^(.*?)(?:\s*\((\d+)(?:st|nd|rd|th)\s+generation\)|\s+(\d+))$", text, re.IGNORECASE)
if ordinal_match:
base_label = re.sub(r"\s+", " ", str(ordinal_match.group(1) or "").strip())
generation = int(ordinal_match.group(2) or ordinal_match.group(3) or "0")
if base_label and generation > 0:
return {
"base_label": base_label,
"base_norm": normalize_text(base_label),
"generation": generation,
"chip_like": False,
}
chip_like = bool(re.search(r"\((?:[^)]*\b(?:a\d{1,2}|m\d{1,2})\b[^)]*)\)$", text, re.IGNORECASE))
base_label = re.sub(r"\s*\([^)]*\)\s*$", "", text).strip()
if not base_label:
return None
return {
"base_label": base_label,
"base_norm": normalize_text(base_label),
"generation": None,
"chip_like": chip_like,
}
def build_index_device_name_alias_map() -> dict[str, list[str]]:
if not INDEX_PATH.exists():
return {}
try:
payload = json.loads(INDEX_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
records = payload.get("records") if isinstance(payload, dict) else None
if not isinstance(records, list):
return {}
grouped: dict[str, dict[str, object]] = {}
for record in records:
if not isinstance(record, dict):
continue
brand = str(record.get("market_brand") or record.get("manufacturer_brand") or record.get("brand") or "").strip()
if brand != "Apple":
continue
device_name = str(record.get("device_name") or "").strip()
parsed = parse_apple_series_generation(device_name)
if not parsed or not parsed.get("base_norm"):
continue
base_norm = str(parsed["base_norm"])
group = grouped.setdefault(
base_norm,
{
"base_label": str(parsed["base_label"]),
"items": [],
},
)
items = group["items"]
if isinstance(items, list):
items.append(
{
"device_name": device_name,
"generation": parsed.get("generation"),
"chip_like": bool(parsed.get("chip_like")),
}
)
alias_map: dict[str, set[str]] = {}
for group in grouped.values():
items = group.get("items")
if not isinstance(items, list):
continue
explicit_generations = sorted(
{
int(item["generation"])
for item in items
if isinstance(item, dict) and isinstance(item.get("generation"), int) and int(item["generation"]) > 0
}
)
max_explicit_generation = explicit_generations[-1] if explicit_generations else 0
base_label = str(group.get("base_label") or "").strip()
if not base_label:
continue
for item in items:
if not isinstance(item, dict):
continue
generation = item.get("generation")
device_name = str(item.get("device_name") or "").strip()
if not isinstance(generation, int):
if device_name == base_label and max_explicit_generation >= 2:
generation = 1
elif item.get("chip_like") and max_explicit_generation >= 1:
generation = max_explicit_generation + 1
if not isinstance(generation, int) or generation <= 0:
continue
alias_key = normalize_text(f"{base_label} {generation}")
if not alias_key:
continue
alias_map.setdefault(alias_key, set()).add(device_name)
return {key: sorted(values) for key, values in alias_map.items()}
def resolve_index_device_names(alias_norm: str) -> list[str]:
global INDEX_DEVICE_NAME_ALIAS_MAP
if INDEX_DEVICE_NAME_ALIAS_MAP is None:
with INDEX_ALIAS_LOCK:
if INDEX_DEVICE_NAME_ALIAS_MAP is None:
INDEX_DEVICE_NAME_ALIAS_MAP = build_index_device_name_alias_map()
return list((INDEX_DEVICE_NAME_ALIAS_MAP or {}).get(alias_norm, []))
def mysql_command(database: str | None = None) -> list[str]:
command = [
"mysql",
@@ -358,7 +477,7 @@ def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str
def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip()
if not raw_value:
raise RuntimeError("请填写设备标识。")
raise RuntimeError("请填写设备标识或设备名称")
alias_norm = normalize_text(raw_value)
if not alias_norm:
@@ -371,7 +490,7 @@ def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
raise RuntimeError("limit 必须是数字。") from err
limit = max(1, min(limit, 100))
sql = f"""
exact_sql = f"""
SELECT
model,
record_id,
@@ -395,13 +514,149 @@ ORDER BY source_rank ASC, record_id ASC
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
rows = run_mysql_query(exact_sql)
sql = exact_sql
match_strategy = "alias_norm_exact"
match_strategy_label = "alias_norm 精确匹配"
resolved_device_names: list[str] = []
if not rows:
resolved_device_names = resolve_index_device_names(alias_norm)
if resolved_device_names:
name_conditions = " OR ".join(
f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names
)
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE {name_conditions}
ORDER BY source_rank ASC, record_id ASC
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
if rows:
match_strategy = "device_name_alias"
match_strategy_label = "设备名称别名映射"
return {
"query_mode": "sql",
"model_raw": raw_value,
"alias_norm": alias_norm,
"limit": limit,
"sql": sql,
"match_strategy": match_strategy,
"match_strategy_label": match_strategy_label,
"resolved_device_names": resolved_device_names,
"rows": rows,
"row_count": len(rows),
}
def build_sql_device_name_query_payload(payload: dict[str, object]) -> dict[str, object]:
raw_value = str(payload.get("device_name") or payload.get("name") or "").strip()
if not raw_value:
raise RuntimeError("请填写设备名称。")
limit_value = payload.get("limit", 20)
try:
limit = int(limit_value)
except Exception as err:
raise RuntimeError("limit 必须是数字。") from err
limit = max(1, min(limit, 100))
alias_norm = normalize_text(raw_value)
resolved_device_names = resolve_index_device_names(alias_norm) if alias_norm else []
rows: list[dict[str, str | None]] = []
match_strategy = "device_name_like"
match_strategy_label = "device_name 模糊匹配"
if resolved_device_names:
name_conditions = " OR ".join(
f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names
)
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE {name_conditions}
ORDER BY source_rank ASC, record_id ASC
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
if rows:
match_strategy = "device_name_alias"
match_strategy_label = "设备名称别名映射"
else:
sql = ""
if not rows:
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE device_name LIKE '%{sql_string(raw_value)}%'
OR ver_name LIKE '%{sql_string(raw_value)}%'
ORDER BY source_rank ASC, record_id ASC
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
return {
"query_mode": "sql_device_name",
"device_name": raw_value,
"alias_norm": alias_norm,
"limit": limit,
"sql": sql,
"match_strategy": match_strategy,
"match_strategy_label": match_strategy_label,
"resolved_device_names": resolved_device_names,
"rows": rows,
"row_count": len(rows),
}
@@ -715,6 +970,18 @@ class MobileModelsHandler(SimpleHTTPRequestHandler):
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/query-sql-device-name":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
payload = build_sql_device_name_query_payload(req if isinstance(req, dict) else {})
self._send_json(payload)
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/sync-schedule":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")