Files
shunt-rules/main.py
T

525 lines
17 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import os
import sys
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote
from urllib.error import HTTPError
from urllib.request import Request, urlopen
try:
import tomllib
except ModuleNotFoundError: # Python < 3.11
tomllib = None
# Page size used when paginating directory listings through the Gitea API.
DEFAULT_LIMIT = 100
# Rule types that are skipped (and counted as SKIPPED-*) when rendering the
# Clash YAML payload; passed to format_yaml_payload as unsupported_types.
UNSUPPORTED_CLASH_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
}
# Same role as above, but for the Mihomo YAML payload output.
UNSUPPORTED_MIHOMO_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
}
@dataclass
class Config:
    """Resolved runtime configuration for the generator.

    Built by load_config from the [gitea], [source] and [output] sections of
    a TOML (or JSON) config file.
    """

    base_url: str  # Gitea instance base URL, e.g. "https://git.example.com"
    owner: str  # repository owner
    repo: str  # repository name
    ref: str  # git ref (branch/tag) to read from; defaults to "main"
    token: str | None  # API token read from the env var named by token_env, or None
    source_mode: str  # "local" reads from disk; anything else uses the Gitea API
    local_source_root: str  # base directory for local mode lookups
    source_root: str  # repo-relative directory holding the rule lists
    source_filename_pattern: str  # e.g. "{name}.list"; {name} is the category name
    output_dir: str  # directory where generated files are written
    include_categories: list[str]  # allow-list of category names ([] = discover all)
    exclude_categories: list[str]  # deny-list of category names
    clash_no_resolve: bool  # append "no-resolve" to IP rules in clash output
    mihomo_no_resolve: bool  # append "no-resolve" to IP rules in mihomo output
@dataclass(frozen=True)
class RuleLine:
    """One normalized rule line parsed from a source .list file."""

    raw: str  # full comma-joined rule text, e.g. "DOMAIN-SUFFIX,example.com"
    rule_type: str  # upper-cased first field, e.g. "DOMAIN-SUFFIX"
class GiteaClient:
    """Thin read-only client for the Gitea contents/raw endpoints."""

    def __init__(self, base_url: str, token: str | None):
        # Normalize so endpoint paths can always be appended with a leading "/".
        self.base_url = base_url.rstrip("/")
        self.token = token

    def _headers(self, accept: str) -> dict[str, str]:
        # Common request headers; token auth is attached only when configured.
        headers = {"Accept": accept}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _request_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET *path* (optionally with query params) and decode the JSON body."""
        url = f"{self.base_url}{path}"
        if params:
            query = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items())
            url = f"{url}?{query}"
        request = Request(url, headers=self._headers("application/json"))
        with urlopen(request) as response:
            return json.loads(response.read().decode("utf-8"))

    def _request_text_url(self, url: str) -> str:
        """GET an absolute URL and return its body as (best-effort) UTF-8 text."""
        request = Request(url, headers=self._headers("text/plain"))
        with urlopen(request) as response:
            return response.read().decode("utf-8", errors="replace")

    def list_dir(self, owner: str, repo: str, path: str, ref: str) -> list[dict[str, Any]]:
        """List directory entries at *path*, following API pagination.

        Raises RuntimeError when the path resolves to a file (dict response).
        """
        encoded = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded}"
        collected: list[dict[str, Any]] = []
        page = 1
        while True:
            data = self._request_json(endpoint, {"ref": ref, "page": page, "limit": DEFAULT_LIMIT})
            if isinstance(data, dict):
                raise RuntimeError(f"Path is not a directory: {path}")
            batch = list(data)
            collected.extend(batch)
            # A short page signals the final page.
            if len(batch) < DEFAULT_LIMIT:
                return collected
            page += 1

    def read_file(self, owner: str, repo: str, path: str, ref: str) -> str:
        """Fetch one file's text via the contents API.

        On a 404 from the API route, falls back to the raw-file URL; other
        HTTP errors propagate. Raises RuntimeError for directories, missing
        content, or non-base64 encodings.
        """
        encoded = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded}"
        try:
            data = self._request_json(endpoint, {"ref": ref})
        except HTTPError as exc:
            if exc.code != 404:
                raise
            # Fallback for environments where Gitea API route is not exposed.
            raw_url = (
                f"{self.base_url}/{quote(owner)}/{quote(repo)}/raw/branch/"
                f"{quote(ref)}/{encoded}"
            )
            return self._request_text_url(raw_url)
        if not isinstance(data, dict):
            raise RuntimeError(f"Path is not a file: {path}")
        content = data.get("content")
        encoding = data.get("encoding")
        if not content:
            raise RuntimeError(f"Missing file content for path: {path}")
        if encoding != "base64":
            raise RuntimeError(f"Unsupported encoding ({encoding}) for path: {path}")
        return base64.b64decode(content).decode("utf-8", errors="replace")
def load_config(path: Path) -> Config:
    """Load a Config from *path*.

    A ".json" suffix selects JSON parsing; anything else is treated as TOML,
    which requires the stdlib tomllib (Python 3.11+). The API token is never
    stored in the file — only the name of the environment variable to read.
    """
    if path.suffix.lower() == ".json":
        raw = json.loads(path.read_text(encoding="utf-8"))
    else:
        if tomllib is None:
            raise RuntimeError(
                "TOML config requires Python 3.11+. "
                "Use Python 3.11+ or provide a JSON config file."
            )
        raw = tomllib.loads(path.read_text(encoding="utf-8"))

    gitea_section = raw.get("gitea", {})
    source_section = raw.get("source", {})
    output_section = raw.get("output", {})

    # An empty token_env disables token lookup entirely; an unset or empty
    # environment variable yields None.
    env_name = gitea_section.get("token_env", "GITEA_TOKEN")
    token = (os.getenv(env_name) or None) if env_name else None

    return Config(
        base_url=gitea_section["base_url"],
        owner=gitea_section["owner"],
        repo=gitea_section["repo"],
        ref=gitea_section.get("ref", "main"),
        token=token,
        source_mode=source_section.get("mode", "gitea"),
        local_source_root=source_section.get("local_root", "."),
        source_root=source_section.get("root", "rule/Surge"),
        source_filename_pattern=source_section.get("filename_pattern", "{name}.list"),
        output_dir=output_section.get("dir", "dist"),
        include_categories=source_section.get("include_categories", []),
        exclude_categories=source_section.get("exclude_categories", []),
        clash_no_resolve=output_section.get("clash_no_resolve", False),
        mihomo_no_resolve=output_section.get("mihomo_no_resolve", False),
    )
def parse_rules(content: str) -> list[RuleLine]:
    """Parse rule-list text into deduplicated RuleLine entries.

    Blank lines and "#" comments are skipped. Lines without a comma are
    treated as bare host suffixes (as found in *_Domain.list files) and
    normalized to DOMAIN-SUFFIX rules; both the raw and normalized forms
    participate in deduplication.
    """
    seen: set[str] = set()
    parsed: list[RuleLine] = []
    for raw_line in content.splitlines():
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped in seen:
            continue
        seen.add(stripped)
        if "," not in stripped:
            # Bare domain entry: normalize to an explicit DOMAIN-SUFFIX rule.
            host = stripped.lstrip(".").strip()
            if not host:
                continue
            rule_text = f"DOMAIN-SUFFIX,{host}"
            if rule_text in seen:
                continue
            seen.add(rule_text)
            parsed.append(RuleLine(raw=rule_text, rule_type="DOMAIN-SUFFIX"))
            continue
        fields = [field.strip() for field in stripped.split(",") if field.strip()]
        if not fields:
            continue
        parsed.append(RuleLine(raw=",".join(fields), rule_type=fields[0].upper()))
    return parsed
def to_payload_line(
    rule: RuleLine,
    no_resolve: bool,
    unsupported_types: set[str],
    type_mapping: dict[str, str] | None = None,
) -> str | None:
    """Convert one rule into a Clash/Mihomo payload line.

    Returns None for empty rules and for rule types listed in
    *unsupported_types* (checked against the original, unmapped type).
    IP-CIDR/IP-CIDR6 rules are reduced to "TYPE,cidr" and optionally get a
    trailing "no-resolve"; any stray "no-resolve" on non-IP rules is removed.
    """
    fields = [field.strip() for field in rule.raw.split(",") if field.strip()]
    if not fields:
        return None
    original_type = fields[0].upper()
    if original_type in unsupported_types:
        return None
    target_type = (type_mapping or {}).get(original_type, original_type)
    fields[0] = target_type
    if target_type in {"IP-CIDR", "IP-CIDR6"}:
        payload = [target_type, fields[1]] if len(fields) >= 2 else fields
        if no_resolve:
            payload.append("no-resolve")
        return ",".join(payload)
    # Strip no-resolve from non-IP rules if present in source.
    return ",".join(field for field in fields if field.lower() != "no-resolve")
def format_surge(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render *rules* as a Surge .list file with a commented stats header."""
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    by_type = Counter(rule.rule_type for rule in rules)
    lines = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {timestamp}",
    ]
    # Per-type counts in sorted order, then the grand total, then the rules.
    lines.extend(f"# {rule_type}: {n}" for rule_type, n in sorted(by_type.items()))
    lines.append(f"# TOTAL: {len(rules)}")
    lines.extend(rule.raw for rule in rules)
    return "\n".join(lines) + "\n"
def format_loon(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render *rules* as a Loon .list file.

    The previous implementation was byte-for-byte identical to
    format_surge, so delegate to it: the two outputs stay in sync and the
    header/stat logic lives in one place.
    """
    return format_surge(name, rules, source_path)
def format_yaml_payload(
    name: str,
    rules: list[RuleLine],
    source_path: str,
    no_resolve: bool,
    unsupported_types: set[str],
    type_mapping: dict[str, str] | None = None,
    author_name: str = "gitea-shunt-rules",
) -> tuple[str, int]:
    """Render *rules* as a Clash/Mihomo YAML rule-provider payload.

    Rules that to_payload_line cannot convert are dropped and reported via
    "# SKIPPED-<TYPE>" header comments. Returns (file text, emitted count).
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    emitted: list[str] = []
    kept: Counter[str] = Counter()
    dropped: Counter[str] = Counter()
    for rule in rules:
        payload_line = to_payload_line(
            rule,
            no_resolve=no_resolve,
            unsupported_types=unsupported_types,
            type_mapping=type_mapping,
        )
        if payload_line is None:
            dropped[rule.rule_type] += 1
        else:
            emitted.append(payload_line)
            kept[rule.rule_type] += 1
    out = [
        f"# NAME: {name}",
        f"# AUTHOR: {author_name}",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {timestamp}",
    ]
    out.extend(f"# {rule_type}: {n}" for rule_type, n in sorted(kept.items()))
    out.extend(f"# SKIPPED-{rule_type}: {n}" for rule_type, n in sorted(dropped.items()))
    out.append(f"# TOTAL: {len(emitted)}")
    out.append("payload:")
    out.extend(f" - {item}" for item in emitted)
    return "\n".join(out) + "\n", len(emitted)
def should_include_category(name: str, cfg: Config, cli_names: set[str]) -> bool:
    """Apply CLI and config filters; True when *name* should be built.

    A non-empty --names selection and a non-empty include_categories list
    each act as an allow-list; exclude_categories always wins last.
    """
    picked_by_cli = not cli_names or name in cli_names
    picked_by_config = not cfg.include_categories or name in cfg.include_categories
    return picked_by_cli and picked_by_config and name not in cfg.exclude_categories
def local_abs_path(cfg: Config, relative_path: str) -> Path:
    """Resolve *relative_path* against the configured local source root."""
    root = Path(cfg.local_source_root).expanduser().resolve()
    return root / relative_path
def list_dir_source(client: GiteaClient, cfg: Config, path: str) -> list[dict[str, Any]]:
    """List entries under *path* as {"name", "type"} dicts.

    Local mode walks the filesystem; otherwise the Gitea API is used.
    """
    if cfg.source_mode != "local":
        return client.list_dir(cfg.owner, cfg.repo, path, cfg.ref)
    directory = local_abs_path(cfg, path)
    if not directory.is_dir():
        raise RuntimeError(f"Local source path is not a directory: {directory}")
    return [
        {"name": child.name, "type": "dir" if child.is_dir() else "file"}
        for child in directory.iterdir()
    ]
def read_source_file(client: GiteaClient, cfg: Config, path: str) -> str:
    """Read one source file's text, from disk (local mode) or via Gitea.

    Raises FileNotFoundError in local mode when the file is absent.
    """
    if cfg.source_mode != "local":
        return client.read_file(cfg.owner, cfg.repo, path, cfg.ref)
    target = local_abs_path(cfg, path)
    if not target.is_file():
        raise FileNotFoundError(str(target))
    return target.read_text(encoding="utf-8", errors="replace")
def find_categories(client: GiteaClient, cfg: Config, cli_names: set[str]) -> list[str]:
    """Return the sorted category names to build.

    When the config pins include_categories, filter that list directly
    (no directory listing needed). Otherwise discover categories under
    cfg.source_root, accepting both the nested layout
    (<root>/<Name>/<Name>.list → directories) and the flat layout
    (<root>/<Name>.list → files).
    """
    if cfg.include_categories:
        return sorted(
            candidate
            for candidate in cfg.include_categories
            if should_include_category(candidate, cfg, cli_names)
        )
    discovered: list[str] = []
    for entry in list_dir_source(client, cfg, cfg.source_root):
        kind = entry.get("type")
        entry_name = entry.get("name") or ""
        if kind == "dir":
            # Nested layout: the directory name is the category.
            if should_include_category(entry_name, cfg, cli_names):
                discovered.append(entry_name)
        elif kind == "file" and entry_name.endswith(".list"):
            # Flat layout: the stem of the .list file is the category.
            stem = entry_name[: -len(".list")]
            if stem and should_include_category(stem, cfg, cli_names):
                discovered.append(stem)
    return sorted(discovered)
def read_file_optional(client: GiteaClient, cfg: Config, candidate_paths: list[str]) -> tuple[str | None, str | None]:
    """Return (path, content) for the first candidate that reads successfully.

    Returns (None, None) when every candidate fails. Failures are swallowed
    deliberately: missing variants are expected during the merge/fallback
    lookup in build_one_category.
    """
    for candidate in candidate_paths:
        try:
            content = read_source_file(client, cfg, candidate)
        except Exception:
            continue
        return candidate, content
    return None, None
def _merge_rule_sources(client: GiteaClient, cfg: Config, name: str) -> tuple[str, list[RuleLine]]:
    """Locate, merge and parse the source list(s) for one category.

    Preferred merge model:
      1) <filename_pattern>   (keyword/ua/ip with no-resolve)
      2) <Name>_Domain.list   (domain rules)
      3) <Name>_Resolve.list  (keyword/ua/ip without no-resolve)
    All found variants are concatenated and deduped by parse_rules. When
    none of the preferred files exist, single-file fallbacks are tried
    (*_All.list first). Both nested (<root>/<Name>/<file>) and flat
    (<root>/<file>) locations are probed for every filename.

    Returns (source description, parsed rules).
    Raises RuntimeError when no source file can be located at all.
    """
    filename_base = cfg.source_filename_pattern.format(name=name)
    merge_filenames = [
        filename_base,
        f"{name}_Domain.list",
        f"{name}_Resolve.list",
    ]
    merged_chunks: list[str] = []
    merged_sources: list[str] = []
    for fn in merge_filenames:
        candidates = [f"{cfg.source_root}/{name}/{fn}", f"{cfg.source_root}/{fn}"]
        src_path, src_content = read_file_optional(client, cfg, candidates)
        if src_path and src_content is not None:
            merged_sources.append(src_path)
            merged_chunks.append(src_content)
    if merged_chunks:
        return " + ".join(merged_sources), parse_rules("\n".join(merged_chunks))
    # Fallback for categories that only provide *_All.list or other variants.
    fallback_filenames = [
        f"{name}_All.list",
        f"{name}_Domain.list",
        f"{name}_Resolve.list",
        filename_base,
    ]
    for fn in fallback_filenames:
        candidates = [f"{cfg.source_root}/{name}/{fn}", f"{cfg.source_root}/{fn}"]
        src_path, src_content = read_file_optional(client, cfg, candidates)
        if src_path and src_content is not None:
            return src_path, parse_rules(src_content)
    raise RuntimeError(f"unable to locate source list for category: {name}")


def build_one_category(client: GiteaClient, cfg: Config, name: str, base_out: Path) -> tuple[int, int, int, int]:
    """Build all four output formats for one category.

    Writes <base_out>/{surge,loon}/<name>.list and
    <base_out>/{clash,mihomo}/<name>.yaml, creating directories as needed.
    Returns the emitted rule counts as (surge, loon, clash, mihomo);
    Surge and Loon both emit the full parsed rule set.
    """
    source_rel_path, rules = _merge_rule_sources(client, cfg, name)

    outputs = {
        "surge": base_out / "surge" / f"{name}.list",
        "loon": base_out / "loon" / f"{name}.list",
        "clash": base_out / "clash" / f"{name}.yaml",
        "mihomo": base_out / "mihomo" / f"{name}.yaml",
    }
    for out_path in outputs.values():
        out_path.parent.mkdir(parents=True, exist_ok=True)

    outputs["surge"].write_text(format_surge(name, rules, source_rel_path), encoding="utf-8")
    outputs["loon"].write_text(format_loon(name, rules, source_rel_path), encoding="utf-8")

    clash_text, clash_cnt = format_yaml_payload(
        name,
        rules,
        source_rel_path,
        no_resolve=cfg.clash_no_resolve,
        unsupported_types=UNSUPPORTED_CLASH_TYPES,
        type_mapping={"DEST-PORT": "DST-PORT"},
        author_name="gitea-shunt-rules",
    )
    outputs["clash"].write_text(clash_text, encoding="utf-8")

    mihomo_text, mihomo_cnt = format_yaml_payload(
        name,
        rules,
        source_rel_path,
        no_resolve=cfg.mihomo_no_resolve,
        unsupported_types=UNSUPPORTED_MIHOMO_TYPES,
        type_mapping={"DEST-PORT": "DST-PORT"},
        author_name="gitea-shunt-rules-mihomo",
    )
    outputs["mihomo"].write_text(mihomo_text, encoding="utf-8")

    return len(rules), len(rules), clash_cnt, mihomo_cnt
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: argument list to parse. Defaults to None, which makes
            argparse read sys.argv[1:] — so existing zero-argument calls
            behave exactly as before, while tests can pass an explicit list.

    Returns:
        Namespace with `config` (config file path) and `names`
        (comma-separated category filter, empty = all).
    """
    p = argparse.ArgumentParser(description="Generate Surge/Loon/Clash/Mihomo rules from Gitea source repo.")
    p.add_argument("--config", default="config.toml", help="Path to config TOML file")
    p.add_argument("--names", default="", help="Comma-separated category names, e.g. YouTube,Netflix")
    return p.parse_args(argv)
def main() -> int:
    """CLI entry point: generate outputs for every selected category.

    Returns 0 on completion, 2 when the filters select no categories.
    Per-category failures are reported to stderr and do not stop the run.
    """
    args = parse_args()
    cfg = load_config(Path(args.config))
    cli_names = {part.strip() for part in args.names.split(",") if part.strip()}
    client = GiteaClient(cfg.base_url, cfg.token)

    categories = find_categories(client, cfg, cli_names)
    if not categories:
        print("No categories found after filtering.", file=sys.stderr)
        return 2

    out_dir = Path(cfg.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    total_source = total_loon = total_clash = total_mihomo = 0
    print(f"Found {len(categories)} categories under {cfg.source_root}")
    for idx, name in enumerate(categories, start=1):
        try:
            s_cnt, l_cnt, c_cnt, m_cnt = build_one_category(client, cfg, name, out_dir)
        except Exception as exc:
            # One broken category should not abort the whole run.
            print(f"[{idx}/{len(categories)}] {name}: failed: {exc}", file=sys.stderr)
            continue
        total_source += s_cnt
        total_loon += l_cnt
        total_clash += c_cnt
        total_mihomo += m_cnt
        print(f"[{idx}/{len(categories)}] {name}: source={s_cnt}, loon={l_cnt}, clash={c_cnt}, mihomo={m_cnt}")

    print(
        "Done. "
        f"source_rules={total_source}, "
        f"loon_rules={total_loon}, "
        f"clash_rules={total_clash}, "
        f"mihomo_rules={total_mihomo}, "
        f"output={out_dir.resolve()}"
    )
    return 0
if __name__ == "__main__":
    # Script entry point; SystemExit carries main()'s integer return code.
    raise SystemExit(main())