Files
shunt-rules/main.py
T

375 lines
12 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import os
import sys
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote
from urllib.error import HTTPError
from urllib.request import Request, urlopen
try:
import tomllib
except ModuleNotFoundError: # Python < 3.11
tomllib = None
# Page size sent as the ``limit`` query parameter when listing a directory.
DEFAULT_LIMIT = 100
# Rule types that are dropped instead of converted when building Clash output.
UNSUPPORTED_CLASH_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
    "DEST-PORT", # Surge alias, Clash usually uses DST-PORT
}
@dataclass
class Config:
    """Settings that drive one run of the rule generator.

    Attributes:
        base_url: Root URL of the Gitea instance handed to the API client.
        owner: Owner of the source repository.
        repo: Name of the source repository.
        ref: Git reference sent with every contents request.
        token: Access token for the API, or ``None`` for anonymous requests.
        source_root: Repository directory that holds the rule categories.
        source_filename_pattern: ``str.format`` pattern (placeholder ``name``)
            giving the preferred list filename for a category.
        output_dir: Local directory that receives the generated files.
        include_categories: When non-empty, only these categories are built.
        exclude_categories: Categories that are never built.
        clash_no_resolve: Whether ``no-resolve`` is appended to IP-CIDR and
            IP-CIDR6 rules in the Clash output.
    """

    base_url: str
    owner: str
    repo: str
    ref: str
    token: str | None
    source_root: str
    source_filename_pattern: str
    output_dir: str
    include_categories: list[str]
    exclude_categories: list[str]
    clash_no_resolve: bool
@dataclass(frozen=True)
class RuleLine:
    """One parsed rule from a source list (immutable and hashable).

    Attributes:
        raw: The rule text with its comma-separated fields trimmed and rejoined.
        rule_type: The first field of the rule, upper-cased.
    """

    raw: str
    rule_type: str
class GiteaClient:
    """Small read-only client for the Gitea repository contents API.

    Requests are made with ``urllib``. When a token is configured it is sent as
    an ``Authorization: token <value>`` header on every request.
    """

    def __init__(self, base_url: str, token: str | None, timeout: float = 30.0):
        """Store connection settings.

        Args:
            base_url: Root URL of the Gitea instance; trailing slashes are removed.
            token: Access token, or ``None`` for anonymous access.
            timeout: Socket timeout in seconds applied to every HTTP request, so
                an unresponsive server cannot block the run indefinitely.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout

    def _headers(self, accept: str) -> dict[str, str]:
        """Return request headers for *accept*, adding auth when a token is set."""
        headers = {"Accept": accept}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _fetch(self, url: str, accept: str) -> bytes:
        """GET *url* and return the raw response body."""
        req = Request(url, headers=self._headers(accept))
        with urlopen(req, timeout=self.timeout) as resp:
            return resp.read()

    def _request_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``base_url + path`` with optional query *params* and decode the JSON body."""
        url = f"{self.base_url}{path}"
        if params:
            query = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items())
            url = f"{url}?{query}"
        return json.loads(self._fetch(url, "application/json").decode("utf-8"))

    def _request_text_url(self, url: str) -> str:
        """GET an absolute *url* and return the body as text (invalid UTF-8 is replaced)."""
        return self._fetch(url, "text/plain").decode("utf-8", errors="replace")

    def list_dir(self, owner: str, repo: str, path: str, ref: str) -> list[dict[str, Any]]:
        """Return the entries of a repository directory.

        Pages are requested until a short page is received. If the server
        answers a later page with exactly the same entries as the previous one
        (i.e. it ignores ``page``/``limit``), iteration stops there so a large
        directory cannot cause an endless loop of duplicate results.

        Raises:
            RuntimeError: If the API response describes a single file.
        """
        encoded_path = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"
        page = 1
        all_items: list[dict[str, Any]] = []
        previous: list[dict[str, Any]] | None = None
        while True:
            data = self._request_json(endpoint, {"ref": ref, "page": page, "limit": DEFAULT_LIMIT})
            if isinstance(data, dict):
                raise RuntimeError(f"Path is not a directory: {path}")
            items = list(data)
            # Same listing again means pagination is not honoured: we already
            # have everything, so stop rather than re-adding it forever.
            if page > 1 and items == previous:
                break
            all_items.extend(items)
            if len(items) < DEFAULT_LIMIT:
                break
            previous = items
            page += 1
        return all_items

    def read_file(self, owner: str, repo: str, path: str, ref: str) -> str:
        """Return the text of a repository file at *ref*.

        The contents API is tried first; on HTTP 404 the raw-file URL is used
        instead. Other HTTP errors propagate unchanged.

        Raises:
            RuntimeError: If the path is not a file, the response carries no
                content, or the content encoding is not base64.
        """
        encoded_path = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"
        try:
            data = self._request_json(endpoint, {"ref": ref})
        except HTTPError as exc:
            if exc.code != 404:
                raise
            # Fallback for environments where Gitea API route is not exposed.
            raw_url = (
                f"{self.base_url}/{quote(owner)}/{quote(repo)}/raw/branch/"
                f"{quote(ref)}/{encoded_path}"
            )
            return self._request_text_url(raw_url)
        if not isinstance(data, dict):
            raise RuntimeError(f"Path is not a file: {path}")
        content = data.get("content")
        encoding = data.get("encoding")
        if not content:
            raise RuntimeError(f"Missing file content for path: {path}")
        if encoding != "base64":
            raise RuntimeError(f"Unsupported encoding ({encoding}) for path: {path}")
        return base64.b64decode(content).decode("utf-8", errors="replace")
def load_config(path: Path) -> Config:
    """Read a JSON or TOML configuration file and build a ``Config``.

    A ``.json`` suffix (any case) selects JSON; every other file is parsed as
    TOML. The access token is taken from the environment variable named by
    ``gitea.token_env`` (default ``GITEA_TOKEN``); an unset or empty variable
    yields ``None``.

    Raises:
        RuntimeError: If a TOML file is given but ``tomllib`` is unavailable.
        KeyError: If ``gitea.base_url``, ``gitea.owner`` or ``gitea.repo`` is missing.
    """
    is_json = path.suffix.lower() == ".json"
    if not is_json and tomllib is None:
        raise RuntimeError(
            "TOML config requires Python 3.11+. "
            "Use Python 3.11+ or provide a JSON config file."
        )
    text = path.read_text(encoding="utf-8")
    raw = json.loads(text) if is_json else tomllib.loads(text)

    gitea_section = raw.get("gitea", {})
    source_section = raw.get("source", {})
    output_section = raw.get("output", {})

    env_name = gitea_section.get("token_env", "GITEA_TOKEN")
    token = None
    if env_name:
        # An empty environment value is treated the same as an unset one.
        token = os.getenv(env_name) or None

    return Config(
        base_url=gitea_section["base_url"],
        owner=gitea_section["owner"],
        repo=gitea_section["repo"],
        ref=gitea_section.get("ref", "main"),
        token=token,
        source_root=source_section.get("root", "rule/Surge"),
        source_filename_pattern=source_section.get("filename_pattern", "{name}.list"),
        output_dir=output_section.get("dir", "dist"),
        include_categories=source_section.get("include_categories", []),
        exclude_categories=source_section.get("exclude_categories", []),
        clash_no_resolve=output_section.get("clash_no_resolve", False),
    )
def parse_rules(content: str) -> list[RuleLine]:
    """Parse a rule list into unique, normalized ``RuleLine`` entries.

    Blank lines and lines starting with ``#`` are skipped. Each remaining line
    is split on commas, the fields are trimmed and empty fields dropped.
    Duplicates are detected on the normalized rule (upper-cased type plus
    trimmed fields), so lines that differ only in spacing or in the case of the
    rule type collapse into the first occurrence. Source order is preserved.

    Args:
        content: Full text of a rule list.

    Returns:
        The parsed rules in order of first appearance.
    """
    rules: list[RuleLine] = []
    seen: set[str] = set()
    # A leading byte-order mark would otherwise hide a '#' on the first line.
    for original in content.lstrip("\ufeff").splitlines():
        line = original.strip()
        if not line or line.startswith("#"):
            continue
        parts = [part.strip() for part in line.split(",") if part.strip()]
        if not parts:
            continue
        rule_type = parts[0].upper()
        # Key on the normalized form so formatting variants count as duplicates.
        key = ",".join([rule_type, *parts[1:]])
        if key in seen:
            continue
        seen.add(key)
        rules.append(RuleLine(raw=",".join(parts), rule_type=rule_type))
    return rules
def to_clash_payload_line(rule: RuleLine, no_resolve: bool) -> str | None:
    """Convert one rule into a Clash payload entry.

    Args:
        rule: The rule to convert; only its ``raw`` text is read.
        no_resolve: Whether IP-CIDR / IP-CIDR6 rules get a ``no-resolve`` suffix.

    Returns:
        The payload text, or ``None`` when the rule has no fields or its type
        is listed in ``UNSUPPORTED_CLASH_TYPES``.
    """
    fields = [piece for piece in (chunk.strip() for chunk in rule.raw.split(",")) if piece]
    if not fields:
        return None
    kind = fields[0].upper()
    if kind in UNSUPPORTED_CLASH_TYPES:
        return None
    fields[0] = kind
    if kind not in {"IP-CIDR", "IP-CIDR6"}:
        # Strip no-resolve from non-IP rules if present in source.
        return ",".join(item for item in fields if item.lower() != "no-resolve")
    # IP rules keep only type and network; the flag is governed by no_resolve.
    head = fields if len(fields) < 2 else [kind, fields[1]]
    suffix = ["no-resolve"] if no_resolve else []
    return ",".join(head + suffix)
def format_surge(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render rules as a Surge list with a commented statistics header.

    The header carries the name, author tag, source path, current UTC time,
    one count line per rule type (sorted by type) and the total; the rule
    texts follow, one per line, and the result ends with a newline.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    per_type = Counter(rule.rule_type for rule in rules)
    lines = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
        *(f"# {kind}: {per_type[kind]}" for kind in sorted(per_type)),
        f"# TOTAL: {len(rules)}",
        *(rule.raw for rule in rules),
    ]
    return "\n".join(lines) + "\n"
def format_clash(name: str, rules: list[RuleLine], source_path: str, no_resolve: bool) -> str:
    """Render rules as a Clash rule-provider document (``payload:`` list).

    Every rule is passed through ``to_clash_payload_line``; rules it rejects
    are left out of the payload and reported per type as ``SKIPPED-<TYPE>``
    header lines. The header also carries the name, author tag, source path,
    current UTC time, per-type counts of converted rules and the payload size.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    # Convert each rule once, remembering the type it came from.
    outcomes = [(rule.rule_type, to_clash_payload_line(rule, no_resolve=no_resolve)) for rule in rules]
    payload = [text for _, text in outcomes if text is not None]
    kept = Counter(kind for kind, text in outcomes if text is not None)
    dropped = Counter(kind for kind, text in outcomes if text is None)
    document = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
        *(f"# {kind}: {kept[kind]}" for kind in sorted(kept)),
        *(f"# SKIPPED-{kind}: {dropped[kind]}" for kind in sorted(dropped)),
        f"# TOTAL: {len(payload)}",
        "payload:",
        *(f" - {entry}" for entry in payload),
    ]
    return "\n".join(document) + "\n"
def should_include_category(name: str, cfg: Config, cli_names: set[str]) -> bool:
    """Tell whether a category passes the command-line and config filters.

    A category is accepted when it is among *cli_names* (if any were given),
    among ``cfg.include_categories`` (if that list is non-empty), and not in
    ``cfg.exclude_categories``.
    """
    return (
        (not cli_names or name in cli_names)
        and (not cfg.include_categories or name in cfg.include_categories)
        and name not in cfg.exclude_categories
    )
def find_categories(client: GiteaClient, cfg: Config, cli_names: set[str]) -> list[str]:
    """Return the sorted, de-duplicated category names to build.

    When ``cfg.include_categories`` is non-empty it is the candidate list and
    the repository is not queried. Otherwise ``cfg.source_root`` is listed and
    each sub-directory, and each ``*.list`` file (minus its suffix), is a
    candidate. Every candidate must pass ``should_include_category``.

    A name is reported once even if it appears in both layouts or is repeated
    in the configuration, and entries without a usable name are ignored.
    """
    if cfg.include_categories:
        return sorted({n for n in cfg.include_categories if should_include_category(n, cfg, cli_names)})
    entries = client.list_dir(cfg.owner, cfg.repo, cfg.source_root, cfg.ref)
    found: set[str] = set()
    for entry in entries:
        entry_type = entry.get("type")
        name = entry.get("name") or ""
        if entry_type == "dir":
            # Support nested layout: rule/Surge/<Name>/<Name>.list
            candidate = name
        elif entry_type == "file" and name.endswith(".list"):
            # Support flat layout: rule/Surge/<Name>.list
            candidate = name[: -len(".list")]
        else:
            continue
        # An empty candidate would produce output files with no base name.
        if candidate and should_include_category(candidate, cfg, cli_names):
            found.add(candidate)
    return sorted(found)
def build_one_category(client: GiteaClient, cfg: Config, name: str, base_out: Path) -> tuple[int, int]:
    """Fetch one category's source list and write its Surge and Clash files.

    Candidate filenames are the configured pattern followed by the
    ``_All``, ``_Domain`` and ``_Resolve`` variants; each is tried in the
    nested location (``<root>/<name>/<file>``) and then the flat one
    (``<root>/<file>``). The first candidate that can be read is used.

    Returns:
        ``(number of source rules, number of rules convertible to Clash)``.

    Raises:
        Exception: The error from the last candidate when none could be read.
    """
    preferred = cfg.source_filename_pattern.format(name=name)
    filenames = (
        preferred,
        f"{name}_All.list",
        f"{name}_Domain.list",
        f"{name}_Resolve.list",
    )
    candidate_paths = [
        location
        for fn in filenames
        for location in (f"{cfg.source_root}/{name}/{fn}", f"{cfg.source_root}/{fn}")
    ]

    source_rel_path = ""
    source_content = ""
    last_error: Exception | None = None
    for candidate in candidate_paths:
        try:
            source_content = client.read_file(cfg.owner, cfg.repo, candidate, cfg.ref)
        except Exception as exc:
            # Remember the failure and move on to the next candidate.
            last_error = exc
        else:
            source_rel_path = candidate
            break
    else:
        if last_error is not None:
            raise last_error
        raise RuntimeError(f"unable to locate source list for category: {name}")

    rules = parse_rules(source_content)
    surge_target = base_out / "surge" / f"{name}.list"
    clash_target = base_out / "clash" / f"{name}.yaml"
    for target in (surge_target, clash_target):
        target.parent.mkdir(parents=True, exist_ok=True)
    surge_target.write_text(format_surge(name, rules, source_rel_path), encoding="utf-8")
    clash_text = format_clash(name, rules, source_rel_path, no_resolve=cfg.clash_no_resolve)
    clash_target.write_text(clash_text, encoding="utf-8")

    convertible = sum(
        1 for rule in rules if to_clash_payload_line(rule, no_resolve=cfg.clash_no_resolve) is not None
    )
    return len(rules), convertible
def parse_args() -> argparse.Namespace:
    """Parse the command line (``--config`` and ``--names``) from ``sys.argv``."""
    parser = argparse.ArgumentParser(description="Generate Surge/Clash rules from Gitea source repo.")
    options = (
        ("--config", "config.toml", "Path to config TOML file"),
        ("--names", "", "Comma-separated category names, e.g. YouTube,Netflix"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser.parse_args()
def main() -> int:
    """Run the generator: load config, pick categories, build each one.

    A failing category is reported on stderr and does not stop the remaining
    categories from being built, but it is reflected in the exit status so
    that schedulers and CI jobs can detect a partial run.

    Returns:
        0 when every category was built, 1 when at least one category failed,
        2 when no category remained after filtering.
    """
    args = parse_args()
    cfg = load_config(Path(args.config))
    names = {x.strip() for x in args.names.split(",") if x.strip()}
    client = GiteaClient(cfg.base_url, cfg.token)
    categories = find_categories(client, cfg, names)
    if not categories:
        print("No categories found after filtering.", file=sys.stderr)
        return 2
    out_dir = Path(cfg.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    total_source = 0
    total_clash = 0
    failed: list[str] = []
    print(f"Found {len(categories)} categories under {cfg.source_root}")
    for idx, name in enumerate(categories, start=1):
        try:
            s_cnt, c_cnt = build_one_category(client, cfg, name, out_dir)
        except Exception as exc:
            # Best effort: record the failure and keep going with the rest.
            failed.append(name)
            print(f"[{idx}/{len(categories)}] {name}: failed: {exc}", file=sys.stderr)
            continue
        total_source += s_cnt
        total_clash += c_cnt
        print(f"[{idx}/{len(categories)}] {name}: source={s_cnt}, clash={c_cnt}")
    print(f"Done. source_rules={total_source}, clash_rules={total_clash}, output={out_dir.resolve()}")
    if failed:
        print(f"Failed categories ({len(failed)}): {', '.join(failed)}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())