feat: dockerize app and unify query management UI
This commit is contained in:
281
tools/export_mysql_seed.py
Normal file
281
tools/export_mysql_seed.py
Normal file
@@ -0,0 +1,281 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Export MobileModels records into MySQL-friendly seed SQL."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
from device_mapper import (
|
||||
MARKET_BRAND_ALIASES,
|
||||
MARKET_BRAND_TO_MANUFACTURER,
|
||||
build_records,
|
||||
brand_aliases,
|
||||
normalize_text,
|
||||
resolve_parent_brand,
|
||||
)
|
||||
|
||||
|
||||
# Repository root: two directory levels above this script's resolved path.
_SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = _SCRIPT_PATH.parent.parent

# Shape of a "legacy code" alias: one ASCII letter or digit followed by 1-63
# characters drawn from letters, digits and , . _ / + \ - (2-64 chars total).
# NOTE(review): in this raw string the "\\" inside the character class matches
# a literal backslash -- confirm that backslashes are meant to be accepted.
LEGACY_CODE_RE = re.compile(
    r"^[A-Za-z0-9][A-Za-z0-9,._/+\\-]{1,63}$"
)
|
||||
|
||||
|
||||
def is_cn_source_file(source_file: str) -> bool:
    """Return True when *source_file* ends with the ``_cn.md`` suffix.

    NOTE(review): the suffix presumably marks the China-region model lists;
    confirm against the repository's file naming.
    """
    cn_suffix = "_cn.md"
    has_cn_suffix = source_file.endswith(cn_suffix)
    return has_cn_suffix
|
||||
|
||||
|
||||
def build_source_order(records: list[object]) -> list[str]:
    """Return the distinct ``source_file`` values of *records* in priority order.

    Files for which ``is_cn_source_file`` is true come first, followed by all
    other files; each group is sorted ascending by name.
    """
    distinct_sources = {entry.source_file for entry in records}
    # False sorts before True, so "_cn.md" files (not-cn == False) lead.
    return sorted(
        distinct_sources,
        key=lambda name: (not is_cn_source_file(name), name),
    )
|
||||
|
||||
|
||||
def build_source_weights(records: list[object]) -> tuple[dict[str, int], dict[str, float]]:
    """Compute a rank and a weight for every source file found in *records*.

    Returns:
        A ``(rank_map, weight_map)`` pair keyed by source file name. Ranks are
        1-based positions in ``build_source_order``. Weights fall linearly
        from 6 for the first file, computed as ``(total - index) / total * 6``
        and rounded to three decimals; a lone source file gets the integer 6.
    """
    ordered_sources = build_source_order(records)
    source_count = len(ordered_sources)

    ranks: dict[str, int] = {
        name: position for position, name in enumerate(ordered_sources, start=1)
    }

    weights: dict[str, float] = {}
    for position, name in enumerate(ordered_sources):
        if source_count > 1:
            raw_weight = ((source_count - position) / source_count) * 6
        else:
            raw_weight = 6
        weights[name] = round(raw_weight, 3)

    return ranks, weights
|
||||
|
||||
|
||||
# Characters that must be backslash-escaped inside a single-quoted MySQL
# string literal. Mirrors the set handled by mysql_real_escape_string, minus
# the double quote, which needs no escaping inside single quotes.
_SQL_STRING_ESCAPES = str.maketrans({
    "\\": "\\\\",
    "'": "\\'",
    "\0": "\\0",
    "\n": "\\n",
    "\r": "\\r",
    "\x1a": "\\Z",
})


def sql_quote(value: object | None) -> str:
    """Render *value* as a MySQL literal suitable for a VALUES list.

    Args:
        value: ``None``, a bool, an int/float, or any object whose ``str()``
            form should be stored as text.

    Returns:
        ``NULL`` for ``None`` and for non-finite floats (``nan``/``inf`` are
        not valid MySQL numeric literals), ``1``/``0`` for bools, the plain
        ``str()`` form for other numbers, and otherwise a single-quoted
        string with backslash, quote, NUL, newline, carriage return and
        Ctrl-Z backslash-escaped.

    Note:
        Backslash escaping assumes the server is not running with the
        ``NO_BACKSLASH_ESCAPES`` SQL mode.
    """
    if value is None:
        return "NULL"
    # bool is a subclass of int, so it has to be checked before the numbers.
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, float) and not (float("-inf") < value < float("inf")):
        # The chained comparison is False for nan, inf and -inf alike.
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    # translate() visits each character once, so escapes never get re-escaped.
    return "'" + str(value).translate(_SQL_STRING_ESCAPES) + "'"
|
||||
|
||||
|
||||
def batched(items: list[tuple[str, ...]], batch_size: int) -> Iterable[list[tuple[str, ...]]]:
    """Yield consecutive slices of *items*, each at most *batch_size* long.

    The final slice may be shorter. An empty *items* list yields nothing.
    """
    yield from (
        items[offset:offset + batch_size]
        for offset in range(0, len(items), batch_size)
    )
|
||||
|
||||
|
||||
def build_catalog_rows(records: list[object]) -> list[tuple[str, ...]]:
    """Build the quoted value tuples for the device catalog INSERT.

    One row is produced per distinct ``(record.id, normalized alias)`` pair;
    the first occurrence wins and aliases that normalize to an empty value
    are skipped. Per record, aliases are stripped, de-duplicated and sorted,
    then split with ``is_legacy_code_alias``:

    * ``code``       - the first code-like alias (or NULL),
    * ``code_alias`` - the remaining code-like aliases joined with " | ",
    * ``ver_name``   - all other aliases joined with " | ".

    Args:
        records: Device records exposing ``id``, ``aliases``, ``device_name``,
            ``brand``, ``manufacturer_brand``, ``parent_brand``,
            ``market_brand``, ``device_type``, ``source_file`` and ``section``.

    Returns:
        Tuples of SQL literals in this order: record id, alias, normalized
        alias, device name, brand, manufacturer brand, parent brand, market
        brand, device type, code, code_alias, ver_name, source file, section,
        source rank, source weight. Rows are sorted by normalized alias,
        source rank, record id and alias.
    """
    rank_map, weight_map = build_source_weights(records)
    # Each entry pairs a sort key with its row so the rank can be compared as
    # an integer; sorting on the rendered literal would put "10" before "2".
    keyed_rows: list[tuple[tuple[str, int, str, str], tuple[str, ...]]] = []
    seen_keys: set[tuple[str, str]] = set()

    for record in records:
        aliases = sorted({alias.strip() for alias in record.aliases if alias.strip()})

        # Classify every alias exactly once.
        code_aliases: list[str] = []
        version_names: list[str] = []
        for alias in aliases:
            if is_legacy_code_alias(alias):
                code_aliases.append(alias)
            else:
                version_names.append(alias)

        primary_code = code_aliases[0] if code_aliases else None
        # Aliases are unique here, so everything after the first is "other".
        other_codes = code_aliases[1:]
        code_alias = " | ".join(other_codes) if other_codes else None
        ver_name = " | ".join(version_names) if version_names else None

        source_rank = rank_map[record.source_file]
        quoted_id = sql_quote(record.id)
        # Columns that are identical for every alias row of this record.
        shared_columns = (
            sql_quote(record.device_name),
            sql_quote(record.brand),
            sql_quote(record.manufacturer_brand),
            sql_quote(record.parent_brand),
            sql_quote(record.market_brand),
            sql_quote(record.device_type),
            sql_quote(primary_code),
            sql_quote(code_alias),
            sql_quote(ver_name),
            sql_quote(record.source_file),
            sql_quote(record.section),
            sql_quote(source_rank),
            sql_quote(f"{weight_map[record.source_file]:.3f}"),
        )

        for alias in aliases:
            alias_norm = normalize_text(alias)
            if not alias_norm:
                continue
            dedupe_key = (record.id, alias_norm)
            if dedupe_key in seen_keys:
                continue
            seen_keys.add(dedupe_key)

            quoted_alias = sql_quote(alias)
            quoted_norm = sql_quote(alias_norm)
            keyed_rows.append((
                (quoted_norm, source_rank, quoted_id, quoted_alias),
                (quoted_id, quoted_alias, quoted_norm, *shared_columns),
            ))

    keyed_rows.sort(key=lambda entry: entry[0])
    return [row for _, row in keyed_rows]
|
||||
|
||||
|
||||
def build_brand_rows(records: list[object]) -> list[tuple[str, ...]]:
    """Build the quoted value tuples for the brand lookup INSERT.

    Three alias families are registered, keyed by ``(normalized alias, type)``
    so a later registration with the same key replaces an earlier one:

    * ``manufacturer`` - aliases of every manufacturer brand in *records*,
    * ``parent``       - aliases of every parent brand in *records*,
    * ``market``       - each ``MARKET_BRAND_ALIASES`` entry plus its own name.

    Returns:
        Tuples of SQL literals (alias_norm, alias_type, canonical brand,
        manufacturer brand, parent brand, market brand), ordered by
        ``(alias_norm, alias_type)``.
    """
    manufacturer_names = sorted({entry.manufacturer_brand for entry in records})
    parent_names = sorted({entry.parent_brand for entry in records})
    lookup: dict[tuple[str, str], tuple[str, ...]] = {}

    def register(alias, alias_type, canonical, manufacturer, parent, market):
        """Store one lookup row unless the alias normalizes to nothing."""
        normalized = normalize_text(alias)
        if not normalized:
            return
        lookup[(normalized, alias_type)] = (
            sql_quote(normalized),
            sql_quote(alias_type),
            sql_quote(canonical),
            sql_quote(manufacturer),
            sql_quote(parent),
            sql_quote(market),
        )

    for name in manufacturer_names:
        owner = resolve_parent_brand(name)
        for alias in brand_aliases(name):
            register(alias, "manufacturer", name, name, owner, None)

    for name in parent_names:
        for alias in brand_aliases(name):
            register(alias, "parent", name, None, name, None)

    for market_name, extra_aliases in MARKET_BRAND_ALIASES.items():
        # A market brand without an explicit mapping is its own manufacturer.
        maker = MARKET_BRAND_TO_MANUFACTURER.get(market_name, market_name)
        owner = resolve_parent_brand(maker)
        for alias in sorted({market_name, *extra_aliases}):
            register(alias, "market", market_name, maker, owner, market_name)

    return [lookup[key] for key in sorted(lookup)]
|
||||
|
||||
|
||||
def is_legacy_code_alias(text: str) -> bool:
    """Tell whether *text* looks like a model code rather than a marketing name.

    After stripping surrounding whitespace, the value must match
    ``LEGACY_CODE_RE`` and contain at least one digit. Empty or ``None``
    input is never a code.
    """
    candidate = (text or "").strip()
    if candidate and LEGACY_CODE_RE.match(candidate):
        return any(map(str.isdigit, candidate))
    return False
|
||||
|
||||
|
||||
def append_insert_block(
    lines: list[str],
    table_name: str,
    columns: list[str],
    rows: list[tuple[str, ...]],
    batch_size: int = 500,
) -> None:
    """Append multi-row INSERT statements for *rows* to *lines* in place.

    Rows are split with ``batched`` into groups of at most *batch_size*; each
    group becomes one ``INSERT INTO ... VALUES`` statement followed by an
    empty line. Nothing is appended when *rows* is empty.

    Args:
        lines: Output buffer of SQL text lines; mutated.
        table_name: Target table, emitted inside backticks.
        columns: Column names, each emitted inside backticks.
        rows: Tuples of already-rendered SQL literals.
        batch_size: Maximum number of rows per INSERT statement.
    """
    if rows:
        column_sql = ", ".join(f"`{column}`" for column in columns)
        statement_head = f"INSERT INTO `{table_name}` ({column_sql}) VALUES\n"
        for group in batched(rows, batch_size):
            rendered = [f" ({', '.join(row)})" for row in group]
            lines.append(statement_head + ",\n".join(rendered) + ";")
            lines.append("")
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Recognized options, both converted to ``Path``:

    * ``--repo-root`` - defaults to ``REPO_ROOT``.
    * ``--output``    - defaults to ``dist/mobilemodels_mysql_seed.sql``.
    """
    cli = argparse.ArgumentParser(description="Export MobileModels MySQL seed SQL.")
    path_options = (
        ("--repo-root", REPO_ROOT, "Path to MobileModels repository root"),
        ("--output", Path("dist/mobilemodels_mysql_seed.sql"), "Output SQL path"),
    )
    for flag, fallback, description in path_options:
        cli.add_argument(flag, type=Path, default=fallback, help=description)
    return cli.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Generate the seed SQL file and print a short summary.

    The script wraps everything in one transaction that empties
    ``mm_device_catalog`` and ``mm_brand_lookup`` and then re-inserts the
    freshly built rows. A relative ``--output`` is resolved against the
    repository root, and missing parent directories are created.

    Returns:
        Always 0; the caller uses it as the process exit status.
    """
    options = parse_args()
    root = options.repo_root.resolve()
    destination = options.output
    if not destination.is_absolute():
        destination = root / destination

    records = build_records(root)
    record_total = len(records)
    catalog_rows = build_catalog_rows(records)
    brand_rows = build_brand_rows(records)
    catalog_total = len(catalog_rows)

    # Several counters report the same catalog row count under different labels.
    summary = [
        ("device_records", record_total),
        ("device_catalog_rows", catalog_total),
        ("device_lookup_rows", catalog_total),
        ("brand_lookup_rows", len(brand_rows)),
        ("legacy_models_rows", catalog_total),
    ]

    # Column order must match the tuple layout produced by build_catalog_rows.
    catalog_columns = [
        "record_id",
        "model",
        "alias_norm",
        "device_name",
        "brand",
        "manufacturer_brand",
        "parent_brand",
        "market_brand",
        "device_type",
        "code",
        "code_alias",
        "ver_name",
        "source_file",
        "section",
        "source_rank",
        "source_weight",
    ]
    # Column order must match the tuple layout produced by build_brand_rows.
    brand_columns = [
        "alias_norm",
        "alias_type",
        "canonical_brand",
        "manufacturer_brand",
        "parent_brand",
        "market_brand",
    ]

    sql_lines = [
        "-- MobileModels MySQL seed",
        "-- Generated by tools/export_mysql_seed.py",
        "USE `mobilemodels`;",
        "",
        "START TRANSACTION;",
        "",
        "DELETE FROM `mm_device_catalog`;",
        "DELETE FROM `mm_brand_lookup`;",
        "",
    ]
    append_insert_block(sql_lines, "mm_device_catalog", catalog_columns, catalog_rows)
    append_insert_block(sql_lines, "mm_brand_lookup", brand_columns, brand_rows)

    sql_lines.append("COMMIT;")
    sql_lines.append("")
    sql_lines.extend(f"-- {label}: {count}" for label, count in summary)
    # Trailing empty element makes the joined text end with a newline.
    sql_lines.append("")

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text("\n".join(sql_lines), encoding="utf-8")

    print(f"Exported MySQL seed: {destination}")
    for label, count in summary:
        print(f"{label}={count}")
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|
||||
Reference in New Issue
Block a user