Compare commits


15 Commits

18 changed files with 24793 additions and 20799 deletions
+507 -27
@@ -1,55 +1,535 @@
# Mobile phone brand and model summary
# Project usage guide
The project root is the unified entry point; the device query, data management, and MySQL services can all be started directly via Docker Compose.
## 1. Documentation overview
## How to start
### 1.1 Purpose
This document explains how to use MobileModels, how to ingest its data, and how to maintain and troubleshoot it. It is no longer split into "project notes / usage notes / database notes"; everything is consolidated into one document so different roles can read it in the same context.
### 1.2 Intended audience
- Product or business users: need to look up devices, read fields, and find the page entry points
- Data maintainers: need to maintain brands and source ordering, sync data, and refresh the index
- Server-side integrators: need to connect to MySQL and query device data by the unified rules
- Operators: need to start the services, check configuration, and handle runtime issues
### 1.3 Suggested reading paths
- Just want it running: start with `2. Quick start`
- Just want to use the pages: start with `3. Using the pages`
- Just want database access: start with `4. Data access`
- Responsible for maintenance and releases: focus on `5. Data maintenance`, `6. Configuration and runtime`, and `7. Troubleshooting guide`
## 2. Quick start
### 2.1 What the project provides
MobileModels delivers the following capabilities in a single project:
- Device query page
- Data management page
- Documentation page
- Device index building
- MySQL schema and seed generation
- Raw data sync with automatic scheduling
There is no need to start multiple services separately; by default everything runs via Docker Compose.
### 2.2 Start the services
Run from the project root:
```bash
docker compose up --build -d
```
To also start the local test MySQL, layer the test configuration on top:
```bash
docker compose -f docker-compose.yml -f docker-compose.test.yml up --build -d
```
To override the default environment variables, first copy the environment template:
```bash
cp .env.example .env
```
### 2.3 Page entry points
- Device query: `http://127.0.0.1:8123/web/device_query.html`
- Data management: `http://127.0.0.1:8123/web/brand_management.html`
- Usage docs: `http://127.0.0.1:8123/web/device_query.html?view=docs`
### 2.4 Stop and reset
Stop the services:
```bash
docker compose down
```
Wipe runtime data:
```bash
docker compose down -v
```
### 2.5 What happens automatically after startup
By default, startup performs the following according to configuration:
1. Load the raw-data workspace
2. Build the device index
3. Generate `dist/device_index.json`
4. Generate `dist/mobilemodels_mysql_seed.sql`
5. Auto-load MySQL if configured to
6. Start the web pages and the API
7. Start the project's built-in auto-sync scheduler
## 3. Using the pages
### 3.1 Page layout
The project has two main pages plus a docs entry:
- Device query: for day-to-day lookups, integration testing, and device classification
- Data management: for brand, source, sync, and index maintenance
- Usage docs: a single place to read the project documentation
### 3.2 Device query page
The device query page has three tabs.
#### Device identifier
Accepted inputs:
- Raw Android identifiers, e.g. `SM-G9980`
- Raw iOS identifiers, e.g. `iPhone14,2`
- Raw HarmonyOS identifiers, e.g. `NOH-AL00`
Typical scenarios:
- Device management
- Showing login devices
- Server-side device classification
- Debugging client-reported values
How to use the result fields:
- Device name: `device_name`
- Brand display: prefer `market_brand`, fall back to `brand` when empty
- Manufacturer attribution: prefer `parent_brand`, fall back to `manufacturer_brand` when empty
- Device type: `device_type`
Fields not recommended as primary display values:
- `alias_norm`
- `source_file`
- `source_rank`
- `source_weight`
- `code`
- `code_alias`
- `ver_name`
#### Device name
Accepted inputs:
- `iPad mini 7`
- `iPhone SE 3`
- Other human-readable device names
Query behavior:
1. Try the device-name alias mapping first
2. On a miss, fall back to `device_name` / `ver_name`
Typical scenarios:
- Operations double-checking
- Manual troubleshooting
- You know the device name but not the raw identifier
#### Identifier index
Accepted inputs:
- Device identifiers
- Common device names
Typical scenarios:
- Integration testing
- Comparing index recognition results
- Cross-validating against MySQL query results
### 3.3 Data management page
The left-hand navigation splits the data management page into five sections.
#### Brand list
Maintains:
- Brand and manufacturer counts
- The brand list
- Brand-to-manufacturer relationships
- Brand synonyms
Typical scenarios:
- Adjusting brand conventions
- Adjusting manufacturer attribution
- Investigating unexpected brand display
#### Manual entry
Maintains the local override catalog.
What you can do here:
- Add a standalone brand
- Add devices under a brand
- Edit or delete local manual-entry records
Typical scenarios:
- New brands not yet included upstream
- Learning machines, education terminals, custom devices
- Manually added data that must immediately join page queries and MySQL queries
Usage notes:
1. Create the brand in the local override catalog first
2. Use the real client-reported value as the device identifier
3. Saving automatically rebuilds the index and the MySQL seed
4. If MySQL auto-load is enabled, MySQL is then refreshed automatically as well
5. The local override catalog is never overwritten by "raw data sync"
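As a reference for what the local override catalog holds, here is a minimal sketch of a `workspace/local/manual_catalog.json` payload. The field names (`brands[].name` / `aliases` / `parent_brand`; `devices[].brand` / `device_name` / `models` / `device_type` / `section`) follow what the build tools read, while the brand and model values themselves are made-up placeholders:

```json
{
  "brands": [
    {
      "name": "ExampleEdu",
      "aliases": ["exampleedu", "exedu"],
      "parent_brand": "ExampleEdu"
    }
  ],
  "devices": [
    {
      "brand": "ExampleEdu",
      "device_name": "ExampleEdu Pad 1",
      "models": ["EE-PAD1"],
      "device_type": "tablet",
      "section": "手动补录"
    }
  ]
}
```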
#### Data sources
Maintains source priorities.
How to use:
1. Drag sources to reorder them
2. Higher in the list means higher priority
3. Saving affects result ordering
Typical scenarios:
- Multi-source result ordering does not match expectations
- A new source was added and needs to be ranked
#### Raw data sync
Maintains:
- The MySQL auto-load switch
- External MySQL initialization
- The daily auto-sync time
- Sync status and task logs
Typical scenarios:
- Refreshing local data after upstream updates
- Rebuilding an external MySQL
- Adjusting the auto-sync policy
#### Index data
View the current index state and trigger a manual reload.
Typical scenarios:
- Confirming the index took effect after a sync
- Checking the index version when page results differ from expectations
### 3.4 Page debugging tips
If page query results look wrong, check in this order:
1. Check whether the input matches what the page expects
2. Check the page's debug information
3. Check whether the returned JSON contains the expected fields
4. Check whether the index and the MySQL data have been refreshed
## 4. Data access
### 4.1 Recommended query entry point
Query this single primary table:
```sql
mobilemodels.mm_device_catalog
```
This table already consolidates device identifiers, device names, brands, manufacturers, sources, and normalization results. If the business side needs device information, use this table directly rather than joining multiple tables yourself or reverse-engineering fields from page logic.
### 4.2 Recommended query pattern
The primary pattern is an equality lookup on `alias_norm`.
Recommended SQL:
```sql
SELECT
  model,
  record_id,
  alias_norm,
  device_name,
  brand,
  manufacturer_brand,
  parent_brand,
  market_brand,
  device_type,
  source_file,
  section,
  source_rank,
  source_weight,
  code,
  code_alias,
  ver_name
FROM mobilemodels.mm_device_catalog
WHERE alias_norm = ?
ORDER BY source_rank ASC, record_id ASC
LIMIT 20;
```
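In application code, the equality lookup above boils down to normalizing the raw value and binding it as the single parameter. A minimal Python sketch (the function names are illustrative, and the `%s` placeholder assumes a DB-API driver such as PyMySQL — swap the placeholder style for your client):

```python
import re

# Trimmed column list from the recommended query; %s is the DB-API placeholder.
CATALOG_SQL = (
    "SELECT model, record_id, alias_norm, device_name, brand, "
    "manufacturer_brand, parent_brand, market_brand, device_type "
    "FROM mobilemodels.mm_device_catalog "
    "WHERE alias_norm = %s "
    "ORDER BY source_rank ASC, record_id ASC LIMIT 20"
)

def normalize_model(raw: str) -> str:
    # Project rule: lowercase, then keep only digits, ASCII letters, and CJK.
    return re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", raw.lower())

def build_lookup(raw_model: str) -> tuple[str, tuple[str, ...]]:
    # Returns (sql, params), ready for cursor.execute(sql, params).
    return CATALOG_SQL, (normalize_model(raw_model),)
```

For example, `build_lookup("SM-G9980")` yields the parameter tuple `("smg9980",)`.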
### 4.3 Why `alias_norm` is the primary lookup key
The raw `model_raw` reported by clients can vary in case, hyphens, spaces, underscores, commas, and so on. So that the same device matches as reliably as possible, the project normalizes every input to `alias_norm` and queries the primary table by `alias_norm`.
This is also what keeps page queries and database queries consistent.
### 4.4 Pre-query normalization rules
The normalization rules are:
- Lowercase first
- Keep only digits, ASCII letters, and Chinese characters
- Strip spaces, hyphens, underscores, commas, and all other punctuation
Regex example:
```text
/[^0-9a-z\u4e00-\u9fff]+/g
```
JavaScript example:
```js
text.toLowerCase().replace(/[^0-9a-z\u4e00-\u9fff]+/g, "")
```
Samples:
```text
SM-G9980 -> smg9980
iPhone14,2 -> iphone142
NOH-AL00 -> nohal00
```
### 4.5 Result field usage suggestions
For device-management scenarios, take values in this order:
- Device name: `device_name`
- Brand display: prefer `market_brand`, fall back to `brand` when empty
- Manufacturer attribution: prefer `parent_brand`, fall back to `manufacturer_brand` when empty
- Device type: `device_type`
The fields below are typically for troubleshooting, ordering, or extended matching, and are not recommended as primary display fields:
- `alias_norm`
- `source_file`
- `source_rank`
- `source_weight`
- `code`
- `code_alias`
- `ver_name`
### 4.6 How page queries relate to database queries
The page's `Device identifier` and `Device name` queries are both backed by this primary table. Therefore:
- Page results and third-party direct database queries should follow the same conventions
- The page's normalization rule should match the server side or the integrating business side
- If the page displays something unexpected, verify the primary table data first, then the front-end display logic
## 5. Data maintenance
### 5.1 Data sources
The project's data comes from two places:
- Upstream raw data: `workspace/brands/*.md`
- Local override catalog: `workspace/local/manual_catalog.json`
The upstream raw data syncs officially or community-maintained data; the local override catalog holds brands and devices the business needs but upstream has not yet included.
### 5.2 Data generation pipeline
The full pipeline:
1. Sync the upstream raw markdown
2. Parse `workspace/brands/*.md`
3. Merge `workspace/local/manual_catalog.json`
4. Build `dist/device_index.json`
5. Export `dist/mobilemodels_mysql_seed.sql`
6. Auto-load MySQL if configured to
### 5.3 Key artifacts
- Index file: `dist/device_index.json`
- MySQL seed: `dist/mobilemodels_mysql_seed.sql`
### 5.4 Common maintenance actions
#### Refresh raw data
Use when upstream data has changed and the index and MySQL data need regenerating.
#### Adjust brand relationships
Use when brand display or manufacturer attribution does not match expectations.
#### Manually add brands or devices
Use for devices upstream has not yet included but the business must support immediately.
How:
1. Add the brand or device under `Data management -> Manual entry`
2. Saving refreshes the index and the MySQL seed automatically
3. If MySQL auto-load is off, initialize or refresh the external MySQL manually as needed
Notes:
- The local override catalog is never overwritten by upstream syncs
- Locally added sources default to a higher priority
- Well suited to learning machines, education devices, and custom terminals
#### Adjust source order
Use when the priorities of multiple sources need redefining.
#### Reload the index
Use when the index has been updated but the page is not yet using the latest version.
## 6. Configuration and runtime
### 6.1 Directory layout
```text
workspace/  upstream raw data and supplements
dist/       index artifacts and MySQL seed
docs/       docs entry and compatibility notes
sql/        MySQL schema
tools/      build, sync, and import service scripts
web/        pages and static assets
```
### 6.2 Common environment variables
- `MYSQL_HOST`
- `MYSQL_PORT`
- `MYSQL_DATABASE`
- `MYSQL_ROOT_USER`
- `MYSQL_ROOT_PASSWORD`
- `MYSQL_READER_USER`
- `MYSQL_READER_PASSWORD`
- `MYSQL_AUTO_LOAD`
- `SYNC_SCHEDULE_ENABLED`
- `SYNC_SCHEDULE_TIME`
- `GITHUB_PROXY_PREFIX`
- `TZ`
Additional notes:
- `workspace/` holds the raw-data workspace
- `docker-compose.yml`, `Dockerfile`, and `tools/` all live in the project root
- The default main configuration targets a remote MySQL
- The MySQL instance in `docker-compose.test.yml` is for local testing only
- `dist/device_index.json` and `dist/mobilemodels_mysql_seed.sql` generated inside the container are mounted directly into `dist/` in the host project root
- Compose reads shell environment variables and the project-root `.env` first, then falls back to the defaults in `docker-compose.yml`
- Upstream raw git sync, index builds, and MySQL refreshes all run inside the container
- The project has a built-in "daily auto-sync" scheduler and does not depend on GitHub Actions; the run time can be set on the data management page or overridden via `.env`
- For GitHub acceleration, configure `GITHUB_PROXY_PREFIX`, or change it directly on the data management page
### 6.3 Where runtime configuration is persisted
- MySQL auto-load: `/data/state/mysql_settings.json`
- Auto-sync schedule: `/data/state/sync_schedule.json`
### 6.4 Runtime conventions
- Compose reads shell environment variables and the project-root `.env` first
- On startup the container builds the index, starts the services, and auto-loads MySQL if configured to
- Auto-sync runs inside the project container and does not depend on GitHub Actions
- Production environments should replace the default database passwords
### 6.5 MySQL usage recommendations
- Do not keep using the default passwords in production
- When connecting an external MySQL, first confirm the root account can create databases and tables
- If auto-load is off, initialize the external MySQL manually from the data management page
## 7. Troubleshooting guide
### 7.1 A device identifier cannot be found
Check in this order:
1. Confirm the input is the raw client-reported `model_raw`
2. Confirm the normalized value follows the rules
3. Check the `Device identifier` page's debug information and returned JSON
4. Check whether the primary table contains the corresponding `alias_norm`
### 7.2 Wrong brand or manufacturer attribution
Suggested checks:
1. Review `Data management -> Brand list`
2. Verify `market_brand` and `parent_brand`
3. Check for unmaintained brand synonyms or parent attribution
### 7.3 Result ordering does not match expectations
Suggested checks:
1. Review `Data management -> Data sources`
2. Verify `source_rank`
3. Confirm the results come from the expected source file
### 7.4 MySQL was not refreshed
Suggested checks:
1. Review `Data management -> Raw data sync`
2. Check the MySQL auto-load switch
3. Check the sync status and task logs
4. Manually initialize the external MySQL if necessary
### 7.5 Index results are wrong
Suggested checks:
1. Review `Data management -> Index data`
2. Check whether `dist/device_index.json` has been updated
3. Confirm the page has reloaded the latest index
## 8. FAQ
### 8.1 Why do page results differ from database results
Check these two things first:
- Is the page using the latest index?
- Is the database using the latest seed data?
If the index and the MySQL data are at different versions, page and database query results can diverge.
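To narrow down which side is stale, the index side can be probed directly from `dist/device_index.json`, whose `lookup` map goes from a normalized alias to candidate record ids. A small sketch (`index_has_alias` is an illustrative helper, not part of the project tooling):

```python
import json
import re

def index_has_alias(index_path: str, raw_model: str) -> bool:
    # Normalize with the project's rule, then consult the index's
    # lookup map (normalized alias -> candidate record ids).
    norm = re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", raw_model.lower())
    with open(index_path, encoding="utf-8") as fh:
        index = json.load(fh)
    return bool(index.get("lookup", {}).get(norm))
```

If this returns `True` while the MySQL lookup misses, the seed side needs a refresh, and vice versa.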
### 8.2 Why are there multiple brand fields
The project keeps the original brand, the display brand, and the parent-manufacturer fields at the same time to serve raw-data preservation, business display conventions, and manufacturer attribution. Business display usually takes `market_brand`; manufacturer attribution usually takes `parent_brand`.
### 8.3 When to query by device name versus device identifier
- Main flows, server-side integration, device management: prefer device-identifier queries
- Manual troubleshooting, operations double-checking, reverse lookup from a device name: use device-name queries
### 8.4 When does an external MySQL need manual initialization
When you use an external MySQL and auto-load is off, or the database schema and data need reinitializing, run the initialization manually from the data management page.
+11373 -10618
File diff suppressed because it is too large.
+9252 -9156
File diff suppressed because it is too large.
+1 -1
@@ -13,6 +13,6 @@
- `web-ui.md`
- Docker Compose startup, page entry points, MySQL connection, and management capabilities
- `mysql-query-design.md`
- Primary table design, compatibility views, recommended query patterns
- Primary table design, query patterns, and cleanup strategy
- `device-mapper.md`
- How `dist/device_index.json` is built, and its index fields
+2 -1
@@ -25,7 +25,7 @@ python3 tools/device_mapper.py find --name 'L55M5-AD' --brand Xiaomi
- `brand`: normalized brand
- `manufacturer_brand`: manufacturer-level brand
- `market_brand`: market sub-brand
- `device_type`: `phone | tablet | wear | tv | other`
- `device_type`: `phone | tablet | wear | tv | computer | other`
- `aliases`: all searchable aliases
- `lookup`: normalized alias -> candidate `record.id[]`
- `brand_aliases`: normalized brand aliases to filter by app-provided brand
@@ -54,4 +54,5 @@ Supported categories:
- `tablet`
- `wear`
- `tv`
- `computer`
- `other`
+3 -140
@@ -1,142 +1,5 @@
# MySQL design notes
# Project usage guide entry point
This document describes the delivered MobileModels' MySQL data organization, compatibility-layer design, and recommended query patterns.
The related notes have been consolidated into a single document; see:
## Design goals
- Every device identifier can be resolved through a MySQL query
- Third parties can query the database directly, with query speed guaranteed
- Access paths compatible with the legacy structure are retained
- The page side and the SQL side use the same device data
## Primary table
The recommended physical table:
```sql
mobilemodels.mm_device_catalog
```
The primary table consolidates device model, brand, manufacturer, source, normalized alias, and compatibility fields, making it the single recommended query entry point.
### Key fields
- `model`
- `record_id`
- `alias_norm`
- `device_name`
- `brand`
- `manufacturer_brand`
- `parent_brand`
- `market_brand`
- `device_type`
- `source_file`
- `section`
- `source_rank`
- `source_weight`
- `code`
- `code_alias`
- `ver_name`
## Recommended query patterns
### 1. Third parties query the primary table directly
Recommended: equality lookup on `alias_norm`:
```sql
SELECT
  model,
  record_id,
  alias_norm,
  device_name,
  brand,
  manufacturer_brand,
  parent_brand,
  market_brand,
  device_type,
  source_file,
  section,
  source_rank,
  source_weight,
  code,
  code_alias,
  ver_name
FROM mobilemodels.mm_device_catalog
WHERE alias_norm = ?
ORDER BY source_rank ASC, record_id ASC
LIMIT 20;
```
### 2. Page SQL queries
The page's `SQL query` tab is also backed by this primary table.
Query flow:
1. Receive the raw client-reported value
2. The server normalizes it to `alias_norm`
3. Run the equality query against the primary table
4. Return the result list, the executed SQL, and the JSON output
## Compatibility views
To stay compatible with legacy systems, the following views are still kept:
```sql
mobilemodels.mm_device_lookup
mobilemodels.mm_device_record
mobilemodels.models
python_services_test.models
```
The legacy structure `python_services_test.models` mainly serves existing query logic and is no longer the recommended integration path.
## Legacy-structure query example
```sql
SELECT
  model,
  dtype,
  brand,
  brand_title,
  code,
  code_alias,
  model_name,
  ver_name
FROM python_services_test.models
WHERE model = ?
LIMIT 20;
```
## Normalization rules
`alias_norm` is generated with these rules:
- Lowercase everything
- Keep only `[0-9a-z]` plus Chinese characters
- Strip spaces, hyphens, underscores, and other punctuation
Examples:
```text
SM-G9980 -> smg9980
iPhone14,2 -> iphone142
NOH-AL00 -> nohal00
```
## Data provenance
The primary table and the index data are generated by this pipeline:
1. Sync the upstream raw markdown data
2. Parse `workspace/brands/*.md`
3. Build `dist/device_index.json`
4. Export `dist/mobilemodels_mysql_seed.sql`
5. Load the MySQL schema and seed
## Delivery recommendations
- New third-party integrations should prefer `mm_device_catalog`
- Page debugging and database debugging use the same raw data and normalization rules
- Always replace the default database passwords in production
- [Project usage guide](../README.md)
+3 -163
@@ -1,165 +1,5 @@
# Web UI
# Project usage guide entry point
## How to start
The related notes have been consolidated into a single document; see:
Run from the project root:
```bash
docker compose up --build -d
```
To also start the local test MySQL:
```bash
docker compose -f docker-compose.yml -f docker-compose.test.yml up --build -d
```
To customize environment variables:
```bash
cp .env.example .env
```
Compose resolves environment variables in this order:
1. Current shell environment variables
2. `.env` in the project root
3. Defaults in `docker-compose.yml`
Stop the services:
```bash
docker compose down
```
Reset MySQL and runtime data:
```bash
docker compose down -v
```
## Page entry points
- `http://127.0.0.1:8123/web/device_query.html`: device query
- `http://127.0.0.1:8123/web/brand_management.html`: data management
- `http://127.0.0.1:8123/web/device_query.html?view=docs`: related docs
The whole feature stack runs in Docker Compose; a local Python install is no longer required.
The raw-data workspace lives in the project's `workspace/` directory.
## Actions performed automatically after startup
- Build the device index from `workspace/brands`
- Generate `dist/device_index.json`
- Export the MySQL seed file
- If `MYSQL_AUTO_LOAD=1`, load the MySQL schema and seed data
- Start the project's built-in daily auto-sync scheduler
- Start the web pages and the API service
On first start the defaults still come from environment variables; afterwards the auto-load switch can be changed in the Web UI, and the runtime configuration persists to `/data/state/mysql_settings.json`.
## MySQL default connection
- Host: `127.0.0.1`
- Port: `3306`
- Database: `mobilemodels`
- Reader User: `mobilemodels_reader`
To customize credentials, override the defaults via `.env`.
Common variables:
- `MYSQL_HOST`
- `MYSQL_PORT`
- `TZ`
- `MYSQL_ROOT_USER`
- `MYSQL_ROOT_PASSWORD`
- `MYSQL_DATABASE`
- `MYSQL_READER_USER`
- `MYSQL_READER_PASSWORD`
- `MYSQL_AUTO_LOAD`
- `SYNC_SCHEDULE_ENABLED`
- `SYNC_SCHEDULE_TIME`
- `GITHUB_PROXY_PREFIX`
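A minimal `.env` sketch overriding the connection-related defaults (every value below is a placeholder to replace with your own):

```text
MYSQL_HOST=db.internal.example
MYSQL_PORT=3306
MYSQL_DATABASE=mobilemodels
MYSQL_ROOT_USER=root
MYSQL_ROOT_PASSWORD=change-me
MYSQL_READER_USER=mobilemodels_reader
MYSQL_READER_PASSWORD=change-me-too
MYSQL_AUTO_LOAD=0
TZ=Asia/Shanghai
```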
## MySQL modes
- Main configuration `docker-compose.yml`
  - Targets a remote MySQL
  - Does not auto-load schema/seed by default
- Test configuration `docker-compose.test.yml`
  - Additionally starts a local test MySQL
  - The app container loads the data into it automatically
## Device query
The top of every page provides three navigation entries:
- `Device query`
- `Data management`
- `Related docs`
The device query page has two in-page tabs:
- `SQL query`
- `Index query`
### SQL query
- Calls the in-Compose API directly to query the MySQL primary table `mobilemodels.mm_device_catalog`
- The server first normalizes the input to `alias_norm`
- The page shows the SQL actually executed, the results, and the JSON
- The page also shows the read-only connection parameters for verification
### Index query
- Fast recognition against the in-memory index built from `dist/device_index.json`
- Suited to front-end integration, API comparison, and result verification
### Platform input suggestions
- Android / iOS / HarmonyOS: use the raw client-reported `model_raw` directly
- The input box offers example values for the selected platform
- With no input, the system queries with the current platform's default example value
## Data management
The data management page supports:
- Brand list management
- Brand-to-manufacturer relationship management
- Brand synonym management
- Data source priority management
- Manual initialization of an external MySQL
- Raw data sync
- Daily auto-sync time setting
- Index data viewing and reloading
### Manual initialization of an external MySQL
- Page entry: `Data management -> Raw data sync -> Initialize external MySQL`
- For a remote MySQL with `MYSQL_AUTO_LOAD=0`
- Clicking runs the schema and seed import, creates the database automatically, and rebuilds the `mobilemodels` tables and views
- Before running, confirm `MYSQL_HOST`, `MYSQL_PORT`, `MYSQL_ROOT_USER`, and `MYSQL_ROOT_PASSWORD` point to the right instance and have privileges to create databases and tables
### MySQL auto-load switch
- Page entry: `Data management -> Raw data sync -> MySQL auto-load`
- Saving updates the runtime configuration `/data/state/mysql_settings.json`
- Affects whether subsequent "start raw data sync" runs refresh MySQL automatically
- Also affects whether later container startups run the schema and seed import automatically
### Daily auto-sync
- The scheduler runs inside the project container and does not depend on GitHub Actions
- Page entry: `Data management -> Raw data sync`
- You can enable or disable it and set the daily run time
- An optional GitHub acceleration prefix can be configured, e.g. `https://ghfast.top/`
- The runtime configuration persists in `/data/state/sync_schedule.json`
- Times run in the container's time zone; the default comes from `TZ`, which defaults to `Asia/Shanghai`
## Notes
- Raw data, the index, and the MySQL seed persist at runtime in a Docker volume and are not written back to the local workspace
- Delivery environments should override the default `MYSQL_ROOT_PASSWORD` and `MYSQL_READER_PASSWORD`
- [Project usage guide](../README.md)
+25 -120
@@ -2,18 +2,18 @@ CREATE DATABASE IF NOT EXISTS `mobilemodels`
DEFAULT CHARACTER SET utf8mb4
DEFAULT COLLATE utf8mb4_0900_ai_ci;
CREATE DATABASE IF NOT EXISTS `python_services_test`
DEFAULT CHARACTER SET utf8mb4
DEFAULT COLLATE utf8mb4_0900_ai_ci;
DROP DATABASE IF EXISTS `python_services_test`;
USE `mobilemodels`;
SET @drop_stmt = (
SELECT CASE `TABLE_TYPE`
WHEN 'BASE TABLE' THEN 'DROP TABLE `python_services_test`.`models`'
WHEN 'VIEW' THEN 'DROP VIEW `python_services_test`.`models`'
WHEN 'BASE TABLE' THEN 'DROP TABLE `mm_device_catalog`'
WHEN 'VIEW' THEN 'DROP VIEW `mm_device_catalog`'
ELSE 'DO 0'
END
FROM `information_schema`.`TABLES`
WHERE `TABLE_SCHEMA` = 'python_services_test' AND `TABLE_NAME` = 'models'
WHERE `TABLE_SCHEMA` = 'mobilemodels' AND `TABLE_NAME` = 'mm_device_catalog'
LIMIT 1
);
SET @drop_stmt = COALESCE(@drop_stmt, 'DO 0');
@@ -21,16 +21,14 @@ PREPARE stmt FROM @drop_stmt;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
USE `mobilemodels`;
SET @drop_stmt = (
SELECT CASE `TABLE_TYPE`
WHEN 'BASE TABLE' THEN 'DROP TABLE `mm_device_record`'
WHEN 'VIEW' THEN 'DROP VIEW `mm_device_record`'
WHEN 'BASE TABLE' THEN 'DROP TABLE `mm_brand_lookup`'
WHEN 'VIEW' THEN 'DROP VIEW `mm_brand_lookup`'
ELSE 'DO 0'
END
FROM `information_schema`.`TABLES`
WHERE `TABLE_SCHEMA` = 'mobilemodels' AND `TABLE_NAME` = 'mm_device_record'
WHERE `TABLE_SCHEMA` = 'mobilemodels' AND `TABLE_NAME` = 'mm_brand_lookup'
LIMIT 1
);
SET @drop_stmt = COALESCE(@drop_stmt, 'DO 0');
@@ -53,6 +51,21 @@ PREPARE stmt FROM @drop_stmt;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @drop_stmt = (
SELECT CASE `TABLE_TYPE`
WHEN 'BASE TABLE' THEN 'DROP TABLE `mm_device_record`'
WHEN 'VIEW' THEN 'DROP VIEW `mm_device_record`'
ELSE 'DO 0'
END
FROM `information_schema`.`TABLES`
WHERE `TABLE_SCHEMA` = 'mobilemodels' AND `TABLE_NAME` = 'mm_device_record'
LIMIT 1
);
SET @drop_stmt = COALESCE(@drop_stmt, 'DO 0');
PREPARE stmt FROM @drop_stmt;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @drop_stmt = (
SELECT CASE `TABLE_TYPE`
WHEN 'BASE TABLE' THEN 'DROP TABLE `models`'
@@ -107,7 +120,7 @@ CREATE TABLE IF NOT EXISTS `mm_device_catalog` (
`manufacturer_brand` varchar(64) NOT NULL,
`parent_brand` varchar(64) NOT NULL,
`market_brand` varchar(64) NOT NULL,
`device_type` enum('phone','tablet','wear','tv','other') NOT NULL,
`device_type` enum('phone','tablet','wear','tv','computer','other') NOT NULL,
`code` varchar(64) DEFAULT NULL,
`code_alias` varchar(255) DEFAULT NULL,
`ver_name` text DEFAULT NULL,
@@ -147,111 +160,3 @@ CREATE TABLE IF NOT EXISTS `mm_brand_lookup` (
KEY `idx_mm_brand_lookup_parent_brand` (`parent_brand`),
KEY `idx_mm_brand_lookup_market_brand` (`market_brand`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE OR REPLACE VIEW `mm_device_lookup` AS
SELECT
c.`alias_norm`,
c.`record_id`,
c.`device_name`,
c.`brand`,
c.`manufacturer_brand`,
c.`parent_brand`,
c.`market_brand`,
c.`device_type`,
c.`source_file`,
c.`section`,
c.`source_rank`,
c.`source_weight`,
c.`updated_at`
FROM `mm_device_catalog` AS c;
CREATE OR REPLACE VIEW `mm_device_record` AS
SELECT
c.`record_id`,
c.`device_name`,
c.`brand`,
c.`manufacturer_brand`,
c.`parent_brand`,
c.`market_brand`,
c.`device_type`,
c.`source_file`,
c.`section`,
c.`source_rank`,
c.`source_weight`,
CAST(CONCAT('[', GROUP_CONCAT(JSON_QUOTE(c.`model`) ORDER BY c.`model` SEPARATOR ','), ']') AS JSON) AS `aliases_json`,
MAX(c.`updated_at`) AS `updated_at`
FROM `mm_device_catalog` AS c
GROUP BY
c.`record_id`,
c.`device_name`,
c.`brand`,
c.`manufacturer_brand`,
c.`parent_brand`,
c.`market_brand`,
c.`device_type`,
c.`source_file`,
c.`section`,
c.`source_rank`,
c.`source_weight`;
CREATE OR REPLACE VIEW `vw_mm_device_lookup` AS
SELECT
c.`alias_norm`,
c.`record_id`,
c.`device_name`,
c.`brand`,
c.`manufacturer_brand`,
c.`parent_brand`,
c.`market_brand`,
c.`device_type`,
c.`source_file`,
c.`section`,
c.`source_rank`,
c.`source_weight`,
c.`updated_at`
FROM `mm_device_catalog` AS c;
CREATE OR REPLACE VIEW `models` AS
SELECT
c.`model`,
c.`device_type` AS `dtype`,
c.`market_brand` AS `brand`,
c.`manufacturer_brand` AS `brand_title`,
c.`code`,
c.`code_alias`,
c.`device_name` AS `model_name`,
c.`ver_name`,
c.`updated_at` AS `update_at`,
c.`hash_md5`,
c.`hash_crc`
FROM `mm_device_catalog` AS c;
CREATE OR REPLACE VIEW `vw_models` AS
SELECT
c.`model`,
c.`device_type` AS `dtype`,
c.`market_brand` AS `brand`,
c.`manufacturer_brand` AS `brand_title`,
c.`code`,
c.`code_alias`,
c.`device_name` AS `model_name`,
c.`ver_name`,
c.`updated_at` AS `update_at`,
c.`hash_md5`,
c.`hash_crc`
FROM `mm_device_catalog` AS c;
CREATE OR REPLACE VIEW `python_services_test`.`models` AS
SELECT
`model`,
`dtype`,
`brand`,
`brand_title`,
`code`,
`code_alias`,
`model_name`,
`ver_name`,
`update_at`,
`hash_md5`,
`hash_crc`
FROM `mobilemodels`.`models`;
+221 -32
@@ -85,6 +85,8 @@ FILE_DEFAULT_DEVICE_TYPE: Dict[str, str] = {
"zhixuan": "phone",
}
MANUAL_SOURCE_FILE = "local/manual_catalog.json"
BRAND_ALIASES: Dict[str, List[str]] = {
"360": ["360", "360手机", "奇酷", "qiku"],
@@ -177,11 +179,17 @@ WEAR_KEYWORDS = [
"glasses",
"眼镜",
]
OTHER_KEYWORDS = [
COMPUTER_KEYWORDS = [
"matebook",
"macbook",
"笔记本",
"电脑",
"laptop",
"notebook",
"desktop",
"workstation",
]
OTHER_KEYWORDS = [
"vision",
"vr",
"ipod",
@@ -233,6 +241,10 @@ class DeviceRecord:
section: str
MANUAL_BRAND_ALIAS_OVERRIDES: Dict[str, List[str]] = {}
MANUAL_PARENT_BRAND_OVERRIDES: Dict[str, str] = {}
def normalize_text(text: str) -> str:
    return re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", text.lower())
@@ -243,22 +255,141 @@ def canonical_brand(file_stem: str) -> str:
def brand_aliases(brand: str) -> List[str]:
aliases = set(BRAND_ALIASES.get(brand, []))
aliases.update(MANUAL_BRAND_ALIAS_OVERRIDES.get(brand, []))
aliases.add(brand)
return sorted(aliases)
def has_keyword(text: str, keywords: Iterable[str]) -> bool:
norm_text = normalize_text(text)
norm_text = re.sub(r"[^0-9a-z\u4e00-\u9fff]+", " ", text.lower())
norm_text = " ".join(norm_text.split())
for kw in keywords:
if normalize_text(kw) and normalize_text(kw) in norm_text:
kw_norm = re.sub(r"[^0-9a-z\u4e00-\u9fff]+", " ", kw.lower())
kw_norm = " ".join(kw_norm.split())
if kw_norm and kw_norm in norm_text:
return True
return False
def resolve_parent_brand(manufacturer_brand: str) -> str:
    if manufacturer_brand in MANUAL_PARENT_BRAND_OVERRIDES:
        return MANUAL_PARENT_BRAND_OVERRIDES[manufacturer_brand]
    return MANUFACTURER_PARENT_BRAND.get(manufacturer_brand, manufacturer_brand)

def reset_manual_overrides() -> None:
    MANUAL_BRAND_ALIAS_OVERRIDES.clear()
    MANUAL_PARENT_BRAND_OVERRIDES.clear()

def normalize_alias_list(*groups: object) -> List[str]:
    aliases: List[str] = []
    seen: Set[str] = set()
    for group in groups:
        if group is None:
            continue
        items = group if isinstance(group, (list, tuple, set)) else [group]
        for item in items:
            text = str(item or "").strip()
            key = normalize_text(text)
            if not text or not key or key in seen:
                continue
            seen.add(key)
            aliases.append(text)
    return aliases

def is_preferred_cn_source_file(source_file: str) -> bool:
    source = str(source_file or "").strip().lower()
    if not source:
        return False
    if source == MANUAL_SOURCE_FILE:
        return True
    return not (source.endswith("_en.md") or source.endswith("_global_en.md"))

def first_preferred_match(records: List[DeviceRecord]) -> List[DeviceRecord]:
    if not records:
        return []
    cn_records = [record for record in records if is_preferred_cn_source_file(record.source_file)]
    return [(cn_records or records)[0]]

def load_manual_catalog(repo_root: Path) -> dict[str, object]:
    path = repo_root / MANUAL_SOURCE_FILE
    if not path.exists():
        return {"brands": [], "devices": []}
    payload = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        raise RuntimeError(f"{MANUAL_SOURCE_FILE} 必须是 JSON 对象。")
    brands = payload.get("brands")
    devices = payload.get("devices")
    return {
        "brands": brands if isinstance(brands, list) else [],
        "devices": devices if isinstance(devices, list) else [],
    }

def apply_manual_brand_overrides(manual_catalog: dict[str, object]) -> dict[str, str]:
    reset_manual_overrides()
    brand_parent_map: dict[str, str] = {}
    for raw_brand in manual_catalog.get("brands", []):
        if not isinstance(raw_brand, dict):
            continue
        brand_name = str(raw_brand.get("name") or "").strip()
        if not brand_name:
            continue
        aliases = normalize_alias_list(brand_name, raw_brand.get("aliases"))
        parent_brand = str(raw_brand.get("parent_brand") or brand_name).strip() or brand_name
        MANUAL_BRAND_ALIAS_OVERRIDES[brand_name] = aliases
        MANUAL_PARENT_BRAND_OVERRIDES[brand_name] = parent_brand
        brand_parent_map[brand_name] = parent_brand
    return brand_parent_map

def parse_manual_catalog(repo_root: Path, manual_catalog: dict[str, object]) -> List[DeviceRecord]:
    brand_parent_map = apply_manual_brand_overrides(manual_catalog)
    records: List[DeviceRecord] = []
    for raw_device in manual_catalog.get("devices", []):
        if not isinstance(raw_device, dict):
            continue
        brand = str(raw_device.get("brand") or "").strip()
        device_name = str(raw_device.get("device_name") or "").strip()
        if not brand or not device_name:
            continue
        aliases = normalize_alias_list(
            raw_device.get("models"),
            device_name,
            raw_device.get("aliases"),
        )
        if not aliases:
            continue
        device_type = str(raw_device.get("device_type") or "").strip() or "other"
        section = str(raw_device.get("section") or "手动补录").strip() or "手动补录"
        record_id = str(raw_device.get("id") or "").strip() or f"manual:{normalize_text(brand)}:{normalize_text(device_name)}"
        parent_brand = brand_parent_map.get(brand, str(raw_device.get("parent_brand") or brand).strip() or brand)
        records.append(
            DeviceRecord(
                id=record_id,
                device_name=device_name,
                brand=brand,
                manufacturer_brand=brand,
                parent_brand=parent_brand,
                market_brand=brand,
                device_type=device_type,
                aliases=aliases,
                source_file=MANUAL_SOURCE_FILE,
                section=section,
            )
        )
    return records
def infer_market_brand(
manufacturer_brand: str,
device_name: str,
@@ -300,6 +431,8 @@ def infer_device_type(
return "tablet"
if has_keyword(corpus, WEAR_KEYWORDS):
return "wear"
if has_keyword(corpus, COMPUTER_KEYWORDS):
return "computer"
if has_keyword(corpus, OTHER_KEYWORDS):
return "other"
if has_keyword(corpus, PHONE_KEYWORDS):
@@ -330,6 +463,40 @@ def extract_codes(text: str) -> List[str]:
return [code.strip() for code in BACKTICK_RE.findall(text) if code.strip()]
def infer_base_variant_name(variant_name: str, entry_title: str) -> Optional[str]:
    base = re.split(r"\s+/\s+", variant_name.strip(), maxsplit=1)[0].strip()
    if not base:
        return None
    base = re.sub(r"\s*(?:国行版|国内版|中国版|印度版|欧洲版|国际版|北美版|日本版|韩国版|港版|台版|海外版)\s*$", "", base)
    base = re.sub(
        r"\s+(?:China|Chinese|India|Europe|European|Global|International|North America|North American|Japan|Korea|Hong Kong|Taiwan|US|USA|T-Mobile|Verizon|AT&T|SIM Free|SoftBank)\s*$",
        "",
        base,
        flags=re.IGNORECASE,
    )
    base = " ".join(base.split())
    if not base or normalize_text(base) not in normalize_text(entry_title):
        return None
    return base

def split_variant_groups(entry_title: str, title_codes: Iterable[str], variants: list[tuple[list[str], str]]) -> list[tuple[str, Set[str]]]:
    groups: dict[str, Set[str]] = {}
    for variant_codes, variant_name in variants:
        base_name = infer_base_variant_name(variant_name, entry_title)
        if not base_name:
            return []
        aliases = groups.setdefault(base_name, set(title_codes))
        aliases.add(base_name)
        aliases.add(variant_name)
        aliases.update(variant_codes)
    if len(groups) < 2:
        return []
    return list(groups.items())
def parse_brand_file(path: Path) -> List[DeviceRecord]:
file_stem = path.stem
brand = canonical_brand(file_stem)
@@ -340,43 +507,50 @@ def parse_brand_file(path: Path) -> List[DeviceRecord]:
section = ""
current_title = ""
current_title_codes: List[str] = []
current_aliases: Set[str] = set()
current_variants: list[tuple[list[str], str]] = []
def flush_current() -> None:
nonlocal current_title, current_aliases
nonlocal current_title, current_title_codes, current_aliases, current_variants
if not current_title:
return
aliases = sorted({alias.strip() for alias in current_aliases if alias.strip()})
record_id = f"{file_stem}:{len(records) + 1}"
device_type = infer_device_type(
device_name=current_title,
section=section,
source_file=path.name,
aliases=aliases,
default_type=default_type,
)
records.append(
DeviceRecord(
id=record_id,
device_name=current_title,
brand=brand,
manufacturer_brand=brand,
parent_brand=resolve_parent_brand(brand),
market_brand=infer_market_brand(
manufacturer_brand=brand,
device_name=current_title,
section=section,
aliases=aliases,
),
device_type=device_type,
aliases=aliases,
source_file=f"brands/{path.name}",
split_groups = split_variant_groups(current_title, current_title_codes, current_variants)
record_groups = split_groups or [(current_title, current_aliases)]
for device_name, raw_aliases in record_groups:
aliases = sorted({alias.strip() for alias in raw_aliases if alias.strip()})
record_id = f"{file_stem}:{len(records) + 1}"
device_type = infer_device_type(
device_name=device_name,
section=section,
source_file=path.name,
aliases=aliases,
default_type=default_type,
)
records.append(
DeviceRecord(
id=record_id,
device_name=device_name,
brand=brand,
manufacturer_brand=brand,
parent_brand=resolve_parent_brand(brand),
market_brand=infer_market_brand(
manufacturer_brand=brand,
device_name=device_name,
section=section,
aliases=aliases,
),
device_type=device_type,
aliases=aliases,
source_file=f"brands/{path.name}",
section=section,
)
)
)
current_title = ""
current_title_codes = []
current_aliases = set()
current_variants = []
for raw in lines:
line = raw.strip()
@@ -385,6 +559,7 @@ def parse_brand_file(path: Path) -> List[DeviceRecord]:
section_match = SECTION_RE.match(line)
if section_match:
flush_current()
section = section_match.group(1).strip()
continue
@@ -393,8 +568,10 @@ def parse_brand_file(path: Path) -> List[DeviceRecord]:
flush_current()
raw_title = entry_match.group(1).strip()
current_title = clean_entry_title(raw_title)
current_aliases = set(extract_codes(raw_title))
current_title_codes = extract_codes(raw_title)
current_aliases = set(current_title_codes)
current_aliases.add(current_title)
current_variants = []
continue
if not current_title:
@@ -404,6 +581,7 @@ def parse_brand_file(path: Path) -> List[DeviceRecord]:
if variant_match:
variant_codes = extract_codes(variant_match.group(1))
variant_name = variant_match.group(2).strip()
current_variants.append((variant_codes, variant_name))
current_aliases.update(variant_codes)
current_aliases.add(variant_name)
@@ -601,7 +779,15 @@ class DeviceMapper:
}
matched_records = [r for r in [self.records_by_id[rid] for rid in candidate_ids] if self._brand_match(fallback_filter, r)]
matched_records.sort(key=lambda r: (r.device_name, r.source_file, r.id))
matched_records.sort(
key=lambda r: (
0 if is_preferred_cn_source_file(r.source_file) else 1,
r.device_name,
r.source_file,
r.id,
)
)
matched_records = first_preferred_match(matched_records)
if matched_records:
best = matched_records[0]
@@ -635,10 +821,13 @@ class DeviceMapper:
def build_records(repo_root: Path) -> List[DeviceRecord]:
brands_dir = repo_root / "brands"
records: List[DeviceRecord] = []
manual_catalog = load_manual_catalog(repo_root)
apply_manual_brand_overrides(manual_catalog)
for md_path in sorted(brands_dir.glob("*.md")):
records.extend(parse_brand_file(md_path))
records.extend(parse_manual_catalog(repo_root, manual_catalog))
return records
+4 -5
@@ -9,6 +9,7 @@ from pathlib import Path
from typing import Iterable
from device_mapper import (
MANUAL_SOURCE_FILE,
MARKET_BRAND_ALIASES,
MARKET_BRAND_TO_MANUFACTURER,
build_records,
@@ -28,9 +29,11 @@ def is_cn_source_file(source_file: str) -> bool:
def build_source_order(records: list[object]) -> list[str]:
source_files = sorted({record.source_file for record in records})
manual = [source for source in source_files if source == MANUAL_SOURCE_FILE]
source_files = [source for source in source_files if source != MANUAL_SOURCE_FILE]
cn = [source for source in source_files if is_cn_source_file(source)]
other = [source for source in source_files if not is_cn_source_file(source)]
return sorted(cn) + sorted(other)
return manual + sorted(cn) + sorted(other)
def build_source_weights(records: list[object]) -> tuple[dict[str, int], dict[str, float]]:
@@ -260,9 +263,7 @@ def main() -> int:
"",
f"-- device_records: {device_record_count}",
f"-- device_catalog_rows: {len(catalog_rows)}",
f"-- device_lookup_rows: {len(catalog_rows)}",
f"-- brand_lookup_rows: {len(brand_rows)}",
f"-- legacy_models_rows: {len(catalog_rows)}",
"",
])
@@ -271,9 +272,7 @@ def main() -> int:
print(f"Exported MySQL seed: {output_path}")
print(f"device_records={device_record_count}")
print(f"device_catalog_rows={len(catalog_rows)}")
print(f"device_lookup_rows={len(catalog_rows)}")
print(f"brand_lookup_rows={len(brand_rows)}")
print(f"legacy_models_rows={len(catalog_rows)}")
return 0
+79 -1
@@ -29,12 +29,88 @@ sync_missing_dir_entries() {
done
}
refresh_workspace_builtin_entries() {
src_dir="$1"
dst_dir="$2"
mkdir -p "$dst_dir"
for rel_path in \
brands \
misc \
CHANGELOG.md \
CHANGELOG_en.md \
LICENSE.txt
do
src_entry="$src_dir/$rel_path"
dst_entry="$dst_dir/$rel_path"
[ -e "$src_entry" ] || continue
rm -rf "$dst_entry"
mkdir -p "$(dirname "$dst_entry")"
cp -a "$src_entry" "$dst_entry"
done
if [ -d "$src_dir/local" ]; then
sync_missing_dir_entries "$src_dir/local" "$dst_dir/local"
fi
}
workspace_has_sync_metadata() {
[ -f "$DATA_ROOT/state/sync_status.json" ]
}
migrate_legacy_manual_catalog() {
legacy_dir="$DATA_ROOT/workspace/brands/local"
target_dir="$DATA_ROOT/workspace/local"
legacy_file="$legacy_dir/manual_catalog.json"
target_file="$target_dir/manual_catalog.json"
[ -d "$legacy_dir" ] || return
mkdir -p "$target_dir"
if [ -f "$legacy_file" ] && [ ! -f "$target_file" ]; then
cp -a "$legacy_file" "$target_file"
fi
# Older builds accidentally copied local overlay files under brands/local.
# Once the real target exists, drop the misplaced directory to avoid confusion.
rm -rf "$legacy_dir"
}
init_path() {
rel_path="$1"
src_path="$APP_ROOT/$rel_path"
dst_path="$DATA_ROOT/$rel_path"
if [ -d "$src_path" ]; then
# Bind-mounted paths like /app/dist cannot be removed and replaced with symlinks.
# Keep the mount in place and only ensure the runtime copy exists.
if [ "$rel_path" = "dist" ]; then
if [ -d "$src_path" ]; then
if [ ! -e "$dst_path" ] && [ ! -L "$dst_path" ]; then
mkdir -p "$(dirname "$dst_path")"
cp -a "$src_path" "$dst_path"
else
sync_missing_dir_entries "$src_path" "$dst_path"
fi
elif [ ! -e "$dst_path" ] && [ ! -L "$dst_path" ]; then
mkdir -p "$(dirname "$dst_path")"
cp -a "$src_path" "$dst_path"
fi
return
fi
if [ "$rel_path" = "workspace" ] && [ -d "$src_path" ] && [ ! -L "$src_path" ]; then
if [ ! -e "$dst_path" ] && [ ! -L "$dst_path" ]; then
mkdir -p "$(dirname "$dst_path")"
cp -a "$src_path" "$dst_path"
elif workspace_has_sync_metadata; then
sync_missing_dir_entries "$src_path/local" "$dst_path/local"
else
refresh_workspace_builtin_entries "$src_path" "$dst_path"
fi
elif [ -d "$src_path" ]; then
if [ ! -e "$dst_path" ] && [ ! -L "$dst_path" ]; then
mkdir -p "$(dirname "$dst_path")"
cp -a "$src_path" "$dst_path"
@@ -65,3 +141,5 @@ for rel_path in \
do
init_path "$rel_path"
done
migrate_legacy_manual_catalog
-1
@@ -95,7 +95,6 @@ def ensure_reader_user(
CREATE USER IF NOT EXISTS '{sql_string(reader_user)}'@'%' IDENTIFIED BY '{sql_string(reader_password)}';
ALTER USER '{sql_string(reader_user)}'@'%' IDENTIFIED BY '{sql_string(reader_password)}';
GRANT SELECT ON `{database}`.* TO '{sql_string(reader_user)}'@'%';
GRANT SELECT ON `python_services_test`.* TO '{sql_string(reader_user)}'@'%';
FLUSH PRIVILEGES;
"""
proc = subprocess.run(
+697 -5
@@ -15,6 +15,7 @@ from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from device_mapper import build_records
from project_layout import PROJECT_ROOT, WORKSPACE_ROOT
from sync_upstream_mobilemodels import DEFAULT_BRANCH, DEFAULT_REPO_URL
@@ -24,15 +25,27 @@ INDEX_PATH = PROJECT_ROOT / "dist/device_index.json"
MYSQL_SEED_PATH = PROJECT_ROOT / "dist/mobilemodels_mysql_seed.sql"
MYSQL_LOADER = PROJECT_ROOT / "tools/load_mysql_seed.py"
DATA_ROOT = Path(os.environ.get("MOBILEMODELS_DATA_ROOT", "/data"))
MANUAL_CATALOG_PATH = WORKSPACE_ROOT / "local/manual_catalog.json"
SYNC_METADATA_PATH = DATA_ROOT / "state/sync_status.json"
SCHEDULE_CONFIG_PATH = DATA_ROOT / "state/sync_schedule.json"
MYSQL_CONFIG_PATH = DATA_ROOT / "state/mysql_settings.json"
SYNC_LOCK = threading.Lock()
SCHEDULE_LOCK = threading.Lock()
MYSQL_CONFIG_LOCK = threading.Lock()
INDEX_ALIAS_LOCK = threading.Lock()
MANUAL_REBUILD_LOCK = threading.Lock()
NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")
SCHEDULE_TIME_RE = re.compile(r"^(?:[01]?\d|2[0-3]):[0-5]\d$")
SCHEDULER_POLL_SECONDS = 20
INDEX_DEVICE_NAME_ALIAS_MAP: dict[str, list[str]] | None = None
DEVICE_TYPES = {"phone", "tablet", "wear", "tv", "computer", "other"}
LAST_MANUAL_MYSQL_LOAD_STATUS: dict[str, object] = {
"running": False,
"last_started_at": None,
"last_finished_at": None,
"last_status": None,
"last_message": None,
}
def truthy_env(name: str, default: str = "0") -> bool:
@@ -301,6 +314,495 @@ def sql_string(value: str) -> str:
return (value or "").replace("\\", "\\\\").replace("'", "''")
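The `sql_string` escaping helper is small enough to exercise directly; a standalone copy of the one-liner above as a quick sanity check:

```python
def sql_string(value: str) -> str:
    # Double backslashes first, then double single quotes, so the result can
    # sit inside a single-quoted MySQL string literal.
    return (value or "").replace("\\", "\\\\").replace("'", "''")

print(sql_string("O'Brien"))   # doubled quote: O''Brien
print(sql_string("a\\b"))      # doubled backslash: a\\b
```

Note this escapes quotes and backslashes only; in the `LIKE` patterns built later in this diff, `%` and `_` in user input still act as wildcards.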
def normalize_alias_list(*groups: object) -> list[str]:
aliases: list[str] = []
seen: set[str] = set()
for group in groups:
if group is None:
continue
items = group if isinstance(group, list) else [group]
for item in items:
text = str(item or "").strip()
key = normalize_text(text)
if not text or not key or key in seen:
continue
seen.add(key)
aliases.append(text)
return aliases
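As a standalone sketch, `normalize_alias_list` dedupes by normalized key while keeping the first original spelling. The project's `normalize_text` is not shown in this hunk; the stand-in below assumes it lowercases and strips every character outside `NORMALIZE_RE`'s allowed set, per the constant defined earlier in this file:

```python
import re

NORMALIZE_RE = re.compile(r"[^0-9a-z\u4e00-\u9fff]+")

def normalize_text(value: str) -> str:
    # Assumed behavior of the project's normalize_text: lowercase, then drop
    # every run of characters outside [0-9a-z] and CJK.
    return NORMALIZE_RE.sub("", str(value or "").lower())

def normalize_alias_list(*groups):
    aliases, seen = [], set()
    for group in groups:
        if group is None:
            continue
        items = group if isinstance(group, list) else [group]
        for item in items:
            text = str(item or "").strip()
            key = normalize_text(text)
            if not text or not key or key in seen:
                continue
            seen.add(key)
            aliases.append(text)
    return aliases

# "sm g9980" collapses to the same key as "SM-G9980" and is dropped.
print(normalize_alias_list("SM-G9980", ["sm g9980", "SM-G998B"], None))
```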
def default_manual_catalog() -> dict[str, object]:
return {"brands": [], "devices": []}
def read_manual_catalog() -> dict[str, object]:
if not MANUAL_CATALOG_PATH.exists():
return default_manual_catalog()
try:
payload = json.loads(MANUAL_CATALOG_PATH.read_text(encoding="utf-8"))
except Exception:
return default_manual_catalog()
if not isinstance(payload, dict):
return default_manual_catalog()
brands = payload.get("brands")
devices = payload.get("devices")
return {
"brands": brands if isinstance(brands, list) else [],
"devices": devices if isinstance(devices, list) else [],
}
def write_manual_catalog(payload: dict[str, object]) -> dict[str, object]:
catalog = {
"brands": payload.get("brands") if isinstance(payload.get("brands"), list) else [],
"devices": payload.get("devices") if isinstance(payload.get("devices"), list) else [],
}
MANUAL_CATALOG_PATH.parent.mkdir(parents=True, exist_ok=True)
MANUAL_CATALOG_PATH.write_text(json.dumps(catalog, ensure_ascii=False, indent=2), encoding="utf-8")
return catalog
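For reference, the file these helpers read and write (`local/manual_catalog.json`) ends up shaped like this. All values below are made up; the field set follows the canonicalizers later in this diff:

```python
import json

catalog = {
    "brands": [
        {
            "name": "ExampleBrand",          # hypothetical brand
            "parent_brand": "ExampleBrand",
            "aliases": ["ExampleBrand"],
            "created_at": "2024-01-01T00:00:00",
            "updated_at": "2024-01-01T00:00:00",
        }
    ],
    "devices": [
        {
            "id": "manual:examplebrand:examplephone",
            "brand": "ExampleBrand",
            "device_name": "Example Phone",
            "device_type": "phone",          # one of DEVICE_TYPES
            "models": ["EX-P100"],
            "aliases": ["Example Phone"],
            "section": "手动补录",
            "created_at": "2024-01-01T00:00:00",
            "updated_at": "2024-01-01T00:00:00",
        }
    ],
}
text = json.dumps(catalog, ensure_ascii=False, indent=2)
assert json.loads(text) == catalog   # round-trips cleanly
```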
def canonical_manual_brand(raw: object) -> dict[str, object]:
if not isinstance(raw, dict):
raise RuntimeError("品牌数据格式无效。")
name = str(raw.get("name") or "").strip()
if not name:
raise RuntimeError("品牌名称不能为空。")
parent_brand = str(raw.get("parent_brand") or name).strip() or name
aliases = normalize_alias_list(name, raw.get("aliases"))
now = local_now().isoformat(timespec="seconds")
return {
"name": name,
"parent_brand": parent_brand,
"aliases": aliases,
"updated_at": str(raw.get("updated_at") or now),
"created_at": str(raw.get("created_at") or now),
}
def canonical_manual_device(raw: object, allowed_brands: set[str]) -> dict[str, object]:
if not isinstance(raw, dict):
raise RuntimeError("设备数据格式无效。")
brand = str(raw.get("brand") or "").strip()
if not brand:
raise RuntimeError("所属品牌不能为空。")
if brand not in allowed_brands:
raise RuntimeError(f"所属品牌不存在: {brand}")
device_name = str(raw.get("device_name") or "").strip()
if not device_name:
raise RuntimeError("设备名称不能为空。")
device_type = str(raw.get("device_type") or "").strip().lower() or "other"
if device_type not in DEVICE_TYPES:
raise RuntimeError("设备类型无效。")
models = normalize_alias_list(raw.get("models"))
if not models:
raise RuntimeError("设备标识至少保留一个。")
aliases = normalize_alias_list(device_name, raw.get("aliases"))
section = str(raw.get("section") or "手动补录").strip() or "手动补录"
stable_id = f"manual:{normalize_text(brand)}:{normalize_text(device_name)}"
now = local_now().isoformat(timespec="seconds")
return {
"id": str(raw.get("id") or stable_id).strip() or stable_id,
"brand": brand,
"device_name": device_name,
"device_type": device_type,
"models": models,
"aliases": aliases,
"section": section,
"updated_at": str(raw.get("updated_at") or now),
"created_at": str(raw.get("created_at") or now),
}
def validate_manual_catalog(payload: dict[str, object]) -> dict[str, object]:
brands_raw = payload.get("brands") if isinstance(payload.get("brands"), list) else []
devices_raw = payload.get("devices") if isinstance(payload.get("devices"), list) else []
brands: list[dict[str, object]] = []
seen_brand_keys: set[str] = set()
for raw_brand in brands_raw:
brand = canonical_manual_brand(raw_brand)
brand_key = normalize_text(str(brand["name"]))
if brand_key in seen_brand_keys:
raise RuntimeError(f"品牌重复: {brand['name']}")
seen_brand_keys.add(brand_key)
brands.append(brand)
builtin_brands = load_builtin_brand_names()
allowed_brands = builtin_brands | {str(item["name"]) for item in brands}
devices: list[dict[str, object]] = []
seen_device_ids: set[str] = set()
for raw_device in devices_raw:
device = canonical_manual_device(raw_device, allowed_brands)
if device["id"] in seen_device_ids:
raise RuntimeError(f"设备 ID 重复: {device['id']}")
seen_device_ids.add(str(device["id"]))
devices.append(device)
return {"brands": brands, "devices": devices}
def load_builtin_brand_names() -> set[str]:
records = build_records(WORKSPACE_ROOT)
names = {
str(record.brand).strip()
for record in records
if str(record.source_file) != "local/manual_catalog.json"
}
names.update(
str(record.market_brand).strip()
for record in records
if str(record.source_file) != "local/manual_catalog.json"
)
names.update(
str(record.parent_brand).strip()
for record in records
if str(record.source_file) != "local/manual_catalog.json"
)
return {name for name in names if name}
def count_manual_alias_conflicts(device: dict[str, object]) -> int:
records = build_records(WORKSPACE_ROOT)
alias_set = {
normalize_text(alias)
for alias in normalize_alias_list(device.get("models"), device.get("aliases"), device.get("device_name"))
}
conflicts: set[str] = set()
for record in records:
if str(record.source_file) == "local/manual_catalog.json":
continue
for alias in getattr(record, "aliases", []):
alias_norm = normalize_text(str(alias))
if alias_norm and alias_norm in alias_set:
conflicts.add(alias_norm)
return len(conflicts)
def _manual_mysql_loader_task() -> None:
with MANUAL_REBUILD_LOCK:
LAST_MANUAL_MYSQL_LOAD_STATUS["running"] = True
LAST_MANUAL_MYSQL_LOAD_STATUS["last_started_at"] = local_now().isoformat(timespec="seconds")
LAST_MANUAL_MYSQL_LOAD_STATUS["last_status"] = "running"
LAST_MANUAL_MYSQL_LOAD_STATUS["last_message"] = "手动补录触发的 MySQL 刷新进行中。"
try:
load_proc = run_command(["python3", str(MYSQL_LOADER)])
message = "\n".join(part for part in [load_proc.stdout.strip(), load_proc.stderr.strip()] if part).strip() or "MySQL 已刷新。"
LAST_MANUAL_MYSQL_LOAD_STATUS["last_status"] = "success" if load_proc.returncode == 0 else "failed"
LAST_MANUAL_MYSQL_LOAD_STATUS["last_message"] = message
except Exception as err:
LAST_MANUAL_MYSQL_LOAD_STATUS["last_status"] = "failed"
LAST_MANUAL_MYSQL_LOAD_STATUS["last_message"] = str(err)
finally:
LAST_MANUAL_MYSQL_LOAD_STATUS["running"] = False
LAST_MANUAL_MYSQL_LOAD_STATUS["last_finished_at"] = local_now().isoformat(timespec="seconds")
def start_manual_mysql_loader() -> bool:
if LAST_MANUAL_MYSQL_LOAD_STATUS.get("running"):
return False
thread = threading.Thread(target=_manual_mysql_loader_task, name="manual-mysql-loader", daemon=True)
thread.start()
return True
def rebuild_generated_outputs(*, defer_mysql_load: bool = False) -> dict[str, object]:
build_proc = run_command(
[
"python3",
str(PROJECT_ROOT / "tools/device_mapper.py"),
"--repo-root",
str(WORKSPACE_ROOT),
"build",
"--output",
str(INDEX_PATH),
]
)
build_output = "\n".join(part for part in [build_proc.stdout.strip(), build_proc.stderr.strip()] if part).strip()
if build_proc.returncode != 0:
raise RuntimeError(build_output or "重建设备索引失败。")
seed_proc = run_command(
[
"python3",
str(PROJECT_ROOT / "tools/export_mysql_seed.py"),
"--repo-root",
str(WORKSPACE_ROOT),
"--output",
str(MYSQL_SEED_PATH),
]
)
seed_output = "\n".join(part for part in [seed_proc.stdout.strip(), seed_proc.stderr.strip()] if part).strip()
if seed_proc.returncode != 0:
raise RuntimeError(seed_output or "导出 MySQL seed 失败。")
mysql_loaded = False
mysql_message = "MySQL 未刷新。"
if mysql_auto_load_enabled():
if defer_mysql_load:
started = start_manual_mysql_loader()
mysql_message = "MySQL 后台刷新中。" if started else "MySQL 后台刷新已在进行中。"
else:
load_proc = run_command(["python3", str(MYSQL_LOADER)])
mysql_message = "\n".join(part for part in [load_proc.stdout.strip(), load_proc.stderr.strip()] if part).strip() or "MySQL 已刷新。"
if load_proc.returncode != 0:
raise RuntimeError(mysql_message)
mysql_loaded = True
return {
"index_updated": True,
"mysql_seed_updated": True,
"mysql_loaded": mysql_loaded,
"message": (
"本地覆盖库已保存,索引与 MySQL seed 已刷新,MySQL 正在后台刷新。"
if defer_mysql_load and mysql_auto_load_enabled()
else "本地覆盖库已保存,索引与 MySQL seed 已刷新。"
if mysql_loaded
else "本地覆盖库已保存,索引与 MySQL seed 已刷新,MySQL 未自动装载。"
),
"build_output": build_output,
"mysql_seed_output": seed_output,
"mysql_message": mysql_message,
}
def manual_catalog_payload() -> dict[str, object]:
catalog = validate_manual_catalog(read_manual_catalog())
brands = sorted(catalog["brands"], key=lambda item: str(item["name"]).lower())
devices = sorted(catalog["devices"], key=lambda item: (str(item["brand"]).lower(), str(item["device_name"]).lower()))
return {
"brands": brands,
"devices": devices,
"stats": {
"brand_count": len(brands),
"device_count": len(devices),
},
"catalog_file": str(MANUAL_CATALOG_PATH.relative_to(PROJECT_ROOT)),
"mysql_refresh": {
"running": bool(LAST_MANUAL_MYSQL_LOAD_STATUS.get("running")),
"last_started_at": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_started_at"),
"last_finished_at": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_finished_at"),
"last_status": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_status"),
"last_message": LAST_MANUAL_MYSQL_LOAD_STATUS.get("last_message"),
},
}
def upsert_manual_brand(payload: dict[str, object]) -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
try:
catalog = validate_manual_catalog(read_manual_catalog())
incoming = canonical_manual_brand(payload)
brand_key = normalize_text(str(incoming["name"]))
existing_by_key = {normalize_text(str(item["name"])): item for item in catalog["brands"]}
existing = existing_by_key.get(brand_key)
if existing:
incoming["created_at"] = existing.get("created_at") or incoming["created_at"]
catalog["brands"] = [
item for item in catalog["brands"] if normalize_text(str(item["name"])) != brand_key
]
catalog["brands"].append(incoming)
validated = validate_manual_catalog(catalog)
write_manual_catalog(validated)
rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
return {
"saved_brand": incoming,
"catalog": manual_catalog_payload(),
**rebuild_result,
}
finally:
SYNC_LOCK.release()
def upsert_manual_device(payload: dict[str, object]) -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
try:
catalog = validate_manual_catalog(read_manual_catalog())
allowed_brands = load_builtin_brand_names() | {str(item["name"]) for item in catalog["brands"]}
device_payload = canonical_manual_device(payload, allowed_brands)
existing = next((item for item in catalog["devices"] if str(item.get("id")) == str(device_payload["id"])), None)
if existing:
device_payload["created_at"] = existing.get("created_at") or device_payload["created_at"]
catalog["devices"] = [item for item in catalog["devices"] if str(item.get("id")) != str(device_payload["id"])]
catalog["devices"].append(device_payload)
validated = validate_manual_catalog(catalog)
write_manual_catalog(validated)
rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
return {
"saved_device": device_payload,
"alias_conflict_count": count_manual_alias_conflicts(device_payload),
"catalog": manual_catalog_payload(),
**rebuild_result,
}
finally:
SYNC_LOCK.release()
def delete_manual_brand(payload: dict[str, object]) -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
try:
brand_name = str(payload.get("name") or "").strip()
if not brand_name:
raise RuntimeError("品牌名称不能为空。")
catalog = validate_manual_catalog(read_manual_catalog())
active_devices = [item for item in catalog["devices"] if str(item.get("brand") or "").strip() == brand_name]
if active_devices:
raise RuntimeError("该品牌下仍有关联设备,请先删除设备后再删除品牌。")
next_brands = [item for item in catalog["brands"] if str(item.get("name") or "").strip() != brand_name]
if len(next_brands) == len(catalog["brands"]):
raise RuntimeError(f"未找到品牌: {brand_name}")
write_manual_catalog({"brands": next_brands, "devices": catalog["devices"]})
rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
return {
"deleted_brand": brand_name,
"catalog": manual_catalog_payload(),
**rebuild_result,
}
finally:
SYNC_LOCK.release()
def delete_manual_device(payload: dict[str, object]) -> dict[str, object]:
if not SYNC_LOCK.acquire(blocking=False):
raise RuntimeError("已有同步或数据重建任务在执行,请稍后再试。")
try:
device_id = str(payload.get("id") or "").strip()
if not device_id:
raise RuntimeError("设备 ID 不能为空。")
catalog = validate_manual_catalog(read_manual_catalog())
next_devices = [item for item in catalog["devices"] if str(item.get("id") or "").strip() != device_id]
if len(next_devices) == len(catalog["devices"]):
raise RuntimeError(f"未找到设备: {device_id}")
write_manual_catalog({"brands": catalog["brands"], "devices": next_devices})
rebuild_result = rebuild_generated_outputs(defer_mysql_load=True)
return {
"deleted_device": device_id,
"catalog": manual_catalog_payload(),
**rebuild_result,
}
finally:
SYNC_LOCK.release()
def parse_apple_series_generation(name: str) -> dict[str, object] | None:
text = re.sub(r"\s+", " ", str(name or "").strip())
if not text:
return None
ordinal_match = re.match(r"^(.*?)(?:\s*\((\d+)(?:st|nd|rd|th)\s+generation\)|\s+(\d+))$", text, re.IGNORECASE)
if ordinal_match:
base_label = re.sub(r"\s+", " ", str(ordinal_match.group(1) or "").strip())
generation = int(ordinal_match.group(2) or ordinal_match.group(3) or "0")
if base_label and generation > 0:
return {
"base_label": base_label,
"base_norm": normalize_text(base_label),
"generation": generation,
"chip_like": False,
}
chip_like = bool(re.search(r"\((?:[^)]*\b(?:a\d{1,2}|m\d{1,2})\b[^)]*)\)$", text, re.IGNORECASE))
base_label = re.sub(r"\s*\([^)]*\)\s*$", "", text).strip()
if not base_label:
return None
return {
"base_label": base_label,
"base_norm": normalize_text(base_label),
"generation": None,
"chip_like": chip_like,
}
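`parse_apple_series_generation` drives the generation aliases built next. A runnable copy (again with an assumed `normalize_text` stand-in) shows the three cases it distinguishes:

```python
import re

def normalize_text(value: str) -> str:
    # Assumed stand-in for the project's normalizer.
    return re.sub(r"[^0-9a-z\u4e00-\u9fff]+", "", str(value or "").lower())

def parse_apple_series_generation(name):
    text = re.sub(r"\s+", " ", str(name or "").strip())
    if not text:
        return None
    ordinal_match = re.match(r"^(.*?)(?:\s*\((\d+)(?:st|nd|rd|th)\s+generation\)|\s+(\d+))$", text, re.IGNORECASE)
    if ordinal_match:
        base_label = re.sub(r"\s+", " ", str(ordinal_match.group(1) or "").strip())
        generation = int(ordinal_match.group(2) or ordinal_match.group(3) or "0")
        if base_label and generation > 0:
            return {"base_label": base_label, "base_norm": normalize_text(base_label),
                    "generation": generation, "chip_like": False}
    chip_like = bool(re.search(r"\((?:[^)]*\b(?:a\d{1,2}|m\d{1,2})\b[^)]*)\)$", text, re.IGNORECASE))
    base_label = re.sub(r"\s*\([^)]*\)\s*$", "", text).strip()
    if not base_label:
        return None
    return {"base_label": base_label, "base_norm": normalize_text(base_label),
            "generation": None, "chip_like": chip_like}

print(parse_apple_series_generation("HomePod (2nd generation)"))   # explicit ordinal → generation 2
print(parse_apple_series_generation("iPhone 14"))                  # trailing number → generation 14
print(parse_apple_series_generation("Apple TV 4K (A15)"))          # chip suffix → chip_like, no generation
```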
def build_index_device_name_alias_map() -> dict[str, list[str]]:
if not INDEX_PATH.exists():
return {}
try:
payload = json.loads(INDEX_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
records = payload.get("records") if isinstance(payload, dict) else None
if not isinstance(records, list):
return {}
grouped: dict[str, dict[str, object]] = {}
for record in records:
if not isinstance(record, dict):
continue
brand = str(record.get("market_brand") or record.get("manufacturer_brand") or record.get("brand") or "").strip()
if brand != "Apple":
continue
device_name = str(record.get("device_name") or "").strip()
parsed = parse_apple_series_generation(device_name)
if not parsed or not parsed.get("base_norm"):
continue
base_norm = str(parsed["base_norm"])
group = grouped.setdefault(
base_norm,
{
"base_label": str(parsed["base_label"]),
"items": [],
},
)
items = group["items"]
if isinstance(items, list):
items.append(
{
"device_name": device_name,
"generation": parsed.get("generation"),
"chip_like": bool(parsed.get("chip_like")),
}
)
alias_map: dict[str, set[str]] = {}
for group in grouped.values():
items = group.get("items")
if not isinstance(items, list):
continue
explicit_generations = sorted(
{
int(item["generation"])
for item in items
if isinstance(item, dict) and isinstance(item.get("generation"), int) and int(item["generation"]) > 0
}
)
max_explicit_generation = explicit_generations[-1] if explicit_generations else 0
base_label = str(group.get("base_label") or "").strip()
if not base_label:
continue
for item in items:
if not isinstance(item, dict):
continue
generation = item.get("generation")
device_name = str(item.get("device_name") or "").strip()
if not isinstance(generation, int):
if device_name == base_label and max_explicit_generation >= 2:
generation = 1
elif item.get("chip_like") and max_explicit_generation >= 1:
generation = max_explicit_generation + 1
if not isinstance(generation, int) or generation <= 0:
continue
alias_key = normalize_text(f"{base_label} {generation}")
if not alias_key:
continue
alias_map.setdefault(alias_key, set()).add(device_name)
return {key: sorted(values) for key, values in alias_map.items()}
def resolve_index_device_names(alias_norm: str) -> list[str]:
global INDEX_DEVICE_NAME_ALIAS_MAP
if INDEX_DEVICE_NAME_ALIAS_MAP is None:
with INDEX_ALIAS_LOCK:
if INDEX_DEVICE_NAME_ALIAS_MAP is None:
INDEX_DEVICE_NAME_ALIAS_MAP = build_index_device_name_alias_map()
return list((INDEX_DEVICE_NAME_ALIAS_MAP or {}).get(alias_norm, []))
def mysql_command(database: str | None = None) -> list[str]:
command = [
"mysql",
@@ -355,10 +857,22 @@ def run_mysql_query(sql: str, database: str | None = None) -> list[dict[str, str
return rows
SQL_CN_SOURCE_ORDER = """
CASE
WHEN source_file = 'local/manual_catalog.json'
OR (source_file NOT LIKE '%_en.md' AND source_file NOT LIKE '%_global_en.md')
THEN 0
ELSE 1
END ASC,
source_rank ASC,
record_id ASC
""".strip()
def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
raw_value = str(payload.get("model_raw") or payload.get("model") or "").strip()
if not raw_value:
raise RuntimeError("请填写设备标识。")
raise RuntimeError("请填写设备标识或设备名称。")
alias_norm = normalize_text(raw_value)
if not alias_norm:
@@ -369,9 +883,9 @@ def build_sql_query_payload(payload: dict[str, object]) -> dict[str, object]:
limit = int(limit_value)
except Exception as err:
raise RuntimeError("limit 必须是数字。") from err
limit = max(1, min(limit, 100))
limit = 1
sql = f"""
exact_sql = f"""
SELECT
model,
record_id,
@@ -391,17 +905,153 @@ SELECT
ver_name
FROM mobilemodels.mm_device_catalog
WHERE alias_norm = '{sql_string(alias_norm)}'
ORDER BY source_rank ASC, record_id ASC
ORDER BY {SQL_CN_SOURCE_ORDER}
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
rows = run_mysql_query(exact_sql)
sql = exact_sql
match_strategy = "alias_norm_exact"
match_strategy_label = "alias_norm 精确匹配"
resolved_device_names: list[str] = []
if not rows:
resolved_device_names = resolve_index_device_names(alias_norm)
if resolved_device_names:
name_conditions = " OR ".join(
f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names
)
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE {name_conditions}
ORDER BY {SQL_CN_SOURCE_ORDER}
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
if rows:
match_strategy = "device_name_alias"
match_strategy_label = "设备名称别名映射"
return {
"query_mode": "sql",
"model_raw": raw_value,
"alias_norm": alias_norm,
"limit": limit,
"sql": sql,
"match_strategy": match_strategy,
"match_strategy_label": match_strategy_label,
"resolved_device_names": resolved_device_names,
"rows": rows,
"row_count": len(rows),
}
def build_sql_device_name_query_payload(payload: dict[str, object]) -> dict[str, object]:
raw_value = str(payload.get("device_name") or payload.get("name") or "").strip()
if not raw_value:
raise RuntimeError("请填写设备名称。")
limit_value = payload.get("limit", 20)
try:
limit = int(limit_value)
except Exception as err:
raise RuntimeError("limit 必须是数字。") from err
limit = 1
alias_norm = normalize_text(raw_value)
resolved_device_names = resolve_index_device_names(alias_norm) if alias_norm else []
rows: list[dict[str, str | None]] = []
match_strategy = "device_name_like"
match_strategy_label = "device_name 模糊匹配"
if resolved_device_names:
name_conditions = " OR ".join(
f"device_name = '{sql_string(device_name)}'" for device_name in resolved_device_names
)
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE {name_conditions}
ORDER BY {SQL_CN_SOURCE_ORDER}
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
if rows:
match_strategy = "device_name_alias"
match_strategy_label = "设备名称别名映射"
else:
sql = ""
if not rows:
sql = f"""
SELECT
model,
record_id,
alias_norm,
device_name,
brand,
manufacturer_brand,
parent_brand,
market_brand,
device_type,
source_file,
section,
source_rank,
source_weight,
code,
code_alias,
ver_name
FROM mobilemodels.mm_device_catalog
WHERE device_name LIKE '%{sql_string(raw_value)}%'
OR ver_name LIKE '%{sql_string(raw_value)}%'
ORDER BY {SQL_CN_SOURCE_ORDER}
LIMIT {limit};
""".strip()
rows = run_mysql_query(sql)
return {
"query_mode": "sql_device_name",
"device_name": raw_value,
"alias_norm": alias_norm,
"limit": limit,
"sql": sql,
"match_strategy": match_strategy,
"match_strategy_label": match_strategy_label,
"resolved_device_names": resolved_device_names,
"rows": rows,
"row_count": len(rows),
}
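A client hits the new endpoint with a JSON body. The sketch below builds (but does not send) such a request, assuming the default port from the project docs; passing it to `urllib.request.urlopen` would return the payload dict above (`query_mode`, `sql`, `match_strategy`, `rows`, `row_count`, ...) as JSON:

```python
import json
import urllib.request

body = json.dumps({"device_name": "iPad 9", "limit": 1}).encode("utf-8")
request = urllib.request.Request(
    "http://127.0.0.1:8123/api/query-sql-device-name",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
print(request.get_method(), request.full_url)
```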
@@ -680,6 +1330,14 @@ class MobileModelsHandler(SimpleHTTPRequestHandler):
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/manual-catalog":
try:
self._send_json(manual_catalog_payload())
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
return super().do_GET()
def do_POST(self) -> None:
@@ -715,6 +1373,18 @@ class MobileModelsHandler(SimpleHTTPRequestHandler):
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/query-sql-device-name":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
payload = build_sql_device_name_query_payload(req if isinstance(req, dict) else {})
self._send_json(payload)
except RuntimeError as err:
self._send_json({"error": str(err)}, status=HTTPStatus.BAD_REQUEST)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path == "/api/sync-schedule":
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
@@ -765,6 +1435,28 @@ class MobileModelsHandler(SimpleHTTPRequestHandler):
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
if self.path in {"/api/manual-brand", "/api/manual-device", "/api/manual-brand-delete", "/api/manual-device-delete"}:
try:
content_length = int(self.headers.get("Content-Length", "0") or "0")
raw_body = self.rfile.read(content_length) if content_length > 0 else b"{}"
req = json.loads(raw_body.decode("utf-8") or "{}")
if not isinstance(req, dict):
raise RuntimeError("请求体必须是 JSON 对象。")
if self.path == "/api/manual-brand":
payload = upsert_manual_brand(req)
elif self.path == "/api/manual-device":
payload = upsert_manual_device(req)
elif self.path == "/api/manual-brand-delete":
payload = delete_manual_brand(req)
else:
payload = delete_manual_device(req)
self._send_json(payload)
except RuntimeError as err:
status = HTTPStatus.CONFLICT if "已有同步或数据重建任务" in str(err) else HTTPStatus.BAD_REQUEST
self._send_json({"error": str(err)}, status=status)
except Exception as err:
self._send_json({"error": str(err)}, status=HTTPStatus.INTERNAL_SERVER_ERROR)
return
self._send_json({"error": "Not found"}, status=HTTPStatus.NOT_FOUND)
+1092 -171
File diff suppressed because it is too large
+1174 -313
File diff suppressed because it is too large
+345 -44
@@ -3,7 +3,7 @@
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>MobileModels 文档查看</title>
<title>MobileModels 使用文档</title>
<style>
:root {
--bg: #f5f7fb;
@@ -12,23 +12,31 @@
--sub: #566173;
--line: #d9e0ea;
--brand: #0f6fff;
--nav-height: 52px;
}
* { box-sizing: border-box; }
body {
margin: 0;
padding-top: var(--nav-height);
font-family: "PingFang SC", "Noto Sans SC", "Microsoft YaHei", sans-serif;
background: radial-gradient(circle at 0 0, #eef4ff 0, var(--bg) 40%), var(--bg);
color: var(--text);
}
.top-nav {
position: fixed;
top: 0;
left: 0;
right: 0;
z-index: 1000;
background: linear-gradient(180deg, #1f2a3a, #1a2431);
border-bottom: 1px solid rgba(255, 255, 255, 0.08);
box-shadow: 0 10px 30px rgba(14, 25, 42, 0.16);
}
.top-nav-inner {
max-width: 1200px;
margin: 0 auto;
padding: 0 16px;
height: 52px;
height: var(--nav-height);
display: flex;
align-items: center;
gap: 8px;
@@ -56,7 +64,18 @@
margin: 24px auto;
padding: 0 16px 32px;
display: grid;
gap: 16px;
grid-template-columns: 260px minmax(0, 1fr);
gap: 12px;
}
.doc-nav-card {
position: sticky;
top: calc(var(--nav-height) + 16px);
z-index: 10;
max-height: calc(100vh - var(--nav-height) - 32px);
overflow: hidden;
display: grid;
grid-template-rows: auto minmax(0, 1fr);
gap: 12px;
}
.card {
background: var(--card);
@@ -76,39 +95,166 @@
font-size: 13px;
line-height: 1.5;
}
.btns {
display: flex;
.doc-toc {
min-height: 0;
overflow: auto;
display: grid;
gap: 8px;
flex-wrap: wrap;
margin-top: 12px;
padding-right: 4px;
}
.btn {
display: inline-flex;
align-items: center;
padding: 9px 14px;
.doc-toc-section {
display: grid;
gap: 6px;
padding: 8px 0 10px;
border-bottom: 1px solid #e7edf7;
}
.doc-toc-section:last-child {
border-bottom: 0;
padding-bottom: 0;
}
.doc-toc-title {
padding: 0 4px;
font-size: 12px;
font-weight: 700;
letter-spacing: 0.02em;
color: #587095;
text-transform: uppercase;
}
.doc-toc-links {
display: grid;
gap: 6px;
}
.doc-toc-link {
display: block;
padding: 7px 10px;
border-radius: 10px;
border: 1px solid #c8d6ee;
background: #f7faff;
color: #244775;
color: #33527f;
text-decoration: none;
font-size: 13px;
font-weight: 700;
line-height: 1.45;
background: transparent;
border: 1px solid transparent;
}
.btn:hover {
background: #eef5ff;
.doc-toc-link:hover {
background: #f4f8ff;
border-color: #e0e8f6;
}
pre {
margin: 0;
.doc-toc-link.level-2 {
font-weight: 600;
color: #28466f;
}
.doc-toc-link.level-3 {
margin-left: 0;
font-size: 12px;
}
.doc-toc-link.level-4 {
margin-left: 12px;
font-size: 12px;
}
.doc-shell {
padding: 0;
overflow: hidden;
}
.doc-shell .markdown-body {
padding: 18px;
}
.markdown-body {
color: #223047;
font-size: 14px;
line-height: 1.75;
}
.markdown-body > :first-child {
margin-top: 0;
}
.markdown-body > :last-child {
margin-bottom: 0;
}
.markdown-body h1,
.markdown-body h2,
.markdown-body h3,
.markdown-body h4 {
margin: 1.2em 0 0.55em;
color: #1b2a42;
line-height: 1.35;
}
.markdown-body h1 { font-size: 32px; }
.markdown-body h2 { font-size: 24px; }
.markdown-body h3 { font-size: 19px; }
.markdown-body h4 { font-size: 16px; }
.markdown-body p,
.markdown-body ul,
.markdown-body ol,
.markdown-body blockquote {
margin: 0 0 1em;
}
.markdown-body ul,
.markdown-body ol {
padding-left: 22px;
}
.markdown-body li + li {
margin-top: 4px;
}
.markdown-body blockquote {
padding: 10px 14px;
border-left: 4px solid #cddcf6;
border-radius: 0 10px 10px 0;
background: #f7faff;
color: #52627a;
}
.markdown-body pre {
margin: 0 0 1em;
white-space: pre-wrap;
word-break: break-word;
font-size: 13px;
line-height: 1.65;
background: #f6f8fb;
border: 1px solid var(--line);
border-radius: 10px;
border-radius: 12px;
padding: 14px;
overflow: auto;
}
.markdown-body pre code {
padding: 0;
border: 0;
background: transparent;
color: inherit;
font-size: inherit;
}
.markdown-body code {
font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace;
font-size: 0.92em;
padding: 1px 6px;
border-radius: 6px;
background: #edf3ff;
color: #294f88;
border: 1px solid #d7e3fb;
}
.markdown-body hr {
border: 0;
border-top: 1px solid #d8e1ef;
margin: 1.2em 0;
}
.markdown-body a {
color: #0f6fff;
text-decoration: none;
}
.markdown-body a:hover {
text-decoration: underline;
}
.markdown-body h2,
.markdown-body h3,
.markdown-body h4 {
scroll-margin-top: calc(var(--nav-height) + 20px);
}
@media (max-width: 1020px) {
.wrap {
grid-template-columns: 1fr;
}
.doc-nav-card {
position: static;
max-height: none;
}
}
</style>
</head>
<body>
@@ -117,51 +263,204 @@
<a href="/web/device_query.html" class="brand">MobileModels</a>
<a href="/web/device_query.html" class="item">设备查询</a>
<a href="/web/brand_management.html" class="item">数据管理</a>
<a href="/web/device_query.html?view=docs" class="item active">相关文档</a>
<a href="/web/device_query.html?view=docs" class="item active">使用文档</a>
</div>
</nav>
<div class="wrap">
<section class="card">
<h1 class="title" id="docTitle">文档查看</h1>
<p class="sub" id="docPath">正在加载文档...</p>
<div class="btns">
<a class="btn" href="/web/doc_viewer.html?path=/docs/mysql-query-design.md&title=MySQL%20%E8%AE%BE%E8%AE%A1%E8%AF%B4%E6%98%8E">MySQL 设计说明</a>
<a class="btn" href="/web/doc_viewer.html?path=/docs/web-ui.md&title=Web%20%E4%BD%BF%E7%94%A8%E8%AF%B4%E6%98%8E">Web 使用说明</a>
<a class="btn" href="/web/doc_viewer.html?path=/README.md&title=%E9%A1%B9%E7%9B%AE%20README">项目 README</a>
</div>
<section class="card doc-nav-card">
<h1 class="title" id="docTitle">使用文档</h1>
<div id="docToc" class="doc-toc" aria-label="文档目录"></div>
</section>
<section class="card">
<pre id="docContent">加载中...</pre>
<section class="card doc-shell">
<div id="docContent" class="markdown-body">加载中...</div>
</section>
</div>
<script>
const ALLOWED_DOCS = new Map([
["/docs/mysql-query-design.md", "MySQL 设计说明"],
["/docs/web-ui.md", "Web 使用说明"],
["/README.md", "项目 README"],
["/README.md", "项目使用文档"],
]);
async function main() {
const params = new URLSearchParams(window.location.search);
const path = params.get("path") || "/docs/mysql-query-design.md";
const title = params.get("title") || ALLOWED_DOCS.get(path) || "文档查看";
const path = params.get("path") || "/README.md";
const title = params.get("title") || ALLOWED_DOCS.get(path) || "使用文档";
const docTitleEl = document.getElementById("docTitle");
const docPathEl = document.getElementById("docPath");
const docContentEl = document.getElementById("docContent");
const docTocEl = document.getElementById("docToc");
function escapeHtml(text) {
return String(text || "")
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#39;");
}
function renderInlineMarkdown(text) {
let html = escapeHtml(text || "");
html = html.replace(/`([^`]+)`/g, "<code>$1</code>");
html = html.replace(/\*\*([^*]+)\*\*/g, "<strong>$1</strong>");
html = html.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '<a href="$2" target="_blank" rel="noreferrer">$1</a>');
return html;
}
function slugify(text) {
return String(text || "")
.toLowerCase()
.replace(/<[^>]+>/g, "")
.replace(/[^0-9a-z\u4e00-\u9fff]+/g, "-")
.replace(/^-+|-+$/g, "") || "section";
}
// Minimal Markdown renderer: fenced code blocks, h1-h4 headings, horizontal
// rules, blockquotes, unordered/ordered lists, and paragraphs.
function renderMarkdown(text) {
const normalized = String(text || "").replace(/\r\n/g, "\n");
const lines = normalized.split("\n");
const chunks = [];
const headingIds = new Map();
let i = 0;
while (i < lines.length) {
const line = lines[i];
const trimmed = line.trim();
if (!trimmed) {
i += 1;
continue;
}
if (trimmed.startsWith("```")) {
const lang = trimmed.slice(3).trim();
const codeLines = [];
i += 1;
while (i < lines.length && !lines[i].trim().startsWith("```")) {
codeLines.push(lines[i]);
i += 1;
}
if (i < lines.length) i += 1;
const className = lang ? ` class="language-${escapeHtml(lang)}"` : "";
chunks.push(`<pre><code${className}>${escapeHtml(codeLines.join("\n"))}</code></pre>`);
continue;
}
const heading = trimmed.match(/^(#{1,4})\s+(.*)$/);
if (heading) {
const level = heading[1].length;
const renderedText = renderInlineMarkdown(heading[2]);
const baseId = slugify(heading[2]);
const count = headingIds.get(baseId) || 0;
headingIds.set(baseId, count + 1);
const headingId = count ? `${baseId}-${count + 1}` : baseId;
chunks.push(`<h${level} id="${headingId}">${renderedText}</h${level}>`);
i += 1;
continue;
}
if (/^---+$/.test(trimmed) || /^___+$/.test(trimmed)) {
chunks.push("<hr />");
i += 1;
continue;
}
if (/^>\s?/.test(trimmed)) {
const quoteLines = [];
while (i < lines.length && /^>\s?/.test(lines[i].trim())) {
quoteLines.push(lines[i].trim().replace(/^>\s?/, ""));
i += 1;
}
chunks.push(`<blockquote>${renderInlineMarkdown(quoteLines.join(" "))}</blockquote>`);
continue;
}
if (/^[-*]\s+/.test(trimmed)) {
const items = [];
while (i < lines.length && /^[-*]\s+/.test(lines[i].trim())) {
items.push(lines[i].trim().replace(/^[-*]\s+/, ""));
i += 1;
}
chunks.push(`<ul>${items.map((item) => `<li>${renderInlineMarkdown(item)}</li>`).join("")}</ul>`);
continue;
}
if (/^\d+\.\s+/.test(trimmed)) {
const items = [];
while (i < lines.length && /^\d+\.\s+/.test(lines[i].trim())) {
items.push(lines[i].trim().replace(/^\d+\.\s+/, ""));
i += 1;
}
chunks.push(`<ol>${items.map((item) => `<li>${renderInlineMarkdown(item)}</li>`).join("")}</ol>`);
continue;
}
const paragraphLines = [];
while (i < lines.length) {
const candidate = lines[i];
const candidateTrim = candidate.trim();
if (!candidateTrim) break;
if (
candidateTrim.startsWith("```")
|| /^(#{1,4})\s+/.test(candidateTrim)
|| /^[-*]\s+/.test(candidateTrim)
|| /^\d+\.\s+/.test(candidateTrim)
|| /^>\s?/.test(candidateTrim)
|| /^---+$/.test(candidateTrim)
|| /^___+$/.test(candidateTrim)
) {
break;
}
paragraphLines.push(candidateTrim);
i += 1;
}
chunks.push(`<p>${renderInlineMarkdown(paragraphLines.join(" "))}</p>`);
}
return chunks.join("");
}
// Build the sidebar table of contents from the rendered h2-h4 headings,
// grouping h3/h4 entries under their preceding h2.
function buildToc(container) {
const headings = Array.from(container.querySelectorAll("h2, h3, h4"));
if (!headings.length) {
docTocEl.innerHTML = `<div class="sub">当前文档没有可用目录。</div>`;
return;
}
const groups = [];
let currentGroup = null;
headings.forEach((heading) => {
const level = Number(heading.tagName.slice(1));
const item = {
id: heading.id,
text: escapeHtml(heading.textContent || ""),
level,
};
if (level === 2 || !currentGroup) {
currentGroup = { heading: item, children: [] };
groups.push(currentGroup);
return;
}
currentGroup.children.push(item);
});
docTocEl.innerHTML = groups.map((group) => `
<section class="doc-toc-section">
<div class="doc-toc-title">${group.heading.text}</div>
<div class="doc-toc-links">
<a class="doc-toc-link level-${group.heading.level}" href="#${group.heading.id}">${group.heading.text}</a>
${group.children.map((item) => `<a class="doc-toc-link level-${item.level}" href="#${item.id}">${item.text}</a>`).join("")}
</div>
</section>
`).join("");
}
if (!ALLOWED_DOCS.has(path)) {
docTitleEl.textContent = "文档不存在";
docPathEl.textContent = path;
docContentEl.textContent = "当前只允许查看预设文档。";
docTitleEl.textContent = "使用文档";
docContentEl.innerHTML = "<p>当前只允许查看预设文档。</p>";
return;
}
document.title = `${title} - MobileModels`;
docTitleEl.textContent = "使用文档";
try {
const resp = await fetch(path, { cache: "no-store" });
@@ -169,9 +468,11 @@
throw new Error(`HTTP ${resp.status}`);
}
const text = await resp.text();
docContentEl.innerHTML = renderMarkdown(text);
buildToc(docContentEl);
} catch (err) {
docContentEl.innerHTML = `<p>加载失败</p><pre>${escapeHtml(err.message || String(err))}</pre>`;
docTocEl.innerHTML = "";
}
}
@@ -0,0 +1,14 @@
{
"brands": [
{
"name": "学而思",
"parent_brand": "好未来",
"aliases": [
"学而思"
],
"updated_at": "2026-04-14T17:05:28+08:00",
"created_at": "2026-04-14T17:05:27+08:00"
}
],
"devices": []
}