#!/usr/bin/env python3
"""Export MobileModels records into MySQL-friendly seed SQL."""

from __future__ import annotations

import argparse
import re
from pathlib import Path
from typing import Iterable

from device_mapper import (
    MARKET_BRAND_ALIASES,
    MARKET_BRAND_TO_MANUFACTURER,
    build_records,
    brand_aliases,
    normalize_text,
    resolve_parent_brand,
)
from project_layout import PROJECT_ROOT, WORKSPACE_ROOT

# Shape of an alias that may be a model code: 2-64 characters, starting with
# an ASCII letter or digit, followed by letters, digits or one of , . _ / + \ -
LEGACY_CODE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9,._/+\\-]{1,63}$")


def is_cn_source_file(source_file: str) -> bool:
    """Return True when *source_file* ends with the ``_cn.md`` suffix."""
    return source_file.endswith("_cn.md")


def build_source_order(records: list[object]) -> list[str]:
    """Return the distinct source files of *records* in ranking order.

    Files ending in ``_cn.md`` come first, then every other file; each group
    is ordered alphabetically.
    """
    source_files = sorted({record.source_file for record in records})
    # Filtering an already sorted list keeps both groups sorted.
    cn = [source for source in source_files if is_cn_source_file(source)]
    other = [source for source in source_files if not is_cn_source_file(source)]
    return cn + other


def build_source_weights(records: list[object]) -> tuple[dict[str, int], dict[str, float]]:
    """Compute a rank and a weight for every source file in *records*.

    Returns:
        ``(rank_map, weight_map)``. Ranks are 1-based positions in
        :func:`build_source_order`. Weights fall linearly from 6 for the first
        file to ``6 / total`` for the last one, rounded to three decimals.
    """
    order = build_source_order(records)
    total = len(order)
    rank_map: dict[str, int] = {}
    weight_map: dict[str, float] = {}
    for idx, source_file in enumerate(order):
        weight = (((total - idx) / total) * 6) if total > 1 else 6
        rank_map[source_file] = idx + 1
        weight_map[source_file] = round(weight, 3)
    return rank_map, weight_map


def sql_quote(value: object | None) -> str:
    """Render *value* as a SQL literal.

    ``None`` becomes ``NULL``, booleans become ``1``/``0``, other numbers are
    emitted via ``str()``, and anything else is stringified and wrapped in
    single quotes with backslashes and single quotes backslash-escaped.
    """
    if value is None:
        return "NULL"
    # bool is a subclass of int, so it has to be tested first.
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, (int, float)):
        return str(value)
    # NOTE(review): backslash escaping assumes the server does not run with
    # the NO_BACKSLASH_ESCAPES SQL mode - confirm for the target database.
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def batched(items: list[tuple[str, ...]], batch_size: int) -> Iterable[list[tuple[str, ...]]]:
    """Yield consecutive slices of *items* holding at most *batch_size* rows."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def build_catalog_rows(records: list[object]) -> list[tuple[str, ...]]:
    """Build the SQL-quoted rows for the ``mm_device_catalog`` table.

    One row is produced per (record id, normalized alias) pair; later
    duplicates of a pair are dropped. Aliases accepted by
    :func:`is_legacy_code_alias` supply the ``code`` / ``code_alias`` columns,
    the remaining aliases supply ``ver_name``.

    Returns:
        Rows ordered by normalized alias, then source rank (numerically),
        then record id, then alias. Every element of a row is already a SQL
        literal produced by :func:`sql_quote`.
    """
    rank_map, weight_map = build_source_weights(records)
    keyed_rows: list[tuple[tuple[str, int, str, str], tuple[str, ...]]] = []
    seen_keys: set[tuple[str, str]] = set()

    for record in records:
        aliases = sorted({alias.strip() for alias in record.aliases if alias.strip()})
        code_aliases = [alias for alias in aliases if is_legacy_code_alias(alias)]
        primary_code = code_aliases[0] if code_aliases else None
        other_codes = [alias for alias in code_aliases if alias != primary_code]
        code_alias = " | ".join(other_codes) if other_codes else None
        version_names = [alias for alias in aliases if not is_legacy_code_alias(alias)]
        ver_name = " | ".join(version_names) if version_names else None

        # Everything except the alias columns is constant for one record, so
        # quote it once instead of once per alias.
        rank = rank_map[record.source_file]
        id_sql = sql_quote(record.id)
        record_columns = (
            sql_quote(record.device_name),
            sql_quote(record.brand),
            sql_quote(record.manufacturer_brand),
            sql_quote(record.parent_brand),
            sql_quote(record.market_brand),
            sql_quote(record.device_type),
            sql_quote(primary_code),
            sql_quote(code_alias),
            sql_quote(ver_name),
            sql_quote(record.source_file),
            sql_quote(record.section),
            sql_quote(rank),
            sql_quote(f"{weight_map[record.source_file]:.3f}"),
        )

        for alias in aliases:
            alias_norm = normalize_text(alias)
            if not alias_norm:
                continue
            dedupe_key = (record.id, alias_norm)
            if dedupe_key in seen_keys:
                continue
            seen_keys.add(dedupe_key)

            alias_sql = sql_quote(alias)
            alias_norm_sql = sql_quote(alias_norm)
            # The rank is kept as an int in the sort key: the row only holds
            # its SQL text, and text ordering would put rank 10 before rank 2.
            sort_key = (alias_norm_sql, rank, id_sql, alias_sql)
            keyed_rows.append((sort_key, (id_sql, alias_sql, alias_norm_sql, *record_columns)))

    keyed_rows.sort(key=lambda pair: pair[0])
    return [row for _, row in keyed_rows]


def build_brand_rows(records: list[object]) -> list[tuple[str, ...]]:
    """Build the SQL-quoted rows for the ``mm_brand_lookup`` table.

    Rows are keyed by (normalized alias, alias type), where the alias type is
    ``manufacturer``, ``parent`` or ``market``; a later alias that normalizes
    to an existing key overwrites the earlier row. Manufacturer and parent
    brands are taken from *records*, market brands from
    ``MARKET_BRAND_ALIASES``.

    Returns:
        Rows sorted by their (normalized alias, alias type) key.
    """
    manufacturer_brands = sorted({record.manufacturer_brand for record in records})
    parent_brands = sorted({record.parent_brand for record in records})
    rows: dict[tuple[str, str], tuple[str, ...]] = {}

    for brand in manufacturer_brands:
        parent_brand = resolve_parent_brand(brand)
        for alias in brand_aliases(brand):
            alias_norm = normalize_text(alias)
            if not alias_norm:
                continue
            rows[(alias_norm, "manufacturer")] = (
                sql_quote(alias_norm),
                sql_quote("manufacturer"),
                sql_quote(brand),
                sql_quote(brand),
                sql_quote(parent_brand),
                sql_quote(None),
            )

    for brand in parent_brands:
        for alias in brand_aliases(brand):
            alias_norm = normalize_text(alias)
            if not alias_norm:
                continue
            rows[(alias_norm, "parent")] = (
                sql_quote(alias_norm),
                sql_quote("parent"),
                sql_quote(brand),
                sql_quote(None),
                sql_quote(brand),
                sql_quote(None),
            )

    for market_brand, aliases in MARKET_BRAND_ALIASES.items():
        # A market brand with no mapping is treated as its own manufacturer.
        manufacturer_brand = MARKET_BRAND_TO_MANUFACTURER.get(market_brand, market_brand)
        parent_brand = resolve_parent_brand(manufacturer_brand)
        for alias in sorted({market_brand, *aliases}):
            alias_norm = normalize_text(alias)
            if not alias_norm:
                continue
            rows[(alias_norm, "market")] = (
                sql_quote(alias_norm),
                sql_quote("market"),
                sql_quote(market_brand),
                sql_quote(manufacturer_brand),
                sql_quote(parent_brand),
                sql_quote(market_brand),
            )

    return [rows[key] for key in sorted(rows)]


def is_legacy_code_alias(text: str) -> bool:
    """Return True when *text* has the shape of a model code.

    The stripped text must match ``LEGACY_CODE_RE`` and contain at least one
    digit. Empty or ``None`` input yields False.
    """
    value = (text or "").strip()
    if not value or not LEGACY_CODE_RE.match(value):
        return False
    return any(ch.isdigit() for ch in value)


def append_insert_block(
    lines: list[str],
    table_name: str,
    columns: list[str],
    rows: list[tuple[str, ...]],
    batch_size: int = 500,
) -> None:
    """Append batched ``INSERT`` statements for *rows* to *lines*.

    Each statement carries at most *batch_size* rows and is followed by an
    empty line. Nothing is appended when *rows* is empty. The row elements
    must already be SQL literals; they are joined verbatim.
    """
    if not rows:
        return
    column_sql = ", ".join(f"`{column}`" for column in columns)
    for chunk in batched(rows, batch_size):
        values_sql = ",\n".join(f" ({', '.join(row)})" for row in chunk)
        lines.append(f"INSERT INTO `{table_name}` ({column_sql}) VALUES\n{values_sql};")
        lines.append("")


def parse_args() -> argparse.Namespace:
    """Parse the ``--repo-root`` and ``--output`` command-line options."""
    parser = argparse.ArgumentParser(description="Export MobileModels MySQL seed SQL.")
    parser.add_argument(
        "--repo-root",
        type=Path,
        default=WORKSPACE_ROOT,
        help="Path to workspace root",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("dist/mobilemodels_mysql_seed.sql"),
        help="Output SQL path",
    )
    return parser.parse_args()


def main() -> int:
    """Generate the seed SQL file and print a short summary.

    A relative ``--output`` path is resolved against ``PROJECT_ROOT``. The
    generated script deletes the current contents of both tables and inserts
    the new rows inside a single transaction.

    Returns:
        ``0``, used as the process exit status.
    """
    args = parse_args()
    repo_root = args.repo_root.resolve()
    output_path = args.output if args.output.is_absolute() else PROJECT_ROOT / args.output

    records = build_records(repo_root)
    device_record_count = len(records)
    catalog_rows = build_catalog_rows(records)
    brand_rows = build_brand_rows(records)

    lines = [
        "-- MobileModels MySQL seed",
        "-- Generated by tools/export_mysql_seed.py",
        "USE `mobilemodels`;",
        "",
        "START TRANSACTION;",
        "",
        "DELETE FROM `mm_device_catalog`;",
        "DELETE FROM `mm_brand_lookup`;",
        "",
    ]
    append_insert_block(
        lines,
        "mm_device_catalog",
        [
            "record_id",
            "model",
            "alias_norm",
            "device_name",
            "brand",
            "manufacturer_brand",
            "parent_brand",
            "market_brand",
            "device_type",
            "code",
            "code_alias",
            "ver_name",
            "source_file",
            "section",
            "source_rank",
            "source_weight",
        ],
        catalog_rows,
    )
    append_insert_block(
        lines,
        "mm_brand_lookup",
        [
            "alias_norm",
            "alias_type",
            "canonical_brand",
            "manufacturer_brand",
            "parent_brand",
            "market_brand",
        ],
        brand_rows,
    )
    # The device_lookup and legacy_models counters report the catalog row
    # count, exactly like device_catalog_rows.
    lines.extend([
        "COMMIT;",
        "",
        f"-- device_records: {device_record_count}",
        f"-- device_catalog_rows: {len(catalog_rows)}",
        f"-- device_lookup_rows: {len(catalog_rows)}",
        f"-- brand_lookup_rows: {len(brand_rows)}",
        f"-- legacy_models_rows: {len(catalog_rows)}",
        "",
    ])

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text("\n".join(lines), encoding="utf-8")

    print(f"Exported MySQL seed: {output_path}")
    print(f"device_records={device_record_count}")
    print(f"device_catalog_rows={len(catalog_rows)}")
    print(f"device_lookup_rows={len(catalog_rows)}")
    print(f"brand_lookup_rows={len(brand_rows)}")
    print(f"legacy_models_rows={len(catalog_rows)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())