Add manual catalog management
This commit is contained in:
@@ -85,6 +85,8 @@ FILE_DEFAULT_DEVICE_TYPE: Dict[str, str] = {
|
||||
"zhixuan": "phone",
|
||||
}
|
||||
|
||||
MANUAL_SOURCE_FILE = "local/manual_catalog.json"
|
||||
|
||||
|
||||
BRAND_ALIASES: Dict[str, List[str]] = {
|
||||
"360": ["360", "360手机", "奇酷", "qiku"],
|
||||
@@ -239,6 +241,10 @@ class DeviceRecord:
|
||||
section: str
|
||||
|
||||
|
||||
MANUAL_BRAND_ALIAS_OVERRIDES: Dict[str, List[str]] = {}
|
||||
MANUAL_PARENT_BRAND_OVERRIDES: Dict[str, str] = {}
|
||||
|
||||
|
||||
def normalize_text(text: str) -> str:
|
||||
return re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", text.lower())
|
||||
|
||||
@@ -249,6 +255,7 @@ def canonical_brand(file_stem: str) -> str:
|
||||
|
||||
def brand_aliases(brand: str) -> List[str]:
|
||||
aliases = set(BRAND_ALIASES.get(brand, []))
|
||||
aliases.update(MANUAL_BRAND_ALIAS_OVERRIDES.get(brand, []))
|
||||
aliases.add(brand)
|
||||
return sorted(aliases)
|
||||
|
||||
@@ -265,9 +272,108 @@ def has_keyword(text: str, keywords: Iterable[str]) -> bool:
|
||||
|
||||
|
||||
def resolve_parent_brand(manufacturer_brand: str) -> str:
|
||||
if manufacturer_brand in MANUAL_PARENT_BRAND_OVERRIDES:
|
||||
return MANUAL_PARENT_BRAND_OVERRIDES[manufacturer_brand]
|
||||
return MANUFACTURER_PARENT_BRAND.get(manufacturer_brand, manufacturer_brand)
|
||||
|
||||
|
||||
def reset_manual_overrides() -> None:
|
||||
MANUAL_BRAND_ALIAS_OVERRIDES.clear()
|
||||
MANUAL_PARENT_BRAND_OVERRIDES.clear()
|
||||
|
||||
|
||||
def normalize_alias_list(*groups: object) -> List[str]:
|
||||
aliases: List[str] = []
|
||||
seen: Set[str] = set()
|
||||
for group in groups:
|
||||
if group is None:
|
||||
continue
|
||||
items = group if isinstance(group, (list, tuple, set)) else [group]
|
||||
for item in items:
|
||||
text = str(item or "").strip()
|
||||
key = normalize_text(text)
|
||||
if not text or not key or key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
aliases.append(text)
|
||||
return aliases
|
||||
|
||||
|
||||
def load_manual_catalog(repo_root: Path) -> dict[str, object]:
|
||||
path = repo_root / MANUAL_SOURCE_FILE
|
||||
if not path.exists():
|
||||
return {"brands": [], "devices": []}
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
if not isinstance(payload, dict):
|
||||
raise RuntimeError(f"{MANUAL_SOURCE_FILE} 必须是 JSON 对象。")
|
||||
brands = payload.get("brands")
|
||||
devices = payload.get("devices")
|
||||
return {
|
||||
"brands": brands if isinstance(brands, list) else [],
|
||||
"devices": devices if isinstance(devices, list) else [],
|
||||
}
|
||||
|
||||
|
||||
def apply_manual_brand_overrides(manual_catalog: dict[str, object]) -> dict[str, str]:
|
||||
reset_manual_overrides()
|
||||
brand_parent_map: dict[str, str] = {}
|
||||
for raw_brand in manual_catalog.get("brands", []):
|
||||
if not isinstance(raw_brand, dict):
|
||||
continue
|
||||
brand_name = str(raw_brand.get("name") or "").strip()
|
||||
if not brand_name:
|
||||
continue
|
||||
aliases = normalize_alias_list(brand_name, raw_brand.get("aliases"))
|
||||
parent_brand = str(raw_brand.get("parent_brand") or brand_name).strip() or brand_name
|
||||
MANUAL_BRAND_ALIAS_OVERRIDES[brand_name] = aliases
|
||||
MANUAL_PARENT_BRAND_OVERRIDES[brand_name] = parent_brand
|
||||
brand_parent_map[brand_name] = parent_brand
|
||||
return brand_parent_map
|
||||
|
||||
|
||||
def parse_manual_catalog(repo_root: Path, manual_catalog: dict[str, object]) -> List[DeviceRecord]:
|
||||
brand_parent_map = apply_manual_brand_overrides(manual_catalog)
|
||||
records: List[DeviceRecord] = []
|
||||
|
||||
for raw_device in manual_catalog.get("devices", []):
|
||||
if not isinstance(raw_device, dict):
|
||||
continue
|
||||
brand = str(raw_device.get("brand") or "").strip()
|
||||
device_name = str(raw_device.get("device_name") or "").strip()
|
||||
if not brand or not device_name:
|
||||
continue
|
||||
|
||||
aliases = normalize_alias_list(
|
||||
raw_device.get("models"),
|
||||
device_name,
|
||||
raw_device.get("aliases"),
|
||||
)
|
||||
if not aliases:
|
||||
continue
|
||||
|
||||
device_type = str(raw_device.get("device_type") or "").strip() or "other"
|
||||
section = str(raw_device.get("section") or "手动补录").strip() or "手动补录"
|
||||
record_id = str(raw_device.get("id") or "").strip() or f"manual:{normalize_text(brand)}:{normalize_text(device_name)}"
|
||||
parent_brand = brand_parent_map.get(brand, str(raw_device.get("parent_brand") or brand).strip() or brand)
|
||||
|
||||
records.append(
|
||||
DeviceRecord(
|
||||
id=record_id,
|
||||
device_name=device_name,
|
||||
brand=brand,
|
||||
manufacturer_brand=brand,
|
||||
parent_brand=parent_brand,
|
||||
market_brand=brand,
|
||||
device_type=device_type,
|
||||
aliases=aliases,
|
||||
source_file=MANUAL_SOURCE_FILE,
|
||||
section=section,
|
||||
)
|
||||
)
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def infer_market_brand(
|
||||
manufacturer_brand: str,
|
||||
device_name: str,
|
||||
@@ -647,10 +753,13 @@ class DeviceMapper:
|
||||
def build_records(repo_root: Path) -> List[DeviceRecord]:
|
||||
brands_dir = repo_root / "brands"
|
||||
records: List[DeviceRecord] = []
|
||||
manual_catalog = load_manual_catalog(repo_root)
|
||||
apply_manual_brand_overrides(manual_catalog)
|
||||
|
||||
for md_path in sorted(brands_dir.glob("*.md")):
|
||||
records.extend(parse_brand_file(md_path))
|
||||
|
||||
records.extend(parse_manual_catalog(repo_root, manual_catalog))
|
||||
return records
|
||||
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
from device_mapper import (
|
||||
MANUAL_SOURCE_FILE,
|
||||
MARKET_BRAND_ALIASES,
|
||||
MARKET_BRAND_TO_MANUFACTURER,
|
||||
build_records,
|
||||
@@ -28,9 +29,11 @@ def is_cn_source_file(source_file: str) -> bool:
|
||||
|
||||
def build_source_order(records: list[object]) -> list[str]:
|
||||
source_files = sorted({record.source_file for record in records})
|
||||
manual = [source for source in source_files if source == MANUAL_SOURCE_FILE]
|
||||
source_files = [source for source in source_files if source != MANUAL_SOURCE_FILE]
|
||||
cn = [source for source in source_files if is_cn_source_file(source)]
|
||||
other = [source for source in source_files if not is_cn_source_file(source)]
|
||||
return sorted(cn) + sorted(other)
|
||||
return manual + sorted(cn) + sorted(other)
|
||||
|
||||
|
||||
def build_source_weights(records: list[object]) -> tuple[dict[str, int], dict[str, float]]:
|
||||
|
||||
@@ -29,6 +29,24 @@ sync_missing_dir_entries() {
|
||||
done
|
||||
}
|
||||
|
||||
migrate_legacy_manual_catalog() {
|
||||
legacy_dir="$DATA_ROOT/workspace/brands/local"
|
||||
target_dir="$DATA_ROOT/workspace/local"
|
||||
legacy_file="$legacy_dir/manual_catalog.json"
|
||||
target_file="$target_dir/manual_catalog.json"
|
||||
|
||||
[ -d "$legacy_dir" ] || return
|
||||
|
||||
mkdir -p "$target_dir"
|
||||
if [ -f "$legacy_file" ] && [ ! -f "$target_file" ]; then
|
||||
cp -a "$legacy_file" "$target_file"
|
||||
fi
|
||||
|
||||
# Older builds accidentally copied local overlay files under brands/local.
|
||||
# Once the real target exists, drop the misplaced directory to avoid confusion.
|
||||
rm -rf "$legacy_dir"
|
||||
}
|
||||
|
||||
init_path() {
|
||||
rel_path="$1"
|
||||
src_path="$APP_ROOT/$rel_path"
|
||||
@@ -82,3 +100,5 @@ for rel_path in \
|
||||
do
|
||||
init_path "$rel_path"
|
||||
done
|
||||
|
||||
migrate_legacy_manual_catalog
|
||||
|
||||
@@ -15,6 +15,7 @@ from http import HTTPStatus
|
||||
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
|
||||
from device_mapper import build_records
|
||||
from project_layout import PROJECT_ROOT, WORKSPACE_ROOT
|
||||
from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL
|
||||
|
||||
@@ -24,6 +25,7 @@ INDEX_PATH = PROJECT_ROOT / "dist/device_index.json"
|
||||
MYSQL_SEED_PATH = PROJECT_ROOT / "dist/mobilemodels_mysql_seed.sql"
|
||||
MYSQL_LOADER = PROJECT_ROOT / "tools/load_mysql_seed.py"
|
||||
DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data"))
|
||||
MANUAL_CATALOG_PATH = WORKSPACE_ROOT / "local/manual_catalog.json"
|
||||
SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json"
|
||||
SCHEDULE_CONFIG_PATH = DATA_ROOT / "state/sync_schedule.json"
|
||||
MYSQL_CONFIG_PATH = DATA_ROOT / "state/mysql_settings.json"
|
||||
@@ -35,6 +37,7 @@ NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")
|
||||
SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$")
|
||||
SCHEDULER_POLL_SECONDS = 20
|
||||
INDEX_DEVICE_NAME_ALIAS_MAP: dict[str, list[str]] | None = None
|
||||
DEVICE_TYPES = {"phone", "tablet", "wear", "tv", "computer", "other"}
|
||||
|
||||
|
||||
def truthy_env(name: str, default: str = "0") -> bool:
|
||||
@@ -303,6 +306,334 @@ def sql_string(value: str) -> str:
|
||||
return (value or "").replace("\\", "\\\\").replace("'", "''")
|
||||
|
||||
|
||||
def normalize_alias_list(*groups: object) -> list[str]:
|
||||
aliases: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for group in groups:
|
||||
if group is None:
|
||||
continue
|
||||
items = group if isinstance(group, list) else [group]
|
||||
for item in items:
|
||||
text = str(item or "").strip()
|
||||
key = normalize_text(text)
|
||||
if not text or not key or key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
aliases.append(text)
|
||||
return aliases
|
||||
|
||||
|
||||
def default_manual_catalog() -> dict[str, object]:
|
||||
return {"brands": [], "devices": []}
|
||||
|
||||
|
||||
def read_manual_catalog() -> dict[str, object]:
|
||||
if not MANUAL_CATALOG_PATH.exists():
|
||||
return default_manual_catalog()
|
||||
try:
|
||||
payload = json.loads(MANUAL_CATALOG_PATH.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return default_manual_catalog()
|
||||
if not isinstance(payload, dict):
|
||||
return default_manual_catalog()
|
||||
brands = payload.get("brands")
|
||||
devices = payload.get("devices")
|
||||
return {
|
||||
"brands": brands if isinstance(brands, list) else [],
|
||||
"devices": devices if isinstance(devices, list) else [],
|
||||
}
|
||||
|
||||
|
||||
def write_manual_catalog(payload: dict[str, object]) -> dict[str, object]:
|
||||
catalog = {
|
||||
"brands": payload.get("brands") if isinstance(payload.get("brands"), list) else [],
|
||||
"devices": payload.get("devices") if isinstance(payload.get("devices"), list) else [],
|
||||
}
|
||||
MANUAL_CATALOG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
MANUAL_CATALOG_PATH.write_text(json.dumps(catalog, ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
return catalog
|
||||
|
||||
|
||||
def canonical_manual_brand(raw: object) -> dict[str, object]:
|
||||
if not isinstance(raw, dict):
|
||||
raise RuntimeError("品牌数据格式无效。")
|
||||
name = str(raw.get("name") or "").strip()
|
||||
if not name:
|
||||
raise RuntimeError("品牌名称不能为空。")
|
||||
parent_brand = str(raw.get("parent_brand") or name).strip() or name
|
||||
aliases = normalize_alias_list(name, raw.get("aliases"))
|
||||
now = local_now().isoformat(timespec="seconds")
|
||||
return {
|
||||
"name": name,
|
||||
"parent_brand": parent_brand,
|
||||
"aliases": aliases,
|
||||
"updated_at": str(raw.get("updated_at") or now),
|
||||
"created_at": str(raw.get("created_at") or now),
|
||||
}
|
||||
|
||||
|
||||
def canonical_manual_device(raw: object, allowed_brands: set[str]) -> dict[str, object]:
|
||||
if not isinstance(raw, dict):
|
||||
raise RuntimeError("设备数据格式无效。")
|
||||
brand = str(raw.get("brand") or "").strip()
|
||||
if not brand:
|
||||
raise RuntimeError("所属品牌不能为空。")
|
||||
if brand not in allowed_brands:
|
||||
raise RuntimeError(f"所属品牌不存在: {brand}")
|
||||
device_name = str(raw.get("device_name") or "").strip()
|
||||
if not device_name:
|
||||
raise RuntimeError("设备名称不能为空。")
|
||||
device_type = str(raw.get("device_type") or "").strip().lower() or "other"
|
||||
if device_type not in DEVICE_TYPES:
|
||||
raise RuntimeError("设备类型无效。")
|
||||
models = normalize_alias_list(raw.get("models"))
|
||||
if not models:
|
||||
raise RuntimeError("设备标识至少保留一个。")
|
||||
aliases = normalize_alias_list(device_name, raw.get("aliases"))
|
||||
section = str(raw.get("section") or "手动补录").strip() or "手动补录"
|
||||
stable_id = f"manual:{normalize_text(brand)}:{normalize_text(device_name)}"
|
||||
now = local_now().isoformat(timespec="seconds")
|
||||
return {
|
||||
"id": str(raw.get("id") or stable_id).strip() or stable_id,
|
||||
"brand": brand,
|
||||
"device_name": device_name,
|
||||
"device_type": device_type,
|
||||
"models": models,
|
||||
"aliases": aliases,
|
||||
"section": section,
|
||||
"updated_at": str(raw.get("updated_at") or now),
|
||||
"created_at": str(raw.get("created_at") or now),
|
||||
}
|
||||
|
||||
|
||||
def validate_manual_catalog(payload: dict[str, object]) -> dict[str, object]:
|
||||
brands_raw = payload.get("brands") if isinstance(payload.get("brands"), list) else []
|
||||
devices_raw = payload.get("devices") if isinstance(payload.get("devices"), list) else []
|
||||
|
||||
brands: list[dict[str, object]] = []
|
||||
seen_brand_keys: set[str] = set()
|
||||
for raw_brand in brands_raw:
|
||||
brand = canonical_manual_brand(raw_brand)
|
||||
brand_key = normalize_text(str(brand["name"]))
|
||||
if brand_key in seen_brand_keys:
|
||||
raise RuntimeError(f"品牌重复: {brand['name']}")
|
||||
seen_brand_keys.add(brand_key)
|
||||
brands.append(brand)
|
||||
|
||||
builtin_brands = load_builtin_brand_names()
|
||||
allowed_brands = builtin_brands | {str(item["name"]) for item in brands}
|
||||
devices: list[dict[str, object]] = []
|
||||
seen_device_ids: set[str] = set()
|
||||
for raw_device in devices_raw:
|
||||
device = canonical_manual_device(raw_device, allowed_brands)
|
||||
if device["id"] in seen_device_ids:
|
||||
raise RuntimeError(f"设备 ID 重复: {device['id']}")
|
||||
seen_device_ids.add(str(device["id"]))
|
||||
devices.append(device)
|
||||
|
||||
return {"brands": brands, "devices": devices}
|
||||
|
||||
|
||||
def load_builtin_brand_names() -> set[str]:
|
||||
records = build_records(WORKSPACE_ROOT)
|
||||
names = {
|
||||
str(record.brand).strip()
|
||||
for record in records
|
||||
if str(record.source_file) != "local/manual_catalog.json"
|
||||
}
|
||||
names.update(
|
||||
str(record.market_brand).strip()
|
||||
for record in records
|
||||
if str(record.source_file) != "local/manual_catalog.json"
|
||||
)
|
||||
names.update(
|
||||
str(record.parent_brand).strip()
|
||||
for record in records
|
||||
if str(record.source_file) != "local/manual_catalog.json"
|
||||
)
|
||||
return {name for name in names if name}
|
||||
|
||||
|
||||
def count_manual_alias_conflicts(device: dict[str, object]) -> int:
|
||||
records = build_records(WORKSPACE_ROOT)
|
||||
alias_set = {
|
||||
normalize_text(alias)
|
||||
for alias in normalize_alias_list(device.get("models"), device.get("aliases"), device.get("device_name"))
|
||||
}
|
||||
conflicts: set[str] = set()
|
||||
for record in records:
|
||||
if str(record.source_file) == "local/manual_catalog.json":
|
||||
continue
|
||||
for alias in getattr(record, "aliases", []):
|
||||
alias_norm = normalize_text(str(alias))
|
||||
if alias_norm and alias_norm in alias_set:
|
||||
conflicts.add(alias_norm)
|
||||
return len(conflicts)
|
||||
|
||||
|
||||
def rebuild_generated_outputs() -> dict[str, object]:
|
||||
build_proc = run_command(
|
||||
[
|
||||
"python3",
|
||||
str(PROJECT_ROOT / "tools/device_mapper.py"),
|
||||
"--repo-root",
|
||||
str(WORKSPACE_ROOT),
|
||||
"build",
|
||||
"--output",
|
||||
str(INDEX_PATH),
|
||||
]
|
||||
)
|
||||
build_output = "\n".join(part for part in [build_proc.stdout.strip(), build_proc.stderr.strip()] if part).strip()
|
||||
if build_proc.returncode != 0:
|
||||
raise RuntimeError(build_output or "重建设备索引失败。")
|
||||
|
||||
seed_proc = run_command(
|
||||
[
|
||||
"python3",
|
||||
str(PROJECT_ROOT / "tools/export_mysql_seed.py"),
|
||||
"--repo-root",
|
||||
str(WORKSPACE_ROOT),
|
||||
"--output",
|
||||
str(MYSQL_SEED_PATH),
|
||||
]
|
||||
)
|
||||
seed_output = "\n".join(part for part in [seed_proc.stdout.strip(), seed_proc.stderr.strip()] if part).strip()
|
||||
if seed_proc.returncode != 0:
|
||||
raise RuntimeError(seed_output or "导出 MySQL seed 失败。")
|
||||
|
||||
mysql_loaded = False
|
||||
mysql_message = "MySQL 未刷新。"
|
||||
if mysql_auto_load_enabled():
|
||||
load_proc = run_command(["python3", str(MYSQL_LOADER)])
|
||||
mysql_message = "\n".join(part for part in [load_proc.stdout.strip(), load_proc.stderr.strip()] if part).strip() or "MySQL 已刷新。"
|
||||
if load_proc.returncode != 0:
|
||||
raise RuntimeError(mysql_message)
|
||||
mysql_loaded = True
|
||||
return {
|
||||
"index_updated": True,
|
||||
"mysql_seed_updated": True,
|
||||
"mysql_loaded": mysql_loaded,
|
||||
"message": "本地覆盖库已保存,索引与 MySQL seed 已刷新。" if mysql_loaded else "本地覆盖库已保存,索引与 MySQL seed 已刷新,MySQL 未自动装载。",
|
||||
"build_output": build_output,
|
||||
"mysql_seed_output": seed_output,
|
||||
"mysql_message": mysql_message,
|
||||
}
|
||||
|
||||
|
||||
def manual_catalog_payload() -> dict[str, object]:
|
||||
catalog = validate_manual_catalog(read_manual_catalog())
|
||||
brands = sorted(catalog["brands"], key=lambda item: str(item["name"]).lower())
|
||||
devices = sorted(catalog["devices"], key=lambda item: (str(item["brand"]).lower(), str(item["device_name"]).lower()))
|
||||
return {
|
||||
"brands": brands,
|
||||
"devices": devices,
|
||||
"stats": {
|
||||
"brand_count": len(brands),
|
||||
"device_count": len(devices),
|
||||
},
|
||||
"catalog_file": str(MANUAL_CATALOG_PATH.relative_to(PROJECT_ROOT)),
|
||||
}
|
||||
|
||||
|
||||
def upsert_manual_brand(payload: dict[str, object]) -> dict[str, object]:
|
||||
if not SYNC_LOCK.acquire(blocking=False):
|
||||
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
|
||||
try:
|
||||
catalog = validate_manual_catalog(read_manual_catalog())
|
||||
incoming = canonical_manual_brand(payload)
|
||||
brand_key = normalize_text(str(incoming["name"]))
|
||||
existing_by_key = {normalize_text(str(item["name"])): item for item in catalog["brands"]}
|
||||
existing = existing_by_key.get(brand_key)
|
||||
if existing:
|
||||
incoming["created_at"] = existing.get("created_at") or incoming["created_at"]
|
||||
catalog["brands"] = [
|
||||
item for item in catalog["brands"] if normalize_text(str(item["name"])) != brand_key
|
||||
]
|
||||
catalog["brands"].append(incoming)
|
||||
validated = validate_manual_catalog(catalog)
|
||||
write_manual_catalog(validated)
|
||||
rebuild_result = rebuild_generated_outputs()
|
||||
return {
|
||||
"saved_brand": incoming,
|
||||
"catalog": manual_catalog_payload(),
|
||||
**rebuild_result,
|
||||
}
|
||||
finally:
|
||||
SYNC_LOCK.release()
|
||||
|
||||
|
||||
def upsert_manual_device(payload: dict[str, object]) -> dict[str, object]:
|
||||
if not SYNC_LOCK.acquire(blocking=False):
|
||||
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
|
||||
try:
|
||||
catalog = validate_manual_catalog(read_manual_catalog())
|
||||
allowed_brands = load_builtin_brand_names() | {str(item["name"]) for item in catalog["brands"]}
|
||||
device_payload = canonical_manual_device(payload, allowed_brands)
|
||||
existing = next((item for item in catalog["devices"] if str(item.get("id")) == str(device_payload["id"])), None)
|
||||
if existing:
|
||||
device_payload["created_at"] = existing.get("created_at") or device_payload["created_at"]
|
||||
catalog["devices"] = [item for item in catalog["devices"] if str(item.get("id")) != str(device_payload["id"])]
|
||||
catalog["devices"].append(device_payload)
|
||||
validated = validate_manual_catalog(catalog)
|
||||
write_manual_catalog(validated)
|
||||
rebuild_result = rebuild_generated_outputs()
|
||||
return {
|
||||
"saved_device": device_payload,
|
||||
"alias_conflict_count": count_manual_alias_conflicts(device_payload),
|
||||
"catalog": manual_catalog_payload(),
|
||||
**rebuild_result,
|
||||
}
|
||||
finally:
|
||||
SYNC_LOCK.release()
|
||||
|
||||
|
||||
def delete_manual_brand(payload: dict[str, object]) -> dict[str, object]:
|
||||
if not SYNC_LOCK.acquire(blocking=False):
|
||||
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
|
||||
try:
|
||||
brand_name = str(payload.get("name") or "").strip()
|
||||
if not brand_name:
|
||||
raise RuntimeError("品牌名称不能为空。")
|
||||
catalog = validate_manual_catalog(read_manual_catalog())
|
||||
active_devices = [item for item in catalog["devices"] if str(item.get("brand") or "").strip() == brand_name]
|
||||
if active_devices:
|
||||
raise RuntimeError("该品牌下仍有关联设备,请先删除设备后再删除品牌。")
|
||||
next_brands = [item for item in catalog["brands"] if str(item.get("name") or "").strip() != brand_name]
|
||||
if len(next_brands) == len(catalog["brands"]):
|
||||
raise RuntimeError(f"未找到品牌: {brand_name}")
|
||||
write_manual_catalog({"brands": next_brands, "devices": catalog["devices"]})
|
||||
rebuild_result = rebuild_generated_outputs()
|
||||
return {
|
||||
"deleted_brand": brand_name,
|
||||
"catalog": manual_catalog_payload(),
|
||||
**rebuild_result,
|
||||
}
|
||||
finally:
|
||||
SYNC_LOCK.release()
|
||||
|
||||
|
||||
def delete_manual_device(payload: dict[str, object]) -> dict[str, object]:
|
||||
if not SYNC_LOCK.acquire(blocking=False):
|
||||
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
|
||||
try:
|
||||
device_id = str(payload.get("id") or "").strip()
|
||||
if not device_id:
|
||||
raise RuntimeError("设备 ID 不能为空。")
|
||||
catalog = validate_manual_catalog(read_manual_catalog())
|
||||
next_devices = [item for item in catalog["devices"] if str(item.get("id") or "").strip() != device_id]
|
||||
if len(next_devices) == len(catalog["devices"]):
|
||||
raise RuntimeError(f"未找到设备: {device_id}")
|
||||
write_manual_catalog({"brands": catalog["brands"], "devices": next_devices})
|
||||
rebuild_result = rebuild_generated_outputs()
|
||||
return {
|
||||
"deleted_device": device_id,
|
||||
"catalog": manual_catalog_payload(),
|
||||
**rebuild_result,
|
||||
}
|
||||
finally:
|
||||
SYNC_LOCK.release()
|
||||
|
||||
|
||||
def parse_apple_series_generation(name: str) -> dict[str, object] | None:
|
||||
text = re.sub(r"\s+", " ", str(name or "").strip())
|
||||
if not text:
|
||||
@@ -935,6 +1266,14 @@ class MobileModelsHandler(SimpleHTTPRequestHandler):
|
||||
except Exception as err:
|
||||
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
|
||||
return
|
||||
if self.path == "/api/manual-catalog":
|
||||
try:
|
||||
self._send_json(manual_catalog_payload())
|
||||
except RuntimeError as err:
|
||||
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
|
||||
except Exception as err:
|
||||
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
|
||||
return
|
||||
return super().do_GET()
|
||||
|
||||
def do_POST(self) -> None:
|
||||
@@ -1032,6 +1371,28 @@ class MobileModelsHandler(SimpleHTTPRequestHandler):
|
||||
except Exception as err:
|
||||
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
|
||||
return
|
||||
if self.path in {"/api/manual-brand", "/api/manual-device", "/api/manual-brand-delete", "/api/manual-device-delete"}:
|
||||
try:
|
||||
content_length = int(self.headers.get("Content-Length", "0") or "0")
|
||||
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
|
||||
req = json.loads(raw_body.decode("utf-8") or "{}")
|
||||
if not isinstance(req, dict):
|
||||
raise RuntimeError("请求体必须是 JSON 对象。")
|
||||
if self.path == "/api/manual-brand":
|
||||
payload = upsert_manual_brand(req)
|
||||
elif self.path == "/api/manual-device":
|
||||
payload = upsert_manual_device(req)
|
||||
elif self.path == "/api/manual-brand-delete":
|
||||
payload = delete_manual_brand(req)
|
||||
else:
|
||||
payload = delete_manual_device(req)
|
||||
self._send_json(payload)
|
||||
except RuntimeError as err:
|
||||
status = HTTPStatus.CONFLICT if "已有同步或数据重建任务" in str(err) else HTTPStatus.BAD_REQUEST
|
||||
self._send_json({"error": str(err)}, status=status)
|
||||
except Exception as err:
|
||||
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
|
||||
return
|
||||
|
||||
self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user