refactor: split workspace and delivery layout

This commit is contained in:
2026-03-19 13:00:40 +08:00
parent f12b3d5ecd
commit 74e50a2b30
83 changed files with 592 additions and 896 deletions

View File

@@ -1,66 +0,0 @@
# Device Mapper Usage
This tool builds a cross-platform lookup index from `brands/*.md`.
## 1) Build index
```bash
python3 tools/device_mapper.py build
```
Output file: `dist/device_index.json`
## 2) Query from command line
```bash
python3 tools/device_mapper.py find --name 'iPhone14,5' --brand Apple
python3 tools/device_mapper.py find --name 'M2102J2SC' --brand Xiaomi
python3 tools/device_mapper.py find --name 'L55M5-AD' --brand Xiaomi
```
## 3) JSON structure
- `records`: normalized device records
- `device_name`: standard marketing name
- `brand`: normalized brand
- `manufacturer_brand`: manufacturer-level brand (e.g. `Xiaomi`)
- `market_brand`: market sub-brand (e.g. `Xiaomi` / `Redmi` / `POCO`)
- `device_type`: `phone | tablet | wear | tv | other`
- `aliases`: all known searchable aliases
- `lookup`: normalized alias -> candidate `record.id[]`
- `brand_aliases`: normalized brand aliases to filter by app-provided brand
- `brand_management`: brand governance metadata
- `manufacturer_aliases`: canonical brand aliases (`华为/huawei/HUAWEI` -> `HUAWEI`)
- `manufacturer_to_parent`: child -> parent brand (e.g. `OnePlus -> OPPO`)
- `parent_to_children`: parent -> child brands
- `market_brand_aliases`: sub-brand aliases (`redmi` -> `Redmi`)
- `stats`: includes parent/manufacturer/market-level counts and Xiaomi combined + separated counts
## 4) App-side integration (iOS / Android / Harmony)
1. Load `dist/device_index.json` into memory.
2. Normalize input `name` and optional `brand`.
3. Use `lookup[normalized_name]` to fetch candidate records.
4. If `brand` exists, normalize it by `brand_management`:
- manufacturer aliases (e.g. `华为/huawei/HUAWEI` -> `HUAWEI`)
- market sub-brand aliases (e.g. `redmi` -> `Redmi`, `xiaomi` -> manufacturer-level `Xiaomi` combined)
5. Filter records by normalized manufacturer/market brand as needed.
6. Return first candidate (or show all candidates when ambiguous).
Normalization rule should match this script:
- lower-case
- keep only `[0-9a-z\u4e00-\u9fff]`
- remove all spaces, hyphens, underscores and punctuation
## 5) Device type mapping
Supported categories:
- `phone`
- `tablet`
- `wear`
- `tv`
- `other`
Type inference uses keyword + source-file default fallback to reduce misses.

View File

@@ -1,12 +0,0 @@
#!/bin/sh
set -eu
cd /app
sh tools/init_runtime_data.sh
python3 tools/device_mapper.py build
python3 tools/export_mysql_seed.py
python3 tools/load_mysql_seed.py
exec python3 tools/web_server.py --host 0.0.0.0 --port 8123

View File

@@ -1,757 +0,0 @@
#!/usr/bin/env python3
"""Build and query a cross-platform device mapping index from MobileModels markdown data."""
from __future__ import annotations
import argparse
from collections import Counter
import json
import re
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Set
ENTRY_RE = re.compile(r"^\*\*(.+?)\*\*\s*$")
VARIANT_RE = re.compile(r"^\s*((?:`[^`]+`\s*)+):\s*(.+?)\s*$")
BACKTICK_RE = re.compile(r"`([^`]+)`")
SECTION_RE = re.compile(r"^##\s+(.+?)\s*$")
FILE_BRAND_MAP: Dict[str, str] = {
"360shouji": "360",
"apple_all": "Apple",
"apple_all_en": "Apple",
"apple_cn": "Apple",
"asus_cn": "ASUS",
"asus_en": "ASUS",
"blackshark": "Black Shark",
"blackshark_en": "Black Shark",
"coolpad": "Coolpad",
"google": "Google",
"honor_cn": "HONOR",
"honor_global_en": "HONOR",
"huawei_cn": "HUAWEI",
"huawei_global_en": "HUAWEI",
"lenovo_cn": "Lenovo",
"letv": "LeTV",
"meizu": "Meizu",
"meizu_en": "Meizu",
"mitv_cn": "Xiaomi",
"mitv_global_en": "Xiaomi",
"motorola_cn": "Motorola",
"nokia_cn": "Nokia",
"nothing": "Nothing",
"nubia": "nubia",
"oneplus": "OnePlus",
"oneplus_en": "OnePlus",
"oppo_cn": "OPPO",
"oppo_global_en": "OPPO",
"realme_cn": "realme",
"realme_global_en": "realme",
"samsung_cn": "Samsung",
"samsung_global_en": "Samsung",
"smartisan": "Smartisan",
"sony": "Sony",
"sony_cn": "Sony",
"vivo_cn": "vivo",
"vivo_global_en": "vivo",
"xiaomi": "Xiaomi",
"xiaomi_cn": "Xiaomi",
"xiaomi_en": "Xiaomi",
"xiaomi-wear": "Xiaomi",
"zhixuan": "HUAWEI Smart Selection",
"zte_cn": "ZTE",
}
FILE_DEFAULT_DEVICE_TYPE: Dict[str, str] = {
"mitv_cn": "tv",
"mitv_global_en": "tv",
"xiaomi-wear": "wear",
"apple_all": "phone",
"apple_all_en": "phone",
"apple_cn": "phone",
"google": "phone",
"honor_cn": "phone",
"honor_global_en": "phone",
"huawei_cn": "phone",
"huawei_global_en": "phone",
"xiaomi": "phone",
"xiaomi_cn": "phone",
"xiaomi_en": "phone",
"zhixuan": "phone",
}
BRAND_ALIASES: Dict[str, List[str]] = {
"360": ["360", "360手机", "奇酷", "qiku"],
"Apple": ["apple", "苹果", "iphone", "ipad", "ipod"],
"ASUS": ["asus", "华硕", "rog", "zenfone"],
"Black Shark": ["black shark", "blackshark", "黑鲨"],
"Coolpad": ["coolpad", "酷派"],
"Google": ["google", "pixel"],
"HONOR": ["honor", "荣耀"],
"HUAWEI": ["huawei", "华为"],
"HUAWEI Smart Selection": ["华为智选", "zhixuan", "umagic", "wiko", "hi nova", "nzone"],
"Lenovo": ["lenovo", "联想", "zuk", "拯救者"],
"LeTV": ["letv", "乐视"],
"Meizu": ["meizu", "魅族"],
"Motorola": ["motorola", "摩托罗拉", "moto"],
"Nokia": ["nokia", "诺基亚"],
"Nothing": ["nothing", "cmf"],
"nubia": ["nubia", "努比亚", "红魔", "redmagic"],
"iQOO": ["iqoo", "i qoo", "艾酷"],
"OnePlus": ["oneplus", "一加"],
"OPPO": ["oppo"],
"POCO": ["poco"],
"Redmi": ["redmi", "红米", "hongmi"],
"realme": ["realme", "真我"],
"Samsung": ["samsung", "三星", "galaxy"],
"Smartisan": ["smartisan", "锤子", "坚果"],
"Sony": ["sony", "索尼", "xperia"],
"vivo": ["vivo"],
"Xiaomi": ["xiaomi", "小米", "mi", "米家", "mipad"],
"ZTE": ["zte", "中兴"],
}
MANUFACTURER_PARENT_BRAND: Dict[str, str] = {
"Black Shark": "Xiaomi",
"HUAWEI Smart Selection": "HUAWEI",
"Motorola": "Lenovo",
"iQOO": "vivo",
"POCO": "Xiaomi",
"Redmi": "Xiaomi",
"OnePlus": "OPPO",
"realme": "OPPO",
"nubia": "ZTE",
}
MARKET_BRAND_ALIASES: Dict[str, List[str]] = {
"iQOO": ["iqoo", "i qoo", "艾酷"],
"POCO": ["poco"],
"Redmi": ["redmi", "红米", "hongmi"],
"Xiaomi": ["xiaomi", "小米", "mi", "mipad", "米家"],
}
MARKET_BRAND_TO_MANUFACTURER: Dict[str, str] = {
"iQOO": "vivo",
"POCO": "Xiaomi",
"Redmi": "Xiaomi",
"Xiaomi": "Xiaomi",
}
TV_KEYWORDS = [
"tv",
"电视",
"智慧屏",
"smart tv",
"机顶盒",
"tv box",
"stick",
"dongle",
]
TABLET_KEYWORDS = [
"ipad",
"tablet",
"tab",
"pad",
"平板",
"matepad",
]
WEAR_KEYWORDS = [
"watch",
"smartwatch",
"手表",
"手环",
"band",
"wear",
"wearable",
"buds",
"earbuds",
"耳机",
"tws",
"eyewear",
"glasses",
"眼镜",
]
OTHER_KEYWORDS = [
"matebook",
"笔记本",
"laptop",
"notebook",
"vision",
"vr",
"ipod",
"airpods",
]
PHONE_KEYWORDS = [
"iphone",
"phone",
"手机",
"galaxy",
"pixel",
"xiaomi",
"redmi",
"poco",
"honor",
"huawei",
"mate",
"nova",
"oppo",
"vivo",
"realme",
"oneplus",
"nokia",
"nubia",
"meizu",
"lenovo",
"motorola",
"zte",
"smartisan",
"zenfone",
"rog",
"麦芒",
"畅享",
"优畅享",
]
@dataclass
class DeviceRecord:
id: str
device_name: str
brand: str
manufacturer_brand: str
parent_brand: str
market_brand: str
device_type: str
aliases: List[str]
source_file: str
section: str
def normalize_text(text: str) -> str:
return re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", text.lower())
def canonical_brand(file_stem: str) -> str:
return FILE_BRAND_MAP.get(file_stem, file_stem)
def brand_aliases(brand: str) -> List[str]:
aliases = set(BRAND_ALIASES.get(brand, []))
aliases.add(brand)
return sorted(aliases)
def has_keyword(text: str, keywords: Iterable[str]) -> bool:
norm_text = normalize_text(text)
for kw in keywords:
if normalize_text(kw) and normalize_text(kw) in norm_text:
return True
return False
def resolve_parent_brand(manufacturer_brand: str) -> str:
return MANUFACTURER_PARENT_BRAND.get(manufacturer_brand, manufacturer_brand)
def infer_market_brand(
manufacturer_brand: str,
device_name: str,
section: str,
aliases: Iterable[str],
) -> str:
corpus = normalize_text(" ".join([device_name, section, *aliases]))
if manufacturer_brand == "Xiaomi":
poco_keys = [normalize_text(v) for v in MARKET_BRAND_ALIASES["POCO"]]
redmi_keys = [normalize_text(v) for v in MARKET_BRAND_ALIASES["Redmi"]]
if any(key and key in corpus for key in poco_keys):
return "POCO"
if any(key and key in corpus for key in redmi_keys):
return "Redmi"
return "Xiaomi"
if manufacturer_brand == "vivo":
iqoo_keys = [normalize_text(v) for v in MARKET_BRAND_ALIASES["iQOO"]]
if any(key and key in corpus for key in iqoo_keys):
return "iQOO"
return "vivo"
return manufacturer_brand
def infer_device_type(
device_name: str,
section: str,
source_file: str,
aliases: Iterable[str],
default_type: str,
) -> str:
corpus = " ".join([device_name, section, *aliases, source_file])
if has_keyword(corpus, TV_KEYWORDS):
return "tv"
if has_keyword(corpus, TABLET_KEYWORDS):
return "tablet"
if has_keyword(corpus, WEAR_KEYWORDS):
return "wear"
if has_keyword(corpus, OTHER_KEYWORDS):
return "other"
if has_keyword(corpus, PHONE_KEYWORDS):
return "phone"
return default_type or "other"
def clean_entry_title(raw_title: str) -> str:
title = raw_title.strip()
if title.endswith(":"):
title = title[:-1].strip()
# remove leading tag like: [`X1`] or [X1]
title = re.sub(r"^\[[^\]]+\]\s*", "", title)
# remove one or more trailing codenames like: (`foo`) (`bar`)
title = re.sub(r"(?:\s*\(\s*`[^`]+`\s*\))+\s*$", "", title)
title = re.sub(r"\s*\((?:codename|代号)[^)]*\)\s*$", "", title, flags=re.IGNORECASE)
# strip markdown links while keeping text: [Foo](url) -> Foo
title = re.sub(r"\[([^\]]+)\]\([^)]*\)", r"\1", title)
title = " ".join(title.split())
return title
def extract_codes(text: str) -> List[str]:
return [code.strip() for code in BACKTICK_RE.findall(text) if code.strip()]
def parse_brand_file(path: Path) -> List[DeviceRecord]:
file_stem = path.stem
brand = canonical_brand(file_stem)
default_type = FILE_DEFAULT_DEVICE_TYPE.get(file_stem, "phone")
records: List[DeviceRecord] = []
lines = path.read_text(encoding="utf-8").splitlines()
section = ""
current_title = ""
current_aliases: Set[str] = set()
def flush_current() -> None:
nonlocal current_title, current_aliases
if not current_title:
return
aliases = sorted({alias.strip() for alias in current_aliases if alias.strip()})
record_id = f"{file_stem}:{len(records) + 1}"
device_type = infer_device_type(
device_name=current_title,
section=section,
source_file=path.name,
aliases=aliases,
default_type=default_type,
)
records.append(
DeviceRecord(
id=record_id,
device_name=current_title,
brand=brand,
manufacturer_brand=brand,
parent_brand=resolve_parent_brand(brand),
market_brand=infer_market_brand(
manufacturer_brand=brand,
device_name=current_title,
section=section,
aliases=aliases,
),
device_type=device_type,
aliases=aliases,
source_file=f"brands/{path.name}",
section=section,
)
)
current_title = ""
current_aliases = set()
for raw in lines:
line = raw.strip()
if not line:
continue
section_match = SECTION_RE.match(line)
if section_match:
section = section_match.group(1).strip()
continue
entry_match = ENTRY_RE.match(line)
if entry_match:
flush_current()
raw_title = entry_match.group(1).strip()
current_title = clean_entry_title(raw_title)
current_aliases = set(extract_codes(raw_title))
current_aliases.add(current_title)
continue
if not current_title:
continue
variant_match = VARIANT_RE.match(line)
if variant_match:
variant_codes = extract_codes(variant_match.group(1))
variant_name = variant_match.group(2).strip()
current_aliases.update(variant_codes)
current_aliases.add(variant_name)
flush_current()
return records
class DeviceMapper:
def __init__(self, records: List[DeviceRecord]) -> None:
self.records = records
self.records_by_id = {record.id: record for record in records}
self.manufacturer_alias_lookup: Dict[str, str] = {}
self.parent_alias_lookup: Dict[str, str] = {}
self.market_alias_lookup: Dict[str, str] = {}
self.parent_to_children: Dict[str, Set[str]] = {}
self.alias_index: Dict[str, Set[str]] = {}
for record in records:
for alias in record.aliases:
key = normalize_text(alias)
if not key:
continue
self.alias_index.setdefault(key, set()).add(record.id)
manufacturers = sorted({record.manufacturer_brand for record in records})
parents = sorted({record.parent_brand for record in records})
for brand in manufacturers:
for alias in brand_aliases(brand):
key = normalize_text(alias)
if key:
self.manufacturer_alias_lookup[key] = brand
for parent in parents:
for alias in brand_aliases(parent):
key = normalize_text(alias)
if key:
self.parent_alias_lookup[key] = parent
for manufacturer in manufacturers:
parent = resolve_parent_brand(manufacturer)
self.parent_to_children.setdefault(parent, set()).add(manufacturer)
for market_brand, aliases in MARKET_BRAND_ALIASES.items():
for alias in set([market_brand, *aliases]):
key = normalize_text(alias)
if key:
self.market_alias_lookup[key] = market_brand
def _parse_brand_filter(self, input_brand: Optional[str]) -> Dict[str, Optional[str]]:
if not input_brand:
return {
"parent_brand": None,
"manufacturer_brand": None,
"market_brand": None,
"source": "none",
}
input_norm = normalize_text(input_brand)
if not input_norm:
return {
"parent_brand": None,
"manufacturer_brand": None,
"market_brand": None,
"source": "none",
}
if input_norm in self.market_alias_lookup:
market_brand = self.market_alias_lookup[input_norm]
manufacturer_brand = MARKET_BRAND_TO_MANUFACTURER.get(market_brand, market_brand)
parent_brand = resolve_parent_brand(manufacturer_brand)
if market_brand == "Xiaomi":
return {
"parent_brand": parent_brand,
"manufacturer_brand": manufacturer_brand,
"market_brand": None,
"source": "manufacturer_alias_from_market",
}
return {
"parent_brand": parent_brand,
"manufacturer_brand": manufacturer_brand,
"market_brand": market_brand,
"source": "market_alias_exact",
}
if input_norm in self.manufacturer_alias_lookup:
manufacturer_brand = self.manufacturer_alias_lookup[input_norm]
parent_brand = resolve_parent_brand(manufacturer_brand)
children = self.parent_to_children.get(manufacturer_brand, set())
if manufacturer_brand == parent_brand and len(children) > 1:
return {
"parent_brand": parent_brand,
"manufacturer_brand": None,
"market_brand": None,
"source": "parent_alias_exact",
}
return {
"parent_brand": parent_brand,
"manufacturer_brand": manufacturer_brand,
"market_brand": None,
"source": "manufacturer_alias_exact",
}
if input_norm in self.parent_alias_lookup:
parent_brand = self.parent_alias_lookup[input_norm]
return {
"parent_brand": parent_brand,
"manufacturer_brand": None,
"market_brand": None,
"source": "parent_alias_exact",
}
for alias_norm, market_brand in self.market_alias_lookup.items():
if alias_norm and alias_norm in input_norm:
manufacturer_brand = MARKET_BRAND_TO_MANUFACTURER.get(market_brand, market_brand)
return {
"parent_brand": resolve_parent_brand(manufacturer_brand),
"manufacturer_brand": manufacturer_brand,
"market_brand": market_brand,
"source": "market_alias_contains",
}
for alias_norm, manufacturer_brand in self.manufacturer_alias_lookup.items():
if alias_norm and alias_norm in input_norm:
parent_brand = resolve_parent_brand(manufacturer_brand)
children = self.parent_to_children.get(manufacturer_brand, set())
if manufacturer_brand == parent_brand and len(children) > 1:
return {
"parent_brand": parent_brand,
"manufacturer_brand": None,
"market_brand": None,
"source": "parent_alias_contains",
}
return {
"parent_brand": parent_brand,
"manufacturer_brand": manufacturer_brand,
"market_brand": None,
"source": "manufacturer_alias_contains",
}
for alias_norm, parent_brand in self.parent_alias_lookup.items():
if alias_norm and alias_norm in input_norm:
return {
"parent_brand": parent_brand,
"manufacturer_brand": None,
"market_brand": None,
"source": "parent_alias_contains",
}
return {
"parent_brand": None,
"manufacturer_brand": None,
"market_brand": None,
"source": "none",
}
@staticmethod
def _brand_match(
brand_filter: Dict[str, Optional[str]],
record: DeviceRecord,
) -> bool:
parent = brand_filter.get("parent_brand")
manufacturer = brand_filter.get("manufacturer_brand")
market = brand_filter.get("market_brand")
if parent and record.parent_brand != parent:
return False
if manufacturer and record.manufacturer_brand != manufacturer:
return False
if market and record.market_brand != market:
return False
return True
def find(self, name: str, brand: Optional[str] = None, limit: int = 5) -> Dict[str, object]:
query = normalize_text(name)
if not query:
return {
"matched": False,
"reason": "Empty device name.",
"query_name": name,
"query_brand": brand,
"candidates": [],
}
candidate_ids = list(self.alias_index.get(query, set()))
matched_records = [self.records_by_id[rid] for rid in candidate_ids]
brand_filter = self._parse_brand_filter(brand)
if brand:
matched_records = [r for r in matched_records if self._brand_match(brand_filter, r)]
if not matched_records and brand_filter.get("manufacturer_brand"):
fallback_filter = {
"parent_brand": brand_filter.get("parent_brand"),
"manufacturer_brand": brand_filter.get("manufacturer_brand"),
"market_brand": None,
}
matched_records = [r for r in [self.records_by_id[rid] for rid in candidate_ids] if self._brand_match(fallback_filter, r)]
matched_records.sort(key=lambda r: (r.device_name, r.source_file, r.id))
if matched_records:
best = matched_records[0]
return {
"matched": True,
"query_name": name,
"query_brand": brand,
"query_brand_parsed": brand_filter,
"best": asdict(best),
"candidates": [asdict(r) for r in matched_records[:limit]],
}
suggestions: List[str] = []
for alias in self.alias_index:
if query in alias or alias in query:
suggestions.append(alias)
if len(suggestions) >= limit:
break
return {
"matched": False,
"query_name": name,
"query_brand": brand,
"query_brand_parsed": brand_filter,
"reason": "No exact alias match.",
"candidates": [],
"suggestions": suggestions,
}
def build_records(repo_root: Path) -> List[DeviceRecord]:
brands_dir = repo_root / "brands"
records: List[DeviceRecord] = []
for md_path in sorted(brands_dir.glob("*.md")):
records.extend(parse_brand_file(md_path))
return records
def export_index(records: List[DeviceRecord], output_path: Path) -> None:
lookup: Dict[str, List[str]] = {}
manufacturer_brands_in_data = sorted({record.manufacturer_brand for record in records})
parent_brands_in_data = sorted({record.parent_brand for record in records})
market_brands_in_data = sorted({record.market_brand for record in records})
all_brands_in_data = sorted(
set(manufacturer_brands_in_data)
| set(market_brands_in_data)
| set(MARKET_BRAND_TO_MANUFACTURER.keys())
)
manufacturer_stats = dict(sorted(Counter(record.manufacturer_brand for record in records).items()))
parent_stats = dict(sorted(Counter(record.parent_brand for record in records).items()))
market_brand_stats = dict(sorted(Counter(record.market_brand for record in records).items()))
brand_to_manufacturer = {}
for brand in all_brands_in_data:
if brand in MARKET_BRAND_TO_MANUFACTURER:
brand_to_manufacturer[brand] = MARKET_BRAND_TO_MANUFACTURER[brand]
else:
brand_to_manufacturer[brand] = resolve_parent_brand(brand)
parent_to_children: Dict[str, List[str]] = {}
for child, parent in brand_to_manufacturer.items():
parent_to_children.setdefault(parent, []).append(child)
for parent in parent_to_children:
parent_to_children[parent] = sorted(parent_to_children[parent])
all_aliases = {brand: brand_aliases(brand) for brand in all_brands_in_data}
for record in records:
for alias in record.aliases:
key = normalize_text(alias)
if not key:
continue
lookup.setdefault(key, []).append(record.id)
for key, ids in lookup.items():
lookup[key] = sorted(set(ids))
output = {
"generated_on": date.today().isoformat(),
"total_records": len(records),
"brands": manufacturer_brands_in_data,
"brand_aliases": all_aliases,
"brand_management": {
"brands": all_brands_in_data,
"manufacturers": sorted(set(brand_to_manufacturer.values())),
"manufacturer_aliases": all_aliases,
"manufacturer_to_parent": brand_to_manufacturer,
"brand_to_manufacturer": brand_to_manufacturer,
"parent_to_children": parent_to_children,
"parent_aliases": {brand: brand_aliases(brand) for brand in parent_brands_in_data},
"market_brand_aliases": MARKET_BRAND_ALIASES,
"market_brand_to_manufacturer": MARKET_BRAND_TO_MANUFACTURER,
"market_brands": market_brands_in_data,
"parent_brands": parent_brands_in_data,
"stats": {
"manufacturer_brand": manufacturer_stats,
"parent_brand": parent_stats,
"market_brand": market_brand_stats,
},
},
"lookup": lookup,
"records": [asdict(r) for r in records],
}
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
def main() -> None:
parser = argparse.ArgumentParser(description="MobileModels device mapper")
parser.add_argument(
"--repo-root",
type=Path,
default=Path(__file__).resolve().parents[1],
help="Path to MobileModels repository root",
)
subparsers = parser.add_subparsers(dest="command", required=True)
build_cmd = subparsers.add_parser("build", help="Build JSON index")
build_cmd.add_argument(
"--output",
type=Path,
default=Path("dist/device_index.json"),
help="Output JSON path",
)
find_cmd = subparsers.add_parser("find", help="Find a device by name + optional brand")
find_cmd.add_argument("--name", required=True, help="Raw device name from app")
find_cmd.add_argument("--brand", default=None, help="Optional raw brand from app")
find_cmd.add_argument("--limit", type=int, default=5, help="Max matched candidates")
args = parser.parse_args()
records = build_records(args.repo_root)
mapper = DeviceMapper(records)
if args.command == "build":
output_path: Path = args.output
if not output_path.is_absolute():
output_path = args.repo_root / output_path
export_index(records, output_path)
print(f"Built index: {output_path}")
print(f"Total records: {len(records)}")
return
if args.command == "find":
result = mapper.find(name=args.name, brand=args.brand, limit=args.limit)
print(json.dumps(result, ensure_ascii=False, indent=2))
return
if __name__ == "__main__":
main()

View File

@@ -1,281 +0,0 @@
#!/usr/bin/env python3
"""Export MobileModels records into MySQL-friendly seed SQL."""
from __future__ import annotations
import argparse
import re
from pathlib import Path
from typing import Iterable
from device_mapper import (
MARKET_BRAND_ALIASES,
MARKET_BRAND_TO_MANUFACTURER,
build_records,
brand_aliases,
normalize_text,
resolve_parent_brand,
)
REPO_ROOT = Path(__file__).resolve().parent.parent
LEGACY_CODE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9,._/+\\-]{1,63}$")
def is_cn_source_file(source_file: str) -> bool:
return source_file.endswith("_cn.md")
def build_source_order(records: list[object]) -> list[str]:
source_files = sorted({record.source_file for record in records})
cn = [source for source in source_files if is_cn_source_file(source)]
other = [source for source in source_files if not is_cn_source_file(source)]
return sorted(cn) + sorted(other)
def build_source_weights(records: list[object]) -> tuple[dict[str, int], dict[str, float]]:
order = build_source_order(records)
total = len(order)
rank_map: dict[str, int] = {}
weight_map: dict[str, float] = {}
for idx, source_file in enumerate(order):
rank = idx + 1
weight = (((total - idx) / total) * 6) if total > 1 else 6
rank_map[source_file] = rank
weight_map[source_file] = round(weight, 3)
return rank_map, weight_map
def sql_quote(value: object | None) -> str:
if value is None:
return "NULL"
if isinstance(value, bool):
return "1" if value else "0"
if isinstance(value, (int, float)):
return str(value)
text = str(value)
text = text.replace("\\", "\\\\").replace("'", "\\'")
return f"'{text}'"
def batched(items: list[tuple[str, ...]], batch_size: int) -> Iterable[list[tuple[str, ...]]]:
for start in range(0, len(items), batch_size):
yield items[start:start + batch_size]
def build_catalog_rows(records: list[object]) -> list[tuple[str, ...]]:
rank_map, weight_map = build_source_weights(records)
rows = []
seen_keys: set[tuple[str, str]] = set()
for record in records:
aliases = sorted({alias.strip() for alias in record.aliases if alias.strip()})
code_aliases = [alias for alias in aliases if is_legacy_code_alias(alias)]
primary_code = code_aliases[0] if code_aliases else None
other_codes = [alias for alias in code_aliases if alias != primary_code]
code_alias = " | ".join(other_codes) if other_codes else None
version_names = [alias for alias in aliases if not is_legacy_code_alias(alias)]
ver_name = " | ".join(version_names) if version_names else None
for alias in aliases:
alias_norm = normalize_text(alias)
if not alias_norm:
continue
dedupe_key = (record.id, alias_norm)
if dedupe_key in seen_keys:
continue
seen_keys.add(dedupe_key)
rows.append((
sql_quote(record.id),
sql_quote(alias),
sql_quote(alias_norm),
sql_quote(record.device_name),
sql_quote(record.brand),
sql_quote(record.manufacturer_brand),
sql_quote(record.parent_brand),
sql_quote(record.market_brand),
sql_quote(record.device_type),
sql_quote(primary_code),
sql_quote(code_alias),
sql_quote(ver_name),
sql_quote(record.source_file),
sql_quote(record.section),
sql_quote(rank_map[record.source_file]),
sql_quote(f"{weight_map[record.source_file]:.3f}"),
))
rows.sort(key=lambda item: (item[2], item[14], item[0], item[1]))
return rows
def build_brand_rows(records: list[object]) -> list[tuple[str, ...]]:
manufacturer_brands = sorted({record.manufacturer_brand for record in records})
parent_brands = sorted({record.parent_brand for record in records})
rows: dict[tuple[str, str], tuple[str, ...]] = {}
for brand in manufacturer_brands:
parent_brand = resolve_parent_brand(brand)
for alias in brand_aliases(brand):
alias_norm = normalize_text(alias)
if not alias_norm:
continue
rows[(alias_norm, "manufacturer")] = (
sql_quote(alias_norm),
sql_quote("manufacturer"),
sql_quote(brand),
sql_quote(brand),
sql_quote(parent_brand),
sql_quote(None),
)
for brand in parent_brands:
for alias in brand_aliases(brand):
alias_norm = normalize_text(alias)
if not alias_norm:
continue
rows[(alias_norm, "parent")] = (
sql_quote(alias_norm),
sql_quote("parent"),
sql_quote(brand),
sql_quote(None),
sql_quote(brand),
sql_quote(None),
)
for market_brand, aliases in MARKET_BRAND_ALIASES.items():
manufacturer_brand = MARKET_BRAND_TO_MANUFACTURER.get(market_brand, market_brand)
parent_brand = resolve_parent_brand(manufacturer_brand)
for alias in sorted(set([market_brand, *aliases])):
alias_norm = normalize_text(alias)
if not alias_norm:
continue
rows[(alias_norm, "market")] = (
sql_quote(alias_norm),
sql_quote("market"),
sql_quote(market_brand),
sql_quote(manufacturer_brand),
sql_quote(parent_brand),
sql_quote(market_brand),
)
return [rows[key] for key in sorted(rows)]
def is_legacy_code_alias(text: str) -> bool:
value = (text or "").strip()
if not value or not LEGACY_CODE_RE.match(value):
return False
return any(ch.isdigit() for ch in value)
def append_insert_block(lines: list[str], table_name: str, columns: list[str], rows: list[tuple[str, ...]], batch_size: int = 500) -> None:
if not rows:
return
column_sql = ", ".join(f"`{column}`" for column in columns)
for chunk in batched(rows, batch_size):
values_sql = ",\n".join(f" ({', '.join(row)})" for row in chunk)
lines.append(f"INSERT INTO `{table_name}` ({column_sql}) VALUES\n{values_sql};")
lines.append("")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Export MobileModels MySQL seed SQL.")
parser.add_argument(
"--repo-root",
type=Path,
default=REPO_ROOT,
help="Path to MobileModels repository root",
)
parser.add_argument(
"--output",
type=Path,
default=Path("dist/mobilemodels_mysql_seed.sql"),
help="Output SQL path",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
repo_root = args.repo_root.resolve()
output_path = args.output if args.output.is_absolute() else repo_root / args.output
records = build_records(repo_root)
device_record_count = len(records)
catalog_rows = build_catalog_rows(records)
brand_rows = build_brand_rows(records)
lines = [
"-- MobileModels MySQL seed",
"-- Generated by tools/export_mysql_seed.py",
"USE `mobilemodels`;",
"",
"START TRANSACTION;",
"",
"DELETE FROM `mm_device_catalog`;",
"DELETE FROM `mm_brand_lookup`;",
"",
]
append_insert_block(
lines,
"mm_device_catalog",
[
"record_id",
"model",
"alias_norm",
"device_name",
"brand",
"manufacturer_brand",
"parent_brand",
"market_brand",
"device_type",
"code",
"code_alias",
"ver_name",
"source_file",
"section",
"source_rank",
"source_weight",
],
catalog_rows,
)
append_insert_block(
lines,
"mm_brand_lookup",
[
"alias_norm",
"alias_type",
"canonical_brand",
"manufacturer_brand",
"parent_brand",
"market_brand",
],
brand_rows,
)
lines.extend([
"COMMIT;",
"",
f"-- device_records: {device_record_count}",
f"-- device_catalog_rows: {len(catalog_rows)}",
f"-- device_lookup_rows: {len(catalog_rows)}",
f"-- brand_lookup_rows: {len(brand_rows)}",
f"-- legacy_models_rows: {len(catalog_rows)}",
"",
])
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text("\n".join(lines), encoding="utf-8")
print(f"Exported MySQL seed: {output_path}")
print(f"device_records={device_record_count}")
print(f"device_catalog_rows={len(catalog_rows)}")
print(f"device_lookup_rows={len(catalog_rows)}")
print(f"brand_lookup_rows={len(brand_rows)}")
print(f"legacy_models_rows={len(catalog_rows)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,73 +0,0 @@
#!/bin/sh
set -eu
APP_ROOT="${APP_ROOT:-/app}"
DATA_ROOT="${MOBILEMODELS_DATA_ROOT:-/data}"
mkdir -p "$DATA_ROOT" "$DATA_ROOT/state"
sync_missing_dir_entries() {
src_dir="$1"
dst_dir="$2"
mkdir -p "$dst_dir"
for src_entry in "$src_dir"/*; do
[ -e "$src_entry" ] || continue
name="$(basename "$src_entry")"
dst_entry="$dst_dir/$name"
if [ -d "$src_entry" ]; then
sync_missing_dir_entries "$src_entry" "$dst_entry"
continue
fi
if [ ! -e "$dst_entry" ] && [ ! -L "$dst_entry" ]; then
mkdir -p "$(dirname "$dst_entry")"
cp -a "$src_entry" "$dst_entry"
fi
done
}
init_path() {
rel_path="$1"
src_path="$APP_ROOT/$rel_path"
dst_path="$DATA_ROOT/$rel_path"
if [ -d "$src_path" ]; then
if [ ! -e "$dst_path" ] && [ ! -L "$dst_path" ]; then
mkdir -p "$(dirname "$dst_path")"
cp -a "$src_path" "$dst_path"
else
sync_missing_dir_entries "$src_path" "$dst_path"
fi
elif [ ! -e "$dst_path" ] && [ ! -L "$dst_path" ]; then
mkdir -p "$(dirname "$dst_path")"
cp -a "$src_path" "$dst_path"
fi
if [ -L "$src_path" ]; then
current_target="$(readlink "$src_path" || true)"
if [ "$current_target" = "$dst_path" ]; then
return
fi
rm -f "$src_path"
else
rm -rf "$src_path"
fi
ln -s "$dst_path" "$src_path"
}
for rel_path in \
brands \
misc \
dist \
README.md \
README_en.md \
CHANGELOG.md \
CHANGELOG_en.md \
LICENSE.txt
do
init_path "$rel_path"
done

View File

@@ -1,165 +0,0 @@
#!/usr/bin/env python3
"""Load MobileModels schema and seed data into MySQL."""
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import time
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
def mysql_env(password: str) -> dict[str, str]:
env = os.environ.copy()
env["MYSQL_PWD"] = password
return env
def mysql_command(user: str, host: str, port: int, database: str | None = None) -> list[str]:
command = [
"mysql",
f"--host={host}",
f"--port={port}",
f"--user={user}",
"--protocol=TCP",
"--default-character-set=utf8mb4",
]
if database:
command.append(database)
return command
def mysqladmin_ping(user: str, password: str, host: str, port: int) -> bool:
proc = subprocess.run(
[
"mysqladmin",
f"--host={host}",
f"--port={port}",
f"--user={user}",
"--protocol=TCP",
"ping",
"--silent",
],
env=mysql_env(password),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
return proc.returncode == 0
def wait_for_mysql(user: str, password: str, host: str, port: int, timeout: int) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
if mysqladmin_ping(user, password, host, port):
return
time.sleep(2)
raise RuntimeError(f"MySQL 未在 {timeout}s 内就绪: {host}:{port}")
def run_sql_file(user: str, password: str, host: str, port: int, path: Path, database: str | None = None) -> None:
sql_text = path.read_text(encoding="utf-8")
proc = subprocess.run(
mysql_command(user, host, port, database=database),
env=mysql_env(password),
input=sql_text,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
raise RuntimeError(f"执行 SQL 文件失败 {path}: {message}")
def sql_string(value: str) -> str:
return value.replace("\\", "\\\\").replace("'", "''")
def ensure_reader_user(
user: str,
password: str,
host: str,
port: int,
database: str,
reader_user: str,
reader_password: str,
) -> None:
sql = f"""
CREATE USER IF NOT EXISTS '{sql_string(reader_user)}'@'%' IDENTIFIED BY '{sql_string(reader_password)}';
ALTER USER '{sql_string(reader_user)}'@'%' IDENTIFIED BY '{sql_string(reader_password)}';
GRANT SELECT ON `{database}`.* TO '{sql_string(reader_user)}'@'%';
GRANT SELECT ON `python_services_test`.* TO '{sql_string(reader_user)}'@'%';
FLUSH PRIVILEGES;
"""
proc = subprocess.run(
mysql_command(user, host, port),
env=mysql_env(password),
input=sql,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
raise RuntimeError(f"创建只读账号失败: {message}")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Load MobileModels schema and seed data into MySQL.")
parser.add_argument("--schema", type=Path, default=Path("sql/mobilemodels_mysql_schema.sql"))
parser.add_argument("--seed", type=Path, default=Path("dist/mobilemodels_mysql_seed.sql"))
parser.add_argument("--host", default=os.environ.get("MYSQL_HOST", "mysql"))
parser.add_argument("--port", type=int, default=int(os.environ.get("MYSQL_PORT", "3306")))
parser.add_argument("--user", default=os.environ.get("MYSQL_ROOT_USER", "root"))
parser.add_argument("--password", default=os.environ.get("MYSQL_ROOT_PASSWORD", "mobilemodels_root"))
parser.add_argument("--database", default=os.environ.get("MYSQL_DATABASE", "mobilemodels"))
parser.add_argument("--reader-user", default=os.environ.get("MYSQL_READER_USER", ""))
parser.add_argument("--reader-password", default=os.environ.get("MYSQL_READER_PASSWORD", ""))
parser.add_argument("--wait-timeout", type=int, default=120)
parser.add_argument("--check-only", action="store_true", help="Only check MySQL readiness")
return parser.parse_args()
def main() -> int:
args = parse_args()
schema_path = args.schema if args.schema.is_absolute() else REPO_ROOT / args.schema
seed_path = args.seed if args.seed.is_absolute() else REPO_ROOT / args.seed
wait_for_mysql(args.user, args.password, args.host, args.port, args.wait_timeout)
if args.check_only:
print(f"MySQL ready: {args.host}:{args.port}")
return 0
run_sql_file(args.user, args.password, args.host, args.port, schema_path)
run_sql_file(args.user, args.password, args.host, args.port, seed_path)
if args.reader_user and args.reader_password:
ensure_reader_user(
args.user,
args.password,
args.host,
args.port,
args.database,
args.reader_user,
args.reader_password,
)
print(f"Loaded schema: {schema_path}")
print(f"Loaded seed: {seed_path}")
if args.reader_user:
print(f"Ensured reader user: {args.reader_user}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,168 +0,0 @@
#!/usr/bin/env python3
"""Sync selected upstream MobileModels data into this repository."""
from __future__ import annotations
import argparse
import filecmp
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_REPO_URL = "https://github.com/KHwang9883/MobileModels.git"
DEFAULT_BRANCH = "master"
SYNC_PATHS = [
"brands",
"misc",
"README.md",
"README_en.md",
"CHANGELOG.md",
"CHANGELOG_en.md",
"LICENSE.txt",
]
def run(cmd: list[str], cwd: Path | None = None) -> None:
subprocess.run(cmd, cwd=cwd or REPO_ROOT, check=True)
def remove_path(path: Path) -> None:
if path.is_dir():
shutil.rmtree(path)
elif path.exists():
path.unlink()
def sync_path(src: Path, dst: Path) -> None:
if src.is_dir():
dst.mkdir(parents=True, exist_ok=True)
source_children = {child.name for child in src.iterdir()}
for existing in dst.iterdir():
if existing.name not in source_children:
remove_path(existing)
for child in src.iterdir():
sync_path(child, dst / child.name)
return
dst.parent.mkdir(parents=True, exist_ok=True)
if dst.exists() and filecmp.cmp(src, dst, shallow=False):
return
shutil.copy2(src, dst)
def sync_selected_paths(upstream_root: Path) -> None:
for relative_path in SYNC_PATHS:
src = upstream_root / relative_path
dst = REPO_ROOT / relative_path
if not src.exists():
raise FileNotFoundError(f"Missing upstream path: {relative_path}")
sync_path(src, dst)
def build_index(output_path: str) -> None:
run(
[
sys.executable,
str(REPO_ROOT / "tools/device_mapper.py"),
"build",
"--output",
output_path,
]
)
def export_mysql_seed(output_path: str) -> None:
run(
[
sys.executable,
str(REPO_ROOT / "tools/export_mysql_seed.py"),
"--output",
output_path,
]
)
def load_mysql_seed(seed_path: str) -> None:
run(
[
sys.executable,
str(REPO_ROOT / "tools/load_mysql_seed.py"),
"--seed",
seed_path,
]
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Sync upstream MobileModels raw data and optionally rebuild the device index."
)
parser.add_argument("--repo-url", default=DEFAULT_REPO_URL, help="Upstream git repository URL")
parser.add_argument("--branch", default=DEFAULT_BRANCH, help="Upstream branch to sync from")
parser.add_argument(
"--build-index",
action="store_true",
help="Rebuild dist/device_index.json after syncing upstream data",
)
parser.add_argument(
"--index-output",
default="dist/device_index.json",
help="Output path for the rebuilt device index",
)
parser.add_argument(
"--export-mysql-seed",
action="store_true",
help="Export MySQL seed SQL after syncing upstream data",
)
parser.add_argument(
"--mysql-seed-output",
default="dist/mobilemodels_mysql_seed.sql",
help="Output path for the exported MySQL seed SQL",
)
parser.add_argument(
"--load-mysql",
action="store_true",
help="Load schema and seed data into MySQL after exporting seed SQL",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
with tempfile.TemporaryDirectory(prefix="mobilemodels-upstream-") as tmpdir:
upstream_root = Path(tmpdir) / "upstream"
run(
[
"git",
"clone",
"--depth",
"1",
"--branch",
args.branch,
args.repo_url,
str(upstream_root),
]
)
sync_selected_paths(upstream_root)
if args.build_index:
build_index(args.index_output)
if args.export_mysql_seed or args.load_mysql:
export_mysql_seed(args.mysql_seed_output)
if args.load_mysql:
load_mysql_seed(args.mysql_seed_output)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,347 +0,0 @@
#!/usr/bin/env python3
"""Compose-facing web server for MobileModels static pages and maintenance APIs."""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import threading
from datetime import datetime
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL, REPO_ROOT
SYNC_SCRIPT = REPO_ROOT / "tools/sync_upstream_mobilemodels.py"
INDEX_PATH = REPO_ROOT / "dist/device_index.json"
MYSQL_SEED_PATH = REPO_ROOT / "dist/mobilemodels_mysql_seed.sql"
MYSQL_LOADER = REPO_ROOT / "tools/load_mysql_seed.py"
DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data"))
SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json"
SYNC_LOCK = threading.Lock()
NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")
def run_command(args: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=REPO_ROOT,
text=True,
capture_output=True,
check=False,
)
def normalize_text(text: str) -> str:
return NORMALIZE_RE.sub("", (text or "").lower())
def sql_string(value: str) -> str:
return (value or "").replace("\\", "\\\\").replace("'", "''")
def mysql_command(database: str | None = None) -> list[str]:
command = [
"mysql",
f"--host={os.environ.get('MYSQL_HOST', 'mysql')}",
f"--port={os.environ.get('MYSQL_PORT', '3306')}",
f"--user={os.environ.get('MYSQL_READER_USER', '')}",
"--protocol=TCP",
"--default-character-set=utf8mb4",
"--batch",
"--raw",
]
if database:
command.append(database)
return command
def mysql_env() -> dict[str, str]:
env = os.environ.copy()
env["MYSQL_PWD"] = os.environ.get("MYSQL_READER_PASSWORD", "")
return env
def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str | None]]:
proc = subprocess.run(
mysql_command(database=database),
env=mysql_env(),
input=sql,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
raise RuntimeError(message)
lines = [line for line in proc.stdout.splitlines() if line.strip()]
if not lines:
return []
headers = lines[0].split("\t")
rows: list[dict[str, str | None]] = []
for line in lines[1:]:
values = line.split("\t")
row = {}
for idx, header in enumerate(headers):
value = values[idx] if idx < len(values) else ""
row[header] = None if value == "NULL" else value
rows.append(row)
return rows
def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip()
if not raw_value:
raise RuntimeError("请填写设备标识。")
alias_norm = normalize_text(raw_value)
if not alias_norm:
raise RuntimeError("设备标识无法归一化,请检查输入。")
limit_value = payload.get("limit", 20)
try:
limit = int(limit_value)
except Exception as err:
raise RuntimeError("limit 必须是数字。") from err
limit = max(1, min(limit, 100))
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE alias_norm = '{sql_string(alias_norm)}'
ORDER BY source_rank ASC, record_id ASC
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
return {
"query_mode": "sql",
"model_raw": raw_value,
"alias_norm": alias_norm,
"limit": limit,
"sql": sql,
"rows": rows,
"row_count": len(rows),
}
def read_sync_metadata() -> dict[str, object]:
if not SYNC_METADATA_PATH.exists():
return {}
try:
return json.loads(SYNC_METADATA_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
def write_sync_metadata(payload: dict[str, object]) -> None:
SYNC_METADATA_PATH.parent.mkdir(parents=True, exist_ok=True)
SYNC_METADATA_PATH.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def get_status_payload() -> dict[str, object]:
index_mtime = None
mysql_seed_mtime = None
if INDEX_PATH.exists():
index_mtime = datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds")
if MYSQL_SEED_PATH.exists():
mysql_seed_mtime = datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds")
mysql_host = os.environ.get("MYSQL_HOST", "mysql")
mysql_port = os.environ.get("MYSQL_PORT", "3306")
mysql_database = os.environ.get("MYSQL_DATABASE", "mobilemodels")
mysql_reader_user = os.environ.get("MYSQL_READER_USER", "")
mysql_reader_password = os.environ.get("MYSQL_READER_PASSWORD", "")
mysql_ready = False
mysql_status = ""
sync_metadata = read_sync_metadata()
mysql_proc = run_command(["python3", str(MYSQL_LOADER), "--check-only", "--wait-timeout", "5"])
if mysql_proc.returncode == 0:
mysql_ready = True
mysql_status = mysql_proc.stdout.strip() or "MySQL ready"
else:
mysql_status = mysql_proc.stderr.strip() or mysql_proc.stdout.strip() or "MySQL unavailable"
return {
"supports_upstream_sync": True,
"storage_mode": "docker_volume",
"repo_root": str(REPO_ROOT),
"data_root": str(DATA_ROOT),
"upstream_repo_url": DEFAULT_REPO_URL,
"upstream_branch": DEFAULT_BRANCH,
"last_sync_time": sync_metadata.get("last_sync_time"),
"last_upstream_commit": sync_metadata.get("last_upstream_commit"),
"index_file": str(INDEX_PATH.relative_to(REPO_ROOT)),
"index_mtime": index_mtime,
"mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(REPO_ROOT)),
"mysql_seed_mtime": mysql_seed_mtime,
"mysql_host": mysql_host,
"mysql_port": mysql_port,
"mysql_database": mysql_database,
"mysql_reader_user": mysql_reader_user,
"mysql_reader_password": mysql_reader_password,
"mysql_ready": mysql_ready,
"mysql_status": mysql_status,
}
def run_upstream_sync() -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步任务在执行,请稍后再试。")
try:
upstream_proc = run_command(
["git", "ls-remote", DEFAULT_REPO_URL, f"refs/heads/{DEFAULT_BRANCH}"]
)
upstream_commit = ""
if upstream_proc.returncode == 0 and upstream_proc.stdout.strip():
upstream_commit = upstream_proc.stdout.split()[0]
proc = run_command([
"python3",
str(SYNC_SCRIPT),
"--build-index",
"--export-mysql-seed",
"--load-mysql",
])
output = "\n".join(
part for part in [proc.stdout.strip(), proc.stderr.strip()] if part
).strip()
if proc.returncode != 0:
raise RuntimeError(output or f"sync script failed with exit code {proc.returncode}")
payload = {
"storage_mode": "docker_volume",
"repo_root": str(REPO_ROOT),
"data_root": str(DATA_ROOT),
"upstream_repo_url": DEFAULT_REPO_URL,
"upstream_branch": DEFAULT_BRANCH,
"upstream_commit": upstream_commit,
"last_sync_time": datetime.now().isoformat(timespec="seconds"),
"last_upstream_commit": upstream_commit,
"index_file": str(INDEX_PATH.relative_to(REPO_ROOT)),
"index_mtime": datetime.fromtimestamp(INDEX_PATH.stat().st_mtime).isoformat(timespec="seconds")
if INDEX_PATH.exists()
else None,
"mysql_seed_file": str(MYSQL_SEED_PATH.relative_to(REPO_ROOT)),
"mysql_seed_mtime": datetime.fromtimestamp(MYSQL_SEED_PATH.stat().st_mtime).isoformat(timespec="seconds")
if MYSQL_SEED_PATH.exists()
else None,
"output": output or "同步脚本执行完成。",
}
write_sync_metadata({
"last_sync_time": payload["last_sync_time"],
"last_upstream_commit": payload["last_upstream_commit"],
"upstream_repo_url": DEFAULT_REPO_URL,
"upstream_branch": DEFAULT_BRANCH,
})
return payload
finally:
SYNC_LOCK.release()
class MobileModelsHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(REPO_ROOT), **kwargs)
def guess_type(self, path: str) -> str:
content_type = super().guess_type(path)
lower_path = path.lower()
if lower_path.endswith(".md"):
return "text/markdown; charset=utf-8"
if lower_path.endswith(".txt"):
return "text/plain; charset=utf-8"
if content_type.startswith("text/") and "charset=" not in content_type:
return f"{content_type}; charset=utf-8"
return content_type
def _send_json(self, payload: dict[str, object], status: int = HTTPStatus.OK) -> None:
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self.wfile.write(data)
def do_GET(self) -> None:
if self.path == "/api/status":
try:
self._send_json(get_status_payload())
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
return super().do_GET()
def do_POST(self) -> None:
if self.path == "/api/sync-upstream":
try:
payload = run_upstream_sync()
self._send_json(payload)
except RuntimeError as err:
status = HTTPStatus.CONFLICT if "已有同步任务" in str(err) else HTTPStatus.INTERNAL_SERVER_ERROR
self._send_json({"error": str(err)}, status=status)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/query-sql":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
payload = build_sql_query_payload(req if isinstance(req, dict) else {})
self._send_json(payload)
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run the MobileModels web server inside Docker Compose.")
parser.add_argument("--host", default="127.0.0.1", help="Bind host")
parser.add_argument("--port", type=int, default=8123, help="Bind port")
return parser.parse_args()
def main() -> int:
args = parse_args()
server = ThreadingHTTPServer((args.host, args.port), MobileModelsHandler)
print(f"Serving MobileModels on http://{args.host}:{args.port}")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())