455 lines
14 KiB
Python
455 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import os
|
|
import sys
|
|
from collections import Counter
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
from urllib.error import HTTPError
|
|
from urllib.request import Request, urlopen
|
|
try:
|
|
import tomllib
|
|
except ModuleNotFoundError: # Python < 3.11
|
|
tomllib = None
|
|
|
|
|
|
# Value sent as the ``limit`` query parameter when listing a directory.
DEFAULT_LIMIT = 100

# Rule types dropped when converting to each YAML flavour.  Both sets hold the
# same members; they stay separate so each target can be adjusted on its own.
UNSUPPORTED_CLASH_TYPES = {"USER-AGENT", "URL-REGEX"}
UNSUPPORTED_MIHOMO_TYPES = {"USER-AGENT", "URL-REGEX"}
|
|
|
|
|
|
@dataclass
class Config:
    """Runtime settings for one generator run.

    Field order is part of the constructor signature and must not change.
    """

    # -- Gitea connection -------------------------------------------------
    base_url: str
    owner: str
    repo: str
    ref: str
    token: str | None  # None means requests are sent without Authorization

    # -- where the source lists live --------------------------------------
    source_root: str
    source_filename_pattern: str  # formatted with ``name=<category>``

    # -- where generated files go -----------------------------------------
    output_dir: str

    # -- category filters -------------------------------------------------
    include_categories: list[str]
    exclude_categories: list[str]

    # -- per-target ``no-resolve`` switches for the YAML outputs ----------
    clash_no_resolve: bool
    mihomo_no_resolve: bool
|
|
|
|
|
|
@dataclass(frozen=True)
class RuleLine:
    """A single parsed rule; immutable and hashable."""

    # Comma-joined rule text with surrounding whitespace removed per field.
    raw: str
    # First field of the rule, upper-cased.
    rule_type: str
|
|
|
|
|
|
class GiteaClient:
    """Small read-only client for a Gitea repository's contents API.

    Args:
        base_url: Root URL of the Gitea instance; trailing slashes are removed.
        token: Access token sent as ``Authorization: token <token>``, or
            ``None`` to send unauthenticated requests.
        timeout: Socket timeout in seconds applied to every request so a
            stalled server cannot block the run indefinitely.
    """

    def __init__(self, base_url: str, token: str | None, timeout: float = 30.0):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout

    def _headers(self, accept: str) -> dict[str, str]:
        """Build request headers, adding the token only when one is set."""
        headers = {"Accept": accept}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _fetch(self, url: str, accept: str) -> bytes:
        """GET *url* and return the raw response body."""
        req = Request(url, headers=self._headers(accept))
        with urlopen(req, timeout=self.timeout) as resp:
            return resp.read()

    def _request_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``base_url + path`` with optional query *params*; return parsed JSON."""
        url = f"{self.base_url}{path}"
        if params:
            query = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items())
            url = f"{url}?{query}"
        return json.loads(self._fetch(url, "application/json").decode("utf-8"))

    def _request_text_url(self, url: str) -> str:
        """GET an absolute *url* and return its body decoded as UTF-8 text."""
        return self._fetch(url, "text/plain").decode("utf-8", errors="replace")

    def list_dir(self, owner: str, repo: str, path: str, ref: str) -> list[dict[str, Any]]:
        """Return the entries of directory *path* at *ref*.

        Raises:
            RuntimeError: If the API answers with an object instead of a list,
                i.e. *path* is not a directory.
        """
        encoded_path = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"

        all_items: list[dict[str, Any]] = []
        seen: set[str] = set()
        page = 1
        while True:
            data = self._request_json(endpoint, {"ref": ref, "page": page, "limit": DEFAULT_LIMIT})
            if isinstance(data, dict):
                raise RuntimeError(f"Path is not a directory: {path}")
            items = list(data or [])

            # Keep only entries not returned before.  A server that ignores
            # ``page``/``limit`` sends the same full listing on every request;
            # without this check a directory holding DEFAULT_LIMIT or more
            # entries would be requested forever.
            fresh = []
            for item in items:
                key = json.dumps(item, sort_keys=True)
                if key not in seen:
                    seen.add(key)
                    fresh.append(item)
            all_items.extend(fresh)

            if not fresh or len(items) < DEFAULT_LIMIT:
                break
            page += 1

        return all_items

    def read_file(self, owner: str, repo: str, path: str, ref: str) -> str:
        """Return the text of file *path* at *ref*.

        The contents API is tried first.  On HTTP 404 the raw-file URL is used
        instead; any other HTTP error propagates unchanged.

        Raises:
            RuntimeError: If *path* is not a file, the response carries no
                content, or the content is not base64-encoded.
        """
        encoded_path = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"
        try:
            data = self._request_json(endpoint, {"ref": ref})
        except HTTPError as exc:
            if exc.code != 404:
                raise
            # Fallback for environments where Gitea API route is not exposed.
            raw_url = (
                f"{self.base_url}/{quote(owner)}/{quote(repo)}/raw/branch/"
                f"{quote(ref)}/{encoded_path}"
            )
            return self._request_text_url(raw_url)

        if not isinstance(data, dict):
            raise RuntimeError(f"Path is not a file: {path}")

        content = data.get("content")
        encoding = data.get("encoding")
        # Only an absent value is an error; "" is a legitimately empty file.
        if content is None:
            raise RuntimeError(f"Missing file content for path: {path}")
        if content == "":
            return ""
        if encoding != "base64":
            raise RuntimeError(f"Unsupported encoding ({encoding}) for path: {path}")

        return base64.b64decode(content).decode("utf-8", errors="replace")
|
|
|
|
|
|
def load_config(path: Path) -> Config:
    """Read the config file at *path* and build a ``Config`` from it.

    A ``.json`` suffix (case-insensitive) selects JSON; anything else is parsed
    as TOML.  The token is read from the environment variable named by
    ``gitea.token_env`` (default ``GITEA_TOKEN``); an unset or empty variable
    yields ``None``.

    Raises:
        RuntimeError: If a TOML file is given but ``tomllib`` is unavailable.
        KeyError: If ``gitea.base_url``, ``gitea.owner`` or ``gitea.repo`` is
            missing.
    """
    is_json = path.suffix.lower() == ".json"
    if not is_json and tomllib is None:
        raise RuntimeError(
            "TOML config requires Python 3.11+. "
            "Use Python 3.11+ or provide a JSON config file."
        )

    text = path.read_text(encoding="utf-8")
    raw = json.loads(text) if is_json else tomllib.loads(text)

    gitea, source, output = (raw.get(section, {}) for section in ("gitea", "source", "output"))

    # An empty ``token_env`` disables the environment lookup entirely.
    env_name = gitea.get("token_env", "GITEA_TOKEN")
    token = None
    if env_name:
        token = os.getenv(env_name) or None

    settings = {
        "base_url": gitea["base_url"],
        "owner": gitea["owner"],
        "repo": gitea["repo"],
        "ref": gitea.get("ref", "main"),
        "token": token,
        "source_root": source.get("root", "rule/Surge"),
        "source_filename_pattern": source.get("filename_pattern", "{name}.list"),
        "output_dir": output.get("dir", "dist"),
        "include_categories": source.get("include_categories", []),
        "exclude_categories": source.get("exclude_categories", []),
        "clash_no_resolve": output.get("clash_no_resolve", False),
        "mihomo_no_resolve": output.get("mihomo_no_resolve", False),
    }
    return Config(**settings)
|
|
|
|
|
|
def parse_rules(content: str) -> list[RuleLine]:
    """Parse rule-list text into unique ``RuleLine`` objects, in source order.

    Blank lines and comment lines are skipped.  Each remaining line is split on
    commas, every field is stripped and empty fields are dropped; the result is
    re-joined as ``raw`` and its first field, upper-cased, becomes
    ``rule_type``.

    De-duplication is done on the normalised text, so lines that differ only in
    spacing around commas count as the same rule.
    """
    rules: list[RuleLine] = []
    seen: set[str] = set()

    # A BOM left at the start of the text would hide the ``#`` of a leading
    # comment and turn it into a bogus rule.
    for original in content.lstrip("\ufeff").splitlines():
        line = original.strip()
        # A rule starts with its type keyword, so a line opening with one of
        # these markers can only be a comment.
        if not line or line.startswith(("#", ";", "//")):
            continue

        parts = [field for field in (piece.strip() for piece in line.split(",")) if field]
        if not parts:
            continue

        normalised = ",".join(parts)
        if normalised in seen:
            continue
        seen.add(normalised)

        rules.append(RuleLine(raw=normalised, rule_type=parts[0].upper()))

    return rules
|
|
|
|
|
|
# Rule types whose match is made against an IP address and which therefore take
# the optional ``no-resolve`` flag in Clash-style rule sets.
_NO_RESOLVE_TYPES = frozenset({"IP-CIDR", "IP-CIDR6", "IP-ASN", "GEOIP"})


def to_payload_line(
    rule: RuleLine,
    no_resolve: bool,
    unsupported_types: set[str],
    type_mapping: dict[str, str] | None = None,
) -> str | None:
    """Convert one source rule into a YAML payload entry.

    Args:
        rule: Parsed source rule; only ``rule.raw`` is read.
        no_resolve: Whether IP-based rules get a trailing ``no-resolve``.
        unsupported_types: Upper-case source types to drop.
        type_mapping: Optional source-type to target-type renames.

    Returns:
        The converted rule text, or ``None`` when the rule is empty, its type
        is unsupported, or it is an IP-based rule with no value.
    """
    parts = [p.strip() for p in rule.raw.split(",") if p.strip()]
    if not parts:
        return None

    rule_type = parts[0].upper()
    if rule_type in unsupported_types:
        return None
    mapped_type = (type_mapping or {}).get(rule_type, rule_type)

    if mapped_type in _NO_RESOLVE_TYPES:
        if len(parts) < 2:
            # A type with no value would otherwise be emitted as a rule such
            # as "IP-CIDR,no-resolve".
            return None
        # Emit only type and value; whether ``no-resolve`` appears is decided
        # by the flag, not by what the source line happened to carry.
        payload = [mapped_type, parts[1]]
        if no_resolve:
            payload.append("no-resolve")
        return ",".join(payload)

    # Strip no-resolve from non-IP rules if present in source.
    kept = [mapped_type]
    kept.extend(p for p in parts[1:] if p.lower() != "no-resolve")
    return ",".join(kept)
|
|
|
|
|
|
def format_surge(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render *rules* as a Surge list: a ``#`` comment header, then one rule per line.

    The header carries the name, author, source path, a UTC timestamp, a count
    per rule type (sorted by type) and the total.  The text ends with a newline.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    per_type = Counter(rule.rule_type for rule in rules)

    lines = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
    ]
    lines.extend(f"# {kind}: {per_type[kind]}" for kind in sorted(per_type))
    lines.append(f"# TOTAL: {len(rules)}")
    lines.extend(rule.raw for rule in rules)

    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def format_loon(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render *rules* as a Loon list.

    The output is a ``#`` comment preamble (name, author, source path, UTC
    timestamp, per-type counts in sorted order, total) followed by each rule's
    ``raw`` text on its own line, terminated by a newline.
    """
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    tally = Counter(rule.rule_type for rule in rules)
    stats = [f"# {rule_type}: {tally[rule_type]}" for rule_type in sorted(tally)]

    preamble = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {generated_at}",
        *stats,
        f"# TOTAL: {len(rules)}",
    ]
    return "\n".join([*preamble, *(rule.raw for rule in rules)]) + "\n"
|
|
|
|
|
|
def _yaml_item(text: str) -> str:
    """Return *text* as a YAML sequence scalar, single-quoting it when needed.

    ``": "``, ``" #"`` and a trailing ``":"`` would make a plain scalar parse
    as a mapping or lose its tail to a comment.  Leading indicator characters
    are not checked — presumably every entry starts with a rule type keyword.
    """
    if ": " in text or " #" in text or text.endswith(":"):
        return "'" + text.replace("'", "''") + "'"
    return text


def format_yaml_payload(
    name: str,
    rules: list[RuleLine],
    source_path: str,
    no_resolve: bool,
    unsupported_types: set[str],
    type_mapping: dict[str, str] | None = None,
    author_name: str = "gitea-shunt-rules",
) -> tuple[str, int]:
    """Render *rules* as a ``payload:`` YAML document with a comment header.

    Each rule goes through ``to_payload_line``; rules it rejects are counted
    under ``# SKIPPED-<source type>``.  Emitted rules are counted under the
    type that actually appears in the payload, so a renamed type is reported
    under its new name.

    Returns:
        ``(text, emitted)`` where *emitted* is the number of payload entries.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    payload: list[str] = []
    counter: Counter[str] = Counter()
    skipped: Counter[str] = Counter()

    for rule in rules:
        converted = to_payload_line(
            rule,
            no_resolve=no_resolve,
            unsupported_types=unsupported_types,
            type_mapping=type_mapping,
        )
        if converted is None:
            skipped[rule.rule_type] += 1
            continue
        payload.append(converted)
        # Count by the emitted type so the header matches the payload below it.
        counter[converted.split(",", 1)[0]] += 1

    lines = [
        f"# NAME: {name}",
        f"# AUTHOR: {author_name}",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {now}",
    ]
    lines.extend(f"# {k}: {counter[k]}" for k in sorted(counter))
    lines.extend(f"# SKIPPED-{k}: {skipped[k]}" for k in sorted(skipped))
    lines.append(f"# TOTAL: {len(payload)}")
    lines.append("payload:")
    lines.extend(f"  - {_yaml_item(item)}" for item in payload)

    return "\n".join(lines) + "\n", len(payload)
|
|
|
|
|
|
def should_include_category(name: str, cfg: Config, cli_names: set[str]) -> bool:
    """Tell whether category *name* passes every filter.

    A name is rejected when any of the following holds, checked in this order:
    *cli_names* is non-empty and lacks it; ``cfg.include_categories`` is
    non-empty and lacks it; ``cfg.exclude_categories`` contains it.
    """
    rejected = (
        (cli_names and name not in cli_names)
        or (cfg.include_categories and name not in cfg.include_categories)
        or name in cfg.exclude_categories
    )
    return not rejected
|
|
|
|
|
|
def find_categories(client: GiteaClient, cfg: Config, cli_names: set[str]) -> list[str]:
    """Return the sorted, duplicate-free category names to build.

    When ``cfg.include_categories`` is set it is used as the candidate list and
    the repository is not queried.  Otherwise ``cfg.source_root`` is listed and
    both layouts are recognised: a sub-directory ``<Name>/`` and a flat file
    ``<Name>.list``.  Every candidate must pass ``should_include_category``.
    """
    if cfg.include_categories:
        # A set drops names repeated in the config so none is built twice.
        return sorted({n for n in cfg.include_categories if should_include_category(n, cfg, cli_names)})

    entries = client.list_dir(cfg.owner, cfg.repo, cfg.source_root, cfg.ref)
    suffix = ".list"
    # A category present as both ``<Name>/`` and ``<Name>.list`` must appear once.
    found: set[str] = set()

    for entry in entries:
        entry_type = entry.get("type")
        name = entry.get("name") or ""

        if entry_type == "dir":
            # Support nested layout: rule/Surge/<Name>/<Name>.list
            candidate = name
        elif entry_type == "file" and name.endswith(suffix):
            # Support flat layout: rule/Surge/<Name>.list
            candidate = name[: -len(suffix)]
        else:
            continue

        # An entry without a usable name cannot be a category.
        if candidate and should_include_category(candidate, cfg, cli_names):
            found.add(candidate)

    return sorted(found)
|
|
|
|
|
|
def build_one_category(client: GiteaClient, cfg: Config, name: str, base_out: Path) -> tuple[int, int, int, int]:
    """Fetch one category's source list and write all four output files.

    Candidate source paths are probed in order — the configured filename
    pattern first, then ``_All``, ``_Domain`` and ``_Resolve`` variants, each in
    the nested layout and then the flat one — and the first readable path wins.

    Returns:
        ``(source_count, loon_count, clash_count, mihomo_count)``.

    Raises:
        RuntimeError: If no candidate path could be read.  The message lists
            the probed paths and the last error, which is also chained as the
            cause.
    """
    filename_base = cfg.source_filename_pattern.format(name=name)
    candidate_filenames = [
        filename_base,
        f"{name}_All.list",
        f"{name}_Domain.list",
        f"{name}_Resolve.list",
    ]

    # dict.fromkeys keeps first-seen order while removing repeats, so a
    # pattern equal to one of the fixed variants is not requested twice.
    candidate_paths = list(
        dict.fromkeys(
            path
            for fn in candidate_filenames
            for path in (
                f"{cfg.source_root}/{name}/{fn}",  # nested
                f"{cfg.source_root}/{fn}",  # flat
            )
        )
    )

    source_rel_path: str | None = None
    source_content = ""
    last_error: Exception | None = None
    for path in candidate_paths:
        try:
            source_content = client.read_file(cfg.owner, cfg.repo, path, cfg.ref)
        except Exception as exc:  # probing is best-effort: try the next path
            last_error = exc
            continue
        source_rel_path = path
        break

    if source_rel_path is None:
        tried = ", ".join(candidate_paths)
        raise RuntimeError(
            f"unable to locate source list for category: {name} "
            f"(tried: {tried}; last error: {last_error})"
        ) from last_error

    rules = parse_rules(source_content)

    outputs = {
        "surge": base_out / "surge" / f"{name}.list",
        "loon": base_out / "loon" / f"{name}.list",
        "clash": base_out / "clash" / f"{name}.yaml",
        "mihomo": base_out / "mihomo" / f"{name}.yaml",
    }
    for target in outputs.values():
        target.parent.mkdir(parents=True, exist_ok=True)

    outputs["surge"].write_text(format_surge(name, rules, source_rel_path), encoding="utf-8")
    outputs["loon"].write_text(format_loon(name, rules, source_rel_path), encoding="utf-8")

    clash_text, clash_cnt = format_yaml_payload(
        name,
        rules,
        source_rel_path,
        no_resolve=cfg.clash_no_resolve,
        unsupported_types=UNSUPPORTED_CLASH_TYPES,
        type_mapping={"DEST-PORT": "DST-PORT"},
        author_name="gitea-shunt-rules",
    )
    outputs["clash"].write_text(clash_text, encoding="utf-8")

    mihomo_text, mihomo_cnt = format_yaml_payload(
        name,
        rules,
        source_rel_path,
        no_resolve=cfg.mihomo_no_resolve,
        unsupported_types=UNSUPPORTED_MIHOMO_TYPES,
        type_mapping={"DEST-PORT": "DST-PORT"},
        author_name="gitea-shunt-rules-mihomo",
    )
    outputs["mihomo"].write_text(mihomo_text, encoding="utf-8")

    # source and loon keep the same parsed set
    return len(rules), len(rules), clash_cnt, mihomo_cnt
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the process command line into ``config`` and ``names`` options."""
    parser = argparse.ArgumentParser(
        description="Generate Surge/Loon/Clash/Mihomo rules from Gitea source repo.",
    )
    # (flag, default, help) for every supported option.
    options = (
        ("--config", "config.toml", "Path to config TOML file"),
        ("--names", "", "Comma-separated category names, e.g. YouTube,Netflix"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the generator and return the process exit status.

    Returns:
        ``0`` when every category was built, ``1`` when at least one category
        failed, ``2`` when no category is left after filtering.
    """
    args = parse_args()
    cfg = load_config(Path(args.config))
    names = {x.strip() for x in args.names.split(",") if x.strip()}

    client = GiteaClient(cfg.base_url, cfg.token)
    categories = find_categories(client, cfg, names)

    if not categories:
        print("No categories found after filtering.", file=sys.stderr)
        return 2

    out_dir = Path(cfg.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    total_source = 0
    total_loon = 0
    total_clash = 0
    total_mihomo = 0
    failed: list[str] = []

    print(f"Found {len(categories)} categories under {cfg.source_root}")
    for idx, name in enumerate(categories, start=1):
        try:
            s_cnt, l_cnt, c_cnt, m_cnt = build_one_category(client, cfg, name, out_dir)
        except Exception as exc:  # one bad category must not stop the others
            failed.append(name)
            print(f"[{idx}/{len(categories)}] {name}: failed: {exc}", file=sys.stderr)
            continue
        total_source += s_cnt
        total_loon += l_cnt
        total_clash += c_cnt
        total_mihomo += m_cnt
        print(f"[{idx}/{len(categories)}] {name}: source={s_cnt}, loon={l_cnt}, clash={c_cnt}, mihomo={m_cnt}")

    print(
        "Done. "
        f"source_rules={total_source}, "
        f"loon_rules={total_loon}, "
        f"clash_rules={total_clash}, "
        f"mihomo_rules={total_mihomo}, "
        f"output={out_dir.resolve()}"
    )

    if failed:
        # Report failures through the exit status so callers that only look
        # at it do not treat a partial run as a success.
        print(
            f"{len(failed)} of {len(categories)} categories failed: {', '.join(failed)}",
            file=sys.stderr,
        )
        return 1
    return 0
|
|
|
|
|
|
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())
|