feat: initial gitea shunt rules generator

This commit is contained in:
袁震
2026-04-06 11:52:50 +08:00
commit b9ac36321a
7 changed files with 696 additions and 0 deletions
+318
View File
@@ -0,0 +1,318 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import os
import sys
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote
from urllib.request import Request, urlopen
try:
import tomllib
except ModuleNotFoundError: # Python < 3.11
tomllib = None
# Page size sent as the ``limit`` query parameter when listing a directory;
# a page shorter than this is treated as the last one.
DEFAULT_LIMIT = 100
# Rule types that are dropped (not converted) when producing Clash payloads.
UNSUPPORTED_CLASH_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
    "DEST-PORT", # Surge alias, Clash usually uses DST-PORT
}
@dataclass
class Config:
    """Settings for one generator run, as assembled by ``load_config``."""

    # Root URL of the Gitea instance (required ``gitea.base_url`` key).
    base_url: str
    # Repository owner (required ``gitea.owner`` key).
    owner: str
    # Repository name (required ``gitea.repo`` key).
    repo: str
    # Git ref the contents are read at; ``load_config`` defaults it to "main".
    ref: str
    # API token read from the environment, or None when unavailable.
    token: str | None
    # Repository directory whose sub-directories are the rule categories.
    source_root: str
    # ``str.format`` template (with ``{name}``) for the rule file inside a category.
    source_filename_pattern: str
    # Local directory that receives the ``surge/`` and ``clash/`` outputs.
    output_dir: str
    # When non-empty, only these category names are built.
    include_categories: list[str]
    # Category names that are never built.
    exclude_categories: list[str]
    # Whether "no-resolve" is appended to IP-CIDR/IP-CIDR6 rules in Clash output.
    clash_no_resolve: bool
@dataclass(frozen=True)
class RuleLine:
    """One parsed rule from a source list (immutable)."""

    # Comma-joined rule text with surrounding whitespace and empty fields removed.
    raw: str
    # Upper-cased first field of the rule, e.g. "DOMAIN-SUFFIX".
    rule_type: str
class GiteaClient:
    """Minimal read-only client for the Gitea repository contents API.

    Only the two operations the generator needs are provided: listing a
    directory and reading one file, both at a given ref.
    """

    def __init__(self, base_url: str, token: str | None, timeout: float = 30.0):
        """Store connection settings.

        Args:
            base_url: Root URL of the Gitea instance; trailing slashes are removed.
            token: API token sent as ``Authorization: token <value>``; a falsy
                value disables the header.
            timeout: Socket timeout in seconds applied to every request, so a
                stalled connection cannot hang the run indefinitely.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout

    @staticmethod
    def _contents_endpoint(owner: str, repo: str, path: str) -> str:
        """Return the URL-quoted contents API path for ``path`` in ``owner/repo``."""
        encoded_path = quote(path.strip("/"), safe="/")
        return f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"

    @staticmethod
    def _entry_key(entry: Any) -> str:
        """Return a value identifying a directory entry, used to spot repeats."""
        if isinstance(entry, dict):
            key = entry.get("path") or entry.get("name")
            if key:
                return str(key)
        # Entries without a path/name are compared by their full JSON form.
        return json.dumps(entry, sort_keys=True)

    def _request_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``path`` (plus optional query ``params``) and decode the JSON body.

        Raises:
            urllib.error.URLError: On connection failures, HTTP errors or timeout.
            ValueError: If the response body is not valid JSON.
        """
        url = f"{self.base_url}{path}"
        if params:
            query = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items())
            url = f"{url}?{query}"
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        req = Request(url, headers=headers)
        with urlopen(req, timeout=self.timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))

    def _collect_pages(self, endpoint: str, ref: str, limit: int, path: str) -> list[dict[str, Any]]:
        """Fetch every page of a directory listing, without repeating entries.

        Paging stops when a page is shorter than ``limit`` or when a page adds
        no entry that was not already collected. The second condition keeps
        the loop finite if the server ignores ``page``/``limit`` and keeps
        returning the same full listing.

        Raises:
            RuntimeError: If the API returns an object instead of a list,
                i.e. ``path`` is not a directory.
        """
        collected: list[dict[str, Any]] = []
        seen: set[str] = set()
        page = 1
        while True:
            data = self._request_json(endpoint, {"ref": ref, "page": page, "limit": limit})
            if isinstance(data, dict):
                raise RuntimeError(f"Path is not a directory: {path}")
            items = list(data)
            fresh = []
            for entry in items:
                key = self._entry_key(entry)
                if key not in seen:
                    seen.add(key)
                    fresh.append(entry)
            collected.extend(fresh)
            if len(items) < limit or not fresh:
                break
            page += 1
        return collected

    def list_dir(self, owner: str, repo: str, path: str, ref: str) -> list[dict[str, Any]]:
        """Return the entries of directory ``path`` in ``owner/repo`` at ``ref``.

        Raises:
            RuntimeError: If ``path`` is not a directory.
        """
        endpoint = self._contents_endpoint(owner, repo, path)
        return self._collect_pages(endpoint, ref, DEFAULT_LIMIT, path)

    def read_file(self, owner: str, repo: str, path: str, ref: str) -> str:
        """Return the text of file ``path`` in ``owner/repo`` at ``ref``.

        The content is base64-decoded and then decoded as UTF-8, with
        undecodable bytes replaced. An empty file yields an empty string.

        Raises:
            RuntimeError: If ``path`` is not a file, the response carries no
                content field, or the content encoding is not base64.
        """
        endpoint = self._contents_endpoint(owner, repo, path)
        data = self._request_json(endpoint, {"ref": ref})
        if not isinstance(data, dict):
            raise RuntimeError(f"Path is not a file: {path}")
        content = data.get("content")
        encoding = data.get("encoding")
        # Only an absent content field is an error; "" is a legitimately empty file.
        if content is None:
            raise RuntimeError(f"Missing file content for path: {path}")
        if encoding != "base64":
            raise RuntimeError(f"Unsupported encoding ({encoding}) for path: {path}")
        return base64.b64decode(content).decode("utf-8", errors="replace")
def load_config(path: Path) -> Config:
    """Build a ``Config`` from a JSON or TOML file.

    A ``.json`` suffix (case-insensitive) selects JSON; anything else is
    parsed as TOML. The API token is not stored in the file: the ``gitea``
    section names an environment variable (``GITEA_TOKEN`` by default) and
    its value is read from the environment.

    Raises:
        RuntimeError: If a TOML file is given but ``tomllib`` is unavailable.
        KeyError: If ``gitea.base_url``, ``gitea.owner`` or ``gitea.repo`` is missing.
    """
    is_json = path.suffix.lower() == ".json"
    if not is_json and tomllib is None:
        raise RuntimeError(
            "TOML config requires Python 3.11+. "
            "Use Python 3.11+ or provide a JSON config file."
        )
    text = path.read_text(encoding="utf-8")
    raw = json.loads(text) if is_json else tomllib.loads(text)

    gitea_cfg, source_cfg, output_cfg = (
        raw.get(section, {}) for section in ("gitea", "source", "output")
    )

    # An empty/falsy variable name means "do not look up a token at all".
    env_name = gitea_cfg.get("token_env", "GITEA_TOKEN")
    token = os.getenv(env_name) if env_name else None

    settings = dict(
        base_url=gitea_cfg["base_url"],
        owner=gitea_cfg["owner"],
        repo=gitea_cfg["repo"],
        ref=gitea_cfg.get("ref", "main"),
        token=token,
        source_root=source_cfg.get("root", "rule/Surge"),
        source_filename_pattern=source_cfg.get("filename_pattern", "{name}.list"),
        output_dir=output_cfg.get("dir", "dist"),
        include_categories=source_cfg.get("include_categories", []),
        exclude_categories=source_cfg.get("exclude_categories", []),
        clash_no_resolve=output_cfg.get("clash_no_resolve", False),
    )
    return Config(**settings)
def parse_rules(content: str) -> list[RuleLine]:
    """Parse rule-list text into de-duplicated ``RuleLine`` objects.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is split on commas; fields are stripped and empty fields dropped.
    Duplicates are detected on that normalized form, so lines differing only
    in spacing or stray commas collapse into one rule. First-seen order is
    preserved.

    Args:
        content: Full text of a source rule file.

    Returns:
        The parsed rules, with ``raw`` set to the normalized text and
        ``rule_type`` to the upper-cased first field.
    """
    rules: list[RuleLine] = []
    seen: set[str] = set()
    for original in content.splitlines():
        line = original.strip()
        if not line or line.startswith("#"):
            continue
        parts = [part.strip() for part in line.split(",") if part.strip()]
        if not parts:
            # The line contained nothing but commas/whitespace.
            continue
        normalized = ",".join(parts)
        # Compare what is actually emitted, not the unnormalized input line.
        if normalized in seen:
            continue
        seen.add(normalized)
        rules.append(RuleLine(raw=normalized, rule_type=parts[0].upper()))
    return rules
def to_clash_payload_line(rule: RuleLine, no_resolve: bool) -> str | None:
    """Convert one rule into a Clash payload entry, or ``None`` to skip it.

    Args:
        rule: The rule to convert; only ``rule.raw`` is read.
        no_resolve: Whether to append ``no-resolve`` to IP-CIDR/IP-CIDR6 rules.

    Returns:
        The comma-joined payload text with the rule type upper-cased, or
        ``None`` when the rule type is in ``UNSUPPORTED_CLASH_TYPES``, the
        rule is empty, or an IP-CIDR/IP-CIDR6 rule has no network value.
    """
    parts = [p.strip() for p in rule.raw.split(",") if p.strip()]
    if not parts:
        return None
    rule_type = parts[0].upper()
    parts[0] = rule_type
    if rule_type in UNSUPPORTED_CLASH_TYPES:
        return None
    if rule_type in {"IP-CIDR", "IP-CIDR6"}:
        # A CIDR rule without a network would produce an invalid payload
        # entry, so it is skipped instead of being emitted.
        if len(parts) < 2 or parts[1].lower() == "no-resolve":
            return None
        # Any option present in the source is discarded; the flag alone
        # decides whether "no-resolve" is emitted.
        payload = [rule_type, parts[1]]
        if no_resolve:
            payload.append("no-resolve")
        return ",".join(payload)
    # Strip no-resolve from non-IP rules if present in source.
    filtered = [p for p in parts if p.lower() != "no-resolve"]
    if not filtered:
        return None
    return ",".join(filtered)
def format_surge(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render ``rules`` as a Surge rule list with a ``#`` comment header.

    The header carries the list name, a fixed author tag, the source path,
    the generation time in UTC, one count per rule type (sorted by type) and
    the total. Rule lines follow unchanged. The text ends with a newline.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    per_type: Counter[str] = Counter()
    raw_lines: list[str] = []
    for rule in rules:
        per_type[rule.rule_type] += 1
        raw_lines.append(rule.raw)

    lines = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
        *(f"# {kind}: {per_type[kind]}" for kind in sorted(per_type)),
        f"# TOTAL: {len(rules)}",
        *raw_lines,
    ]
    return "\n".join(lines) + "\n"
def format_clash(name: str, rules: list[RuleLine], source_path: str, no_resolve: bool) -> str:
    """Render ``rules`` as a Clash rule-provider document (``payload:`` list).

    Each rule goes through ``to_clash_payload_line``; rules it rejects are
    left out of the payload and reported per type as ``# SKIPPED-<TYPE>``
    header lines. ``# TOTAL`` counts only the emitted payload entries.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    entries: list[str] = []
    kept: Counter[str] = Counter()
    dropped: Counter[str] = Counter()
    for rule in rules:
        line = to_clash_payload_line(rule, no_resolve=no_resolve)
        if line is not None:
            entries.append(line)
            kept[rule.rule_type] += 1
        else:
            dropped[rule.rule_type] += 1

    document = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
        *(f"# {kind}: {kept[kind]}" for kind in sorted(kept)),
        *(f"# SKIPPED-{kind}: {dropped[kind]}" for kind in sorted(dropped)),
        f"# TOTAL: {len(entries)}",
        "payload:",
        *(f" - {entry}" for entry in entries),
    ]
    return "\n".join(document) + "\n"
def should_include_category(name: str, cfg: Config, cli_names: set[str]) -> bool:
    """Tell whether category ``name`` passes every active filter.

    A category is kept when it is among ``cli_names`` (if any were given),
    among ``cfg.include_categories`` (if that list is non-empty) and not in
    ``cfg.exclude_categories``.
    """
    return (
        (not cli_names or name in cli_names)
        and (not cfg.include_categories or name in cfg.include_categories)
        and name not in cfg.exclude_categories
    )
def find_categories(client: GiteaClient, cfg: Config, cli_names: set[str]) -> list[str]:
    """Return the sorted names of the category directories to build.

    The entries under ``cfg.source_root`` are listed; only entries of type
    ``"dir"`` with a non-empty name that pass ``should_include_category``
    are kept.
    """
    listing = client.list_dir(cfg.owner, cfg.repo, cfg.source_root, cfg.ref)
    dir_names = (entry.get("name") for entry in listing if entry.get("type") == "dir")
    return sorted(
        dir_name
        for dir_name in dir_names
        if dir_name and should_include_category(dir_name, cfg, cli_names)
    )
def build_one_category(client: GiteaClient, cfg: Config, name: str, base_out: Path) -> tuple[int, int]:
    """Fetch one category's source list and write its Surge and Clash files.

    Output goes to ``<base_out>/surge/<name>.list`` and
    ``<base_out>/clash/<name>.yaml``; parent directories are created as needed.

    Returns:
        ``(source_count, clash_count)``: the number of parsed rules and the
        number of those that convert to a Clash payload line.
    """
    filename = cfg.source_filename_pattern.format(name=name)
    source_rel_path = f"{cfg.source_root}/{name}/{filename}"
    rules = parse_rules(client.read_file(cfg.owner, cfg.repo, source_rel_path, cfg.ref))

    surge_path = base_out / "surge" / f"{name}.list"
    clash_path = base_out / "clash" / f"{name}.yaml"
    for target in (surge_path, clash_path):
        target.parent.mkdir(parents=True, exist_ok=True)

    surge_path.write_text(format_surge(name, rules, source_rel_path), encoding="utf-8")
    clash_text = format_clash(name, rules, source_rel_path, no_resolve=cfg.clash_no_resolve)
    clash_path.write_text(clash_text, encoding="utf-8")

    convertible = [
        rule
        for rule in rules
        if to_clash_payload_line(rule, no_resolve=cfg.clash_no_resolve) is not None
    ]
    return len(rules), len(convertible)
def parse_args() -> argparse.Namespace:
    """Parse the command line: ``--config`` (file path) and ``--names`` (CSV filter)."""
    parser = argparse.ArgumentParser(description="Generate Surge/Clash rules from Gitea source repo.")
    options = (
        ("--config", "config.toml", "Path to config TOML file"),
        ("--names", "", "Comma-separated category names, e.g. YouTube,Netflix"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser.parse_args()
def main() -> int:
    """Run the generator end to end and return the process exit status.

    Returns:
        ``2`` when no category survives filtering, otherwise ``0``. A
        category that fails to build is reported on stderr and does not
        stop the remaining categories.
    """
    args = parse_args()
    cfg = load_config(Path(args.config))
    requested = {piece.strip() for piece in args.names.split(",")} - {""}

    client = GiteaClient(cfg.base_url, cfg.token)
    categories = find_categories(client, cfg, requested)
    if not categories:
        print("No categories found after filtering.", file=sys.stderr)
        return 2

    out_dir = Path(cfg.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    how_many = len(categories)
    source_total = 0
    clash_total = 0
    print(f"Found {how_many} categories under {cfg.source_root}")
    for position, category in enumerate(categories, start=1):
        tag = f"[{position}/{how_many}] {category}:"
        try:
            source_count, clash_count = build_one_category(client, cfg, category, out_dir)
            source_total += source_count
            clash_total += clash_count
            print(f"{tag} source={source_count}, clash={clash_count}")
        except Exception as exc:
            # Best effort: report the failure and carry on with the next category.
            print(f"{tag} failed: {exc}", file=sys.stderr)

    print(f"Done. source_rules={source_total}, clash_rules={clash_total}, output={out_dir.resolve()}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())