165 lines
5.4 KiB
Python
165 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Load MobileModels schema and seed data into MySQL."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from project_layout import PROJECT_ROOT
|
|
|
|
|
|
def mysql_env(password: str) -> dict[str, str]:
    """Return a copy of the current environment with ``MYSQL_PWD`` set.

    The returned mapping is meant to be passed as ``env=`` to a subprocess so
    the password travels through the environment rather than the command line.
    ``os.environ`` itself is not modified.

    Args:
        password: Value to store under ``MYSQL_PWD``.

    Returns:
        A new ``dict`` holding every current environment variable plus
        ``MYSQL_PWD``.
    """
    return {**os.environ, "MYSQL_PWD": password}
|
|
|
|
|
|
def mysql_command(user: str, host: str, port: int, database: str | None = None) -> list[str]:
    """Build the argument vector for the ``mysql`` command-line client.

    The client is always told to connect over TCP and to use the ``utf8mb4``
    character set. No password is placed on the command line.

    Args:
        user: Account name passed as ``--user``.
        host: Server host passed as ``--host``.
        port: Server port passed as ``--port``.
        database: Optional default database; appended as the final positional
            argument when it is a non-empty string.

    Returns:
        The argv list, ready to hand to ``subprocess.run``.
    """
    argv = [
        "mysql",
        f"--host={host}",
        f"--port={port}",
        f"--user={user}",
        "--protocol=TCP",
        "--default-character-set=utf8mb4",
    ]
    # A falsy database (None or "") means "no default database".
    return [*argv, database] if database else argv
|
|
|
|
|
|
def mysqladmin_ping(user: str, password: str, host: str, port: int) -> bool:
    """Run ``mysqladmin ping`` once and report whether it exited successfully.

    The password is supplied through the ``MYSQL_PWD`` environment variable
    (see ``mysql_env``). The command's output is captured and discarded; only
    the exit status is inspected.

    Args:
        user: Account name passed as ``--user``.
        password: Password for ``user``.
        host: Server host passed as ``--host``.
        port: Server port passed as ``--port``.

    Returns:
        ``True`` when ``mysqladmin`` exits with status 0, ``False`` otherwise.
    """
    argv = [
        "mysqladmin",
        f"--host={host}",
        f"--port={port}",
        f"--user={user}",
        "--protocol=TCP",
        "ping",
        "--silent",
    ]
    result = subprocess.run(
        argv,
        env=mysql_env(password),
        capture_output=True,  # keep mysqladmin's output off our own stdout/stderr
        text=True,
        check=False,
    )
    return not result.returncode
|
|
|
|
|
|
def wait_for_mysql(user: str, password: str, host: str, port: int, timeout: int) -> None:
    """Block until the MySQL server answers a ping or ``timeout`` seconds pass.

    The server is probed with ``mysqladmin_ping`` roughly every two seconds.
    At least one probe is always made, even when ``timeout`` is zero or
    negative, so a zero timeout acts as a single readiness check.

    Args:
        user: Account name used for the ping.
        password: Password for ``user``.
        host: Server host.
        port: Server port.
        timeout: Maximum number of seconds to keep retrying.

    Raises:
        RuntimeError: If no ping succeeded before the deadline.
    """
    poll_interval = 2.0
    # Monotonic clock: the deadline must not move if the wall clock is
    # adjusted (NTP step, VM resume) while we are waiting.
    deadline = time.monotonic() + timeout
    while True:
        if mysqladmin_ping(user, password, host, port):
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the loop then makes one last probe.
        time.sleep(min(poll_interval, remaining))
    raise RuntimeError(f"MySQL 未在 {timeout}s 内就绪: {host}:{port}")
|
|
|
|
|
|
def run_sql_file(user: str, password: str, host: str, port: int, path: Path, database: str | None = None) -> None:
    """Execute the SQL script at ``path`` by piping it into the ``mysql`` client.

    Args:
        user: Account name used to connect.
        password: Password for ``user`` (passed via ``MYSQL_PWD``).
        host: Server host.
        port: Server port.
        path: SQL file to execute; it is read as UTF-8.
        database: Optional default database for the session.

    Raises:
        RuntimeError: If the ``mysql`` client exits with a non-zero status.
            The message carries the client's stderr, or its stdout, or the
            exit code, whichever is available first.
        OSError: If ``path`` cannot be read.
    """
    sql_text = path.read_text(encoding="utf-8")
    proc = subprocess.run(
        mysql_command(user, host, port, database=database),
        env=mysql_env(password),
        input=sql_text,
        # The client is started with --default-character-set=utf8mb4, so the
        # script must reach it as UTF-8 bytes. Relying on text=True alone would
        # re-encode it with the locale's preferred encoding, which corrupts or
        # rejects non-ASCII seed data on non-UTF-8 systems.
        encoding="utf-8",
        # Undecodable bytes in the client's output must not hide the real error.
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        message = proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
        raise RuntimeError(f"执行 SQL 文件失败 {path}: {message}")
|
|
|
|
|
|
def sql_string(value: str) -> str:
    """Escape ``value`` for use inside a single-quoted SQL string literal.

    Each backslash is doubled and each single quote is doubled. The
    surrounding quotes are not added; the caller supplies them.

    Args:
        value: Raw text to embed.

    Returns:
        The escaped text.
    """
    # One pass over the string; neither replacement produces a character the
    # other would rewrite, so this matches two chained ``str.replace`` calls.
    escapes = str.maketrans({"\\": "\\\\", "'": "''"})
    return value.translate(escapes)
|
|
|
|
|
|
def ensure_reader_user(
    user: str,
    password: str,
    host: str,
    port: int,
    database: str,
    reader_user: str,
    reader_password: str,
) -> None:
    """Create or update a read-only account and grant it ``SELECT``.

    The account ``reader_user@'%'`` is created if missing, its password is
    (re)set to ``reader_password``, and it is granted ``SELECT`` on
    ``database`` and on the ``python_services_test`` schema.

    Args:
        user: Administrative account used to run the statements.
        password: Password for ``user`` (passed via ``MYSQL_PWD``).
        host: Server host.
        port: Server port.
        database: Schema the reader may ``SELECT`` from.
        reader_user: Name of the read-only account.
        reader_password: Password to set on the read-only account.

    Raises:
        RuntimeError: If the ``mysql`` client exits with a non-zero status.
    """
    account = f"'{sql_string(reader_user)}'@'%'"
    secret = f"'{sql_string(reader_password)}'"
    # Inside a backtick-quoted identifier a literal backtick is written twice.
    # Without this, a backtick in the name would end the identifier early.
    schema = "`" + database.replace("`", "``") + "`"
    statements = [
        f"CREATE USER IF NOT EXISTS {account} IDENTIFIED BY {secret};",
        # Re-apply the password so an existing account picks up a new one.
        f"ALTER USER {account} IDENTIFIED BY {secret};",
        f"GRANT SELECT ON {schema}.* TO {account};",
        # NOTE(review): this second schema is hard-coded rather than derived
        # from the arguments; confirm that is intentional.
        f"GRANT SELECT ON `python_services_test`.* TO {account};",
        "FLUSH PRIVILEGES;",
    ]
    sql = "\n".join(statements) + "\n"
    proc = subprocess.run(
        mysql_command(user, host, port),
        env=mysql_env(password),
        input=sql,
        # Match the client's --default-character-set=utf8mb4 instead of the
        # locale encoding, so non-ASCII names and passwords arrive unchanged.
        encoding="utf-8",
        # Undecodable bytes in the client's output must not hide the real error.
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        message = proc.stderr.strip() or proc.stdout.strip() or f"mysql exited with {proc.returncode}"
        raise RuntimeError(f"创建只读账号失败: {message}")
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options, using environment variables as defaults.

    Connection options fall back to ``MYSQL_HOST``, ``MYSQL_PORT``,
    ``MYSQL_ROOT_USER``, ``MYSQL_ROOT_PASSWORD``, ``MYSQL_DATABASE``,
    ``MYSQL_READER_USER`` and ``MYSQL_READER_PASSWORD`` when the matching
    flag is not given.

    Returns:
        The populated namespace. ``schema`` and ``seed`` are ``Path`` objects;
        ``port`` and ``wait_timeout`` are ``int``.
    """
    env = os.environ.get
    parser = argparse.ArgumentParser(description="Load MobileModels schema and seed data into MySQL.")
    parser.add_argument("--schema", type=Path, default=Path("sql/mobilemodels_mysql_schema.sql"))
    parser.add_argument("--seed", type=Path, default=Path("dist/mobilemodels_mysql_seed.sql"))
    parser.add_argument("--host", default=env("MYSQL_HOST", "mysql"))
    # The default stays a string so argparse applies ``type=int`` itself, and
    # only when --port is absent. A malformed MYSQL_PORT then gives a normal
    # usage error instead of a traceback, and an explicit --port overrides it.
    parser.add_argument("--port", type=int, default=env("MYSQL_PORT", "3306"))
    parser.add_argument("--user", default=env("MYSQL_ROOT_USER", "root"))
    parser.add_argument("--password", default=env("MYSQL_ROOT_PASSWORD", "mobilemodels_root"))
    parser.add_argument("--database", default=env("MYSQL_DATABASE", "mobilemodels"))
    parser.add_argument("--reader-user", default=env("MYSQL_READER_USER", ""))
    parser.add_argument("--reader-password", default=env("MYSQL_READER_PASSWORD", ""))
    parser.add_argument("--wait-timeout", type=int, default=120)
    parser.add_argument("--check-only", action="store_true", help="Only check MySQL readiness")
    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Wait for MySQL, then load the schema and seed files.

    Relative ``--schema`` and ``--seed`` paths are resolved against
    ``PROJECT_ROOT``. With ``--check-only`` the function returns as soon as
    the server answers a ping. The read-only account is created or updated
    only when both a reader user name and a reader password are supplied.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: Propagated from the wait, load, or account steps.
    """
    args = parse_args()
    schema_path = args.schema if args.schema.is_absolute() else PROJECT_ROOT / args.schema
    seed_path = args.seed if args.seed.is_absolute() else PROJECT_ROOT / args.seed

    wait_for_mysql(args.user, args.password, args.host, args.port, args.wait_timeout)

    if args.check_only:
        print(f"MySQL ready: {args.host}:{args.port}")
        return 0

    # Both scripts run with no default database selected.
    # NOTE(review): presumably each script selects its own schema; confirm.
    run_sql_file(args.user, args.password, args.host, args.port, schema_path)
    run_sql_file(args.user, args.password, args.host, args.port, seed_path)

    # Record whether the account step really ran, so the summary below cannot
    # claim a user was ensured when the password was missing.
    reader_ensured = bool(args.reader_user and args.reader_password)
    if reader_ensured:
        ensure_reader_user(
            args.user,
            args.password,
            args.host,
            args.port,
            args.database,
            args.reader_user,
            args.reader_password,
        )

    print(f"Loaded schema: {schema_path}")
    print(f"Loaded seed: {seed_path}")
    if reader_ensured:
        print(f"Ensured reader user: {args.reader_user}")
    return 0
|
|
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|