#!/usr/bin/env python3
# Generate Surge rule lists and Clash rule-provider payloads from rule files
# stored in a Gitea repository.
#
# For every selected category the script reads a source ``.list`` file through
# the Gitea contents API, normalises and de-duplicates its rules, and writes
# ``<output>/surge/<name>.list`` and ``<output>/clash/<name>.yaml``.
from __future__ import annotations

import argparse
import base64
import json
import os
import sys
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import Request, urlopen

try:
    import tomllib
except ModuleNotFoundError:  # Python < 3.11
    tomllib = None

# Page size requested from the directory listing endpoint.
DEFAULT_LIMIT = 100
# Socket timeout (seconds) applied to every HTTP request so that a stalled
# connection cannot hang the whole run.
DEFAULT_TIMEOUT = 30.0
# Rule types that are dropped (and counted as skipped) in Clash output.
UNSUPPORTED_CLASH_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
    "DEST-PORT",  # Surge alias, Clash usually uses DST-PORT
}


@dataclass
class Config:
    """Settings loaded from the config file (see ``load_config``)."""

    base_url: str  # root URL of the Gitea instance
    owner: str  # repository owner
    repo: str  # repository name
    ref: str  # git ref passed to the API as ``ref``
    token: str | None  # API token read from the environment, if any
    source_root: str  # repository directory that holds the rule files
    source_filename_pattern: str  # ``str.format`` pattern using ``{name}``
    output_dir: str  # local directory receiving generated files
    include_categories: list[str]  # if non-empty, only these are built
    exclude_categories: list[str]  # never built
    clash_no_resolve: bool  # append ``no-resolve`` to Clash IP-CIDR rules


@dataclass(frozen=True)
class RuleLine:
    """One normalised rule: comma-joined text plus its upper-cased type."""

    raw: str
    rule_type: str


def _entry_key(item: Any) -> str:
    """Return a stable identity for a directory entry, used for de-duplication."""
    if isinstance(item, dict):
        key = item.get("path") or item.get("name")
        if key:
            return str(key)
    # Entries without a usable path/name are compared by their full content.
    return json.dumps(item, sort_keys=True, default=str)


class GiteaClient:
    """Minimal read-only HTTP client for a Gitea repository.

    Args:
        base_url: Root URL of the Gitea instance; a trailing ``/`` is removed.
        token: Optional API token, sent as ``Authorization: token <token>``.
        timeout: Socket timeout in seconds for every request.
    """

    def __init__(self, base_url: str, token: str | None, timeout: float = DEFAULT_TIMEOUT):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout

    def _headers(self, accept: str) -> dict[str, str]:
        """Build request headers, adding the token header when one is set."""
        headers = {"Accept": accept}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    @staticmethod
    def _contents_endpoint(owner: str, repo: str, path: str) -> str:
        """Return the contents-API path for ``path`` inside ``owner/repo``."""
        encoded_path = quote(path.strip("/"), safe="/")
        return f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"

    def _request_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``base_url + path`` with optional query params and decode JSON.

        Raises:
            urllib.error.HTTPError: On a non-success HTTP status.
        """
        url = f"{self.base_url}{path}"
        if params:
            query = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items())
            url = f"{url}?{query}"
        req = Request(url, headers=self._headers("application/json"))
        with urlopen(req, timeout=self.timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))

    def _request_text_url(self, url: str) -> str:
        """GET an absolute URL and return the body as text (lossy UTF-8 decode)."""
        req = Request(url, headers=self._headers("text/plain"))
        with urlopen(req, timeout=self.timeout) as resp:
            return resp.read().decode("utf-8", errors="replace")

    def list_dir(self, owner: str, repo: str, path: str, ref: str) -> list[dict[str, Any]]:
        """List the entries of a repository directory.

        Pages are requested until a short page arrives or a page contributes
        no entry that has not been seen before.

        Raises:
            RuntimeError: If the API answers with an object instead of a list,
                i.e. ``path`` is not a directory.
        """
        endpoint = self._contents_endpoint(owner, repo, path)
        all_items: list[dict[str, Any]] = []
        seen: set[str] = set()
        page = 1
        while True:
            data = self._request_json(endpoint, {"ref": ref, "page": page, "limit": DEFAULT_LIMIT})
            if isinstance(data, dict):
                raise RuntimeError(f"Path is not a directory: {path}")
            items = list(data)
            fresh = []
            for item in items:
                key = _entry_key(item)
                if key not in seen:
                    seen.add(key)
                    fresh.append(item)
            all_items.extend(fresh)
            # A server that ignores ``page``/``limit`` returns the same full
            # listing on every request; without the ``fresh`` check a directory
            # with >= DEFAULT_LIMIT entries would loop forever.
            # NOTE(review): the Gitea contents endpoint does not appear to be
            # documented as paginated - confirm against the API reference.
            if not fresh or len(items) < DEFAULT_LIMIT:
                break
            page += 1
        return all_items

    def read_file(self, owner: str, repo: str, path: str, ref: str) -> str:
        """Return the text of a repository file.

        The contents API is tried first; on HTTP 404 the raw-file URL is used
        instead.

        Raises:
            urllib.error.HTTPError: For non-404 API errors, or any error from
                the raw fallback.
            RuntimeError: If the path is not a file, the response carries no
                content, or the content encoding is not base64.
        """
        endpoint = self._contents_endpoint(owner, repo, path)
        try:
            data = self._request_json(endpoint, {"ref": ref})
        except HTTPError as exc:
            if exc.code != 404:
                raise
            # Fallback for environments where Gitea API route is not exposed.
            encoded_path = quote(path.strip("/"), safe="/")
            raw_url = (
                f"{self.base_url}/{quote(owner)}/{quote(repo)}/raw/branch/"
                f"{quote(ref)}/{encoded_path}"
            )
            return self._request_text_url(raw_url)

        if not isinstance(data, dict):
            raise RuntimeError(f"Path is not a file: {path}")
        content = data.get("content")
        encoding = data.get("encoding")
        # Only a missing/null content is an error: an empty string is what an
        # empty file decodes from and must yield "".
        if content is None:
            raise RuntimeError(f"Missing file content for path: {path}")
        if encoding != "base64":
            raise RuntimeError(f"Unsupported encoding ({encoding}) for path: {path}")
        return base64.b64decode(content).decode("utf-8", errors="replace")


def load_config(path: Path) -> Config:
    """Load a ``Config`` from a JSON (``.json`` suffix) or TOML file.

    The API token is never stored in the file; it is read from the environment
    variable named by ``gitea.token_env`` (default ``GITEA_TOKEN``).

    Raises:
        RuntimeError: If a TOML file is given but ``tomllib`` is unavailable.
        KeyError: If ``gitea.base_url``, ``gitea.owner`` or ``gitea.repo`` is
            missing.
    """
    text = path.read_text(encoding="utf-8")
    if path.suffix.lower() == ".json":
        raw = json.loads(text)
    else:
        if tomllib is None:
            raise RuntimeError(
                "TOML config requires Python 3.11+. "
                "Use Python 3.11+ or provide a JSON config file."
            )
        raw = tomllib.loads(text)

    gitea = raw.get("gitea", {})
    source = raw.get("source", {})
    output = raw.get("output", {})

    # An empty ``token_env`` disables token lookup; an empty variable value is
    # treated the same as an unset one.
    token_env = gitea.get("token_env", "GITEA_TOKEN")
    token = (os.getenv(token_env) or None) if token_env else None

    return Config(
        base_url=gitea["base_url"],
        owner=gitea["owner"],
        repo=gitea["repo"],
        ref=gitea.get("ref", "main"),
        token=token,
        source_root=source.get("root", "rule/Surge"),
        source_filename_pattern=source.get("filename_pattern", "{name}.list"),
        output_dir=output.get("dir", "dist"),
        include_categories=source.get("include_categories", []),
        exclude_categories=source.get("exclude_categories", []),
        clash_no_resolve=output.get("clash_no_resolve", False),
    )


def parse_rules(content: str) -> list[RuleLine]:
    """Parse rule-list text into unique, normalised rules in source order.

    Blank lines and lines starting with ``#`` are ignored. Each remaining line
    is split on ``,``; fields are stripped, empty fields dropped, and the rest
    re-joined with ``,``. Duplicates are detected on that normalised form, so
    lines differing only in spacing collapse into one rule.
    """
    rules: list[RuleLine] = []
    seen: set[str] = set()
    # A leading UTF-8 BOM survives decoding and is not removed by str.strip();
    # left in place it would hide a leading "#" and corrupt the first rule.
    for original in content.lstrip("\ufeff").splitlines():
        line = original.strip()
        if not line or line.startswith("#"):
            continue
        parts = [part.strip() for part in line.split(",") if part.strip()]
        if not parts:
            continue
        normalized = ",".join(parts)
        if normalized in seen:
            continue
        seen.add(normalized)
        rules.append(RuleLine(raw=normalized, rule_type=parts[0].upper()))
    return rules


def to_clash_payload_line(rule: RuleLine, no_resolve: bool) -> str | None:
    """Convert one rule to a Clash payload entry.

    Returns:
        The converted line with an upper-cased rule type, or ``None`` when the
        rule is empty or its type is in ``UNSUPPORTED_CLASH_TYPES``. For
        ``IP-CIDR``/``IP-CIDR6`` only the type and network are kept, and
        ``no-resolve`` is appended iff ``no_resolve`` is true. For every other
        type any ``no-resolve`` field is removed.
    """
    parts = [p.strip() for p in rule.raw.split(",") if p.strip()]
    if not parts:
        return None

    rule_type = parts[0].upper()
    parts[0] = rule_type
    if rule_type in UNSUPPORTED_CLASH_TYPES:
        return None

    if rule_type in {"IP-CIDR", "IP-CIDR6"}:
        payload = [rule_type, parts[1]] if len(parts) >= 2 else parts
        if no_resolve:
            payload.append("no-resolve")
        return ",".join(payload)

    # Strip no-resolve from non-IP rules if present in source.
    filtered = [p for p in parts if p.lower() != "no-resolve"]
    return ",".join(filtered)


def _header_lines(name: str, source_path: str) -> list[str]:
    """Return the comment header shared by the Surge and Clash outputs."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    return [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {now}",
    ]


def format_surge(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render a Surge rule list: header, per-type counts, total, then rules."""
    count = Counter(rule.rule_type for rule in rules)
    lines = _header_lines(name, source_path)
    lines.extend(f"# {k}: {count[k]}" for k in sorted(count))
    lines.append(f"# TOTAL: {len(rules)}")
    lines.extend(rule.raw for rule in rules)
    return "\n".join(lines) + "\n"


def format_clash(name: str, rules: list[RuleLine], source_path: str, no_resolve: bool) -> str:
    """Render a Clash payload file.

    The header lists per-type counts of converted rules, ``SKIPPED-<type>``
    counts for rules that could not be converted, and the payload total. It is
    followed by ``payload:`` and one list item per converted rule.
    """
    payload: list[str] = []
    counter: Counter[str] = Counter()
    skipped: Counter[str] = Counter()
    for rule in rules:
        converted = to_clash_payload_line(rule, no_resolve=no_resolve)
        if converted is None:
            skipped[rule.rule_type] += 1
            continue
        payload.append(converted)
        counter[rule.rule_type] += 1

    lines = _header_lines(name, source_path)
    lines.extend(f"# {k}: {counter[k]}" for k in sorted(counter))
    lines.extend(f"# SKIPPED-{k}: {skipped[k]}" for k in sorted(skipped))
    lines.append(f"# TOTAL: {len(payload)}")
    lines.append("payload:")
    lines.extend(f" - {item}" for item in payload)
    return "\n".join(lines) + "\n"


def should_include_category(name: str, cfg: Config, cli_names: set[str]) -> bool:
    """Return True if ``name`` passes the CLI, include and exclude filters.

    An empty ``cli_names`` or empty ``cfg.include_categories`` means "no
    restriction" for that filter; ``cfg.exclude_categories`` always applies.
    """
    if cli_names and name not in cli_names:
        return False
    if cfg.include_categories and name not in cfg.include_categories:
        return False
    if name in cfg.exclude_categories:
        return False
    return True


def find_categories(client: GiteaClient, cfg: Config, cli_names: set[str]) -> list[str]:
    """Return the sorted, duplicate-free list of category names to build.

    When ``cfg.include_categories`` is set it is used directly (still subject
    to the CLI and exclude filters) and the repository is not listed.
    Otherwise ``cfg.source_root`` is listed and both layouts are recognised.
    """
    if cfg.include_categories:
        return sorted({n for n in cfg.include_categories if should_include_category(n, cfg, cli_names)})

    entries = client.list_dir(cfg.owner, cfg.repo, cfg.source_root, cfg.ref)
    # A set, because a category can appear both as a directory and as a flat
    # file and must still be built only once.
    categories: set[str] = set()
    for entry in entries:
        entry_type = entry.get("type")
        name = entry.get("name") or ""
        # Support nested layout: <source_root>/<name>/<name>.list
        if entry_type == "dir":
            if should_include_category(name, cfg, cli_names):
                categories.add(name)
            continue
        # Support flat layout: <source_root>/<name>.list
        if entry_type == "file" and name.endswith(".list"):
            cat = name[: -len(".list")]
            if cat and should_include_category(cat, cfg, cli_names):
                categories.add(cat)
    return sorted(categories)


def build_one_category(client: GiteaClient, cfg: Config, name: str, base_out: Path) -> tuple[int, int]:
    """Fetch one category and write its Surge and Clash files under ``base_out``.

    Returns:
        ``(source_rule_count, clash_rule_count)``: the number of parsed rules
        and the number of those convertible to Clash.

    Raises:
        Exception: Whatever the flat-layout read raises when both the nested
            and the flat source path fail.
    """
    filename = cfg.source_filename_pattern.format(name=name)
    nested_path = f"{cfg.source_root}/{name}/{filename}"
    flat_path = f"{cfg.source_root}/{filename}"

    source_rel_path = nested_path
    try:
        source_content = client.read_file(cfg.owner, cfg.repo, nested_path, cfg.ref)
    except Exception:
        # Deliberate best-effort: any failure on the nested layout falls back
        # to the flat layout; a failure there propagates to the caller.
        source_rel_path = flat_path
        source_content = client.read_file(cfg.owner, cfg.repo, flat_path, cfg.ref)

    rules = parse_rules(source_content)

    surge_out = base_out / "surge" / f"{name}.list"
    clash_out = base_out / "clash" / f"{name}.yaml"
    surge_out.parent.mkdir(parents=True, exist_ok=True)
    clash_out.parent.mkdir(parents=True, exist_ok=True)

    surge_out.write_text(format_surge(name, rules, source_rel_path), encoding="utf-8")
    clash_out.write_text(
        format_clash(name, rules, source_rel_path, no_resolve=cfg.clash_no_resolve),
        encoding="utf-8",
    )

    clash_count = sum(
        1 for r in rules if to_clash_payload_line(r, no_resolve=cfg.clash_no_resolve) is not None
    )
    return len(rules), clash_count


def parse_args() -> argparse.Namespace:
    """Parse the command-line options ``--config`` and ``--names``."""
    p = argparse.ArgumentParser(description="Generate Surge/Clash rules from Gitea source repo.")
    p.add_argument("--config", default="config.toml", help="Path to config TOML file")
    p.add_argument("--names", default="", help="Comma-separated category names, e.g. YouTube,Netflix")
    return p.parse_args()


def main() -> int:
    """Run the generator and return the process exit status.

    Returns:
        ``0`` when every selected category was built, ``1`` when at least one
        category failed (the others are still written), ``2`` when no category
        is left after filtering.
    """
    args = parse_args()
    cfg = load_config(Path(args.config))
    names = {x.strip() for x in args.names.split(",") if x.strip()}

    client = GiteaClient(cfg.base_url, cfg.token)
    categories = find_categories(client, cfg, names)
    if not categories:
        print("No categories found after filtering.", file=sys.stderr)
        return 2

    out_dir = Path(cfg.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    total_source = 0
    total_clash = 0
    failed = 0
    print(f"Found {len(categories)} categories under {cfg.source_root}")
    for idx, name in enumerate(categories, start=1):
        try:
            s_cnt, c_cnt = build_one_category(client, cfg, name, out_dir)
        except Exception as exc:
            # Top-level boundary: report the failure and keep building the
            # remaining categories.
            failed += 1
            print(f"[{idx}/{len(categories)}] {name}: failed: {exc}", file=sys.stderr)
            continue
        total_source += s_cnt
        total_clash += c_cnt
        print(f"[{idx}/{len(categories)}] {name}: source={s_cnt}, clash={c_cnt}")

    print(f"Done. source_rules={total_source}, clash_rules={total_clash}, output={out_dir.resolve()}")
    if failed:
        # Surface partial failure through the exit status instead of
        # reporting success.
        print(f"{failed} of {len(categories)} categories failed.", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())