chore: reorganize repository layout and optimize README
This commit is contained in:
+524
@@ -0,0 +1,524 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import quote
|
||||
from urllib.error import HTTPError
|
||||
from urllib.request import Request, urlopen
|
||||
# tomllib is stdlib only on Python 3.11+; fall back to None so JSON configs
# still work on older interpreters (load_config raises for TOML in that case).
try:
    import tomllib
except ModuleNotFoundError:  # Python < 3.11
    tomllib = None
|
||||
|
||||
|
||||
# Page size for the Gitea "contents" API pagination loop in GiteaClient.list_dir.
DEFAULT_LIMIT = 100
# Surge rule types skipped when emitting Clash payloads.
UNSUPPORTED_CLASH_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
}
# Surge rule types skipped when emitting mihomo payloads.
UNSUPPORTED_MIHOMO_TYPES = {
    "USER-AGENT",
    "URL-REGEX",
}
|
||||
|
||||
|
||||
@dataclass
class Config:
    """Resolved runtime configuration (built by load_config from JSON/TOML)."""

    # Gitea connection.
    base_url: str  # instance URL, trailing slash stripped by GiteaClient
    owner: str
    repo: str
    ref: str  # branch/ref to read from (default "main")
    token: str | None  # API token, or None for anonymous access
    # Source selection.
    source_mode: str  # "gitea" (remote API) or "local" (filesystem)
    local_source_root: str  # root dir used when source_mode == "local"
    source_root: str  # repo-relative dir holding the rule lists (e.g. "rule/Surge")
    source_filename_pattern: str  # per-category filename, e.g. "{name}.list"
    # Output.
    output_dir: str  # base directory for generated files (default "dist")
    include_categories: list[str]  # allow-list; empty means "all"
    exclude_categories: list[str]  # deny-list, applied after includes
    clash_no_resolve: bool  # append no-resolve to Clash IP rules
    mihomo_no_resolve: bool  # append no-resolve to mihomo IP rules
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class RuleLine:
    """One parsed rule line (produced by parse_rules)."""

    # Normalized rule text, e.g. "DOMAIN-SUFFIX,example.com".
    raw: str
    # First comma field, uppercased, e.g. "DOMAIN-SUFFIX".
    rule_type: str
|
||||
|
||||
|
||||
class GiteaClient:
    """Minimal Gitea REST client built on urllib only (no third-party deps)."""

    def __init__(self, base_url: str, token: str | None):
        # Trailing slash is stripped so path concatenation below stays predictable.
        self.base_url = base_url.rstrip("/")
        self.token = token

    def _request_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``base_url + path`` (with optional query params) and decode JSON."""
        url = f"{self.base_url}{path}"
        if params:
            # Keys and values are URL-quoted individually; order follows dict insertion.
            query = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items())
            url = f"{url}?{query}"

        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"

        req = Request(url, headers=headers)
        with urlopen(req) as resp:
            return json.loads(resp.read().decode("utf-8"))

    def _request_text_url(self, url: str) -> str:
        """GET an absolute URL and return its body as text (bad bytes replaced)."""
        headers = {"Accept": "text/plain"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        req = Request(url, headers=headers)
        with urlopen(req) as resp:
            return resp.read().decode("utf-8", errors="replace")

    def list_dir(self, owner: str, repo: str, path: str, ref: str) -> list[dict[str, Any]]:
        """List a repository directory via the contents API, following pagination.

        Raises RuntimeError when *path* resolves to a file (the API returns an
        object rather than a list in that case).
        """
        encoded_path = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"

        page = 1
        all_items: list[dict[str, Any]] = []
        while True:
            data = self._request_json(endpoint, {"ref": ref, "page": page, "limit": DEFAULT_LIMIT})
            if isinstance(data, dict):
                # A dict response means the path is a single file, not a directory.
                raise RuntimeError(f"Path is not a directory: {path}")
            items = list(data)
            all_items.extend(items)
            # A short page means this was the last page.
            if len(items) < DEFAULT_LIMIT:
                break
            page += 1

        return all_items

    def read_file(self, owner: str, repo: str, path: str, ref: str) -> str:
        """Fetch one file's text via the contents API (base64-decoded).

        On HTTP 404 the raw-branch URL is tried instead; any other HTTP error
        propagates to the caller.
        """
        encoded_path = quote(path.strip("/"), safe="/")
        endpoint = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{encoded_path}"
        try:
            data = self._request_json(endpoint, {"ref": ref})
        except HTTPError as exc:
            if exc.code != 404:
                raise
            # Fallback for environments where Gitea API route is not exposed.
            raw_url = (
                f"{self.base_url}/{quote(owner)}/{quote(repo)}/raw/branch/"
                f"{quote(ref)}/{encoded_path}"
            )
            return self._request_text_url(raw_url)

        if not isinstance(data, dict):
            raise RuntimeError(f"Path is not a file: {path}")

        content = data.get("content")
        encoding = data.get("encoding")
        if not content:
            raise RuntimeError(f"Missing file content for path: {path}")
        if encoding != "base64":
            raise RuntimeError(f"Unsupported encoding ({encoding}) for path: {path}")

        return base64.b64decode(content).decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def load_config(path: Path) -> Config:
    """Load a Config from a JSON (``.json``) or TOML (anything else) file.

    The Gitea token is read from the environment variable named by
    ``gitea.token_env`` (default ``GITEA_TOKEN``); an empty value yields None.
    Raises RuntimeError for TOML input when ``tomllib`` is unavailable (< 3.11).
    """
    if path.suffix.lower() == ".json":
        raw = json.loads(path.read_text(encoding="utf-8"))
    else:
        if tomllib is None:
            raise RuntimeError(
                "TOML config requires Python 3.11+. "
                "Use Python 3.11+ or provide a JSON config file."
            )
        raw = tomllib.loads(path.read_text(encoding="utf-8"))

    gitea_section = raw.get("gitea", {})
    source_section = raw.get("source", {})
    output_section = raw.get("output", {})

    env_name = gitea_section.get("token_env", "GITEA_TOKEN")
    token = (os.getenv(env_name) or None) if env_name else None

    return Config(
        base_url=gitea_section["base_url"],
        owner=gitea_section["owner"],
        repo=gitea_section["repo"],
        ref=gitea_section.get("ref", "main"),
        token=token,
        source_mode=source_section.get("mode", "gitea"),
        local_source_root=source_section.get("local_root", "."),
        source_root=source_section.get("root", "rule/Surge"),
        source_filename_pattern=source_section.get("filename_pattern", "{name}.list"),
        output_dir=output_section.get("dir", "dist"),
        include_categories=source_section.get("include_categories", []),
        exclude_categories=source_section.get("exclude_categories", []),
        clash_no_resolve=output_section.get("clash_no_resolve", False),
        mihomo_no_resolve=output_section.get("mihomo_no_resolve", False),
    )
|
||||
|
||||
|
||||
def parse_rules(content: str) -> list[RuleLine]:
    """Parse a Surge-style rule list into deduplicated RuleLine entries.

    - Blank lines and ``#`` comment lines are skipped.
    - Plain host lines (no comma) are normalized to ``DOMAIN-SUFFIX,<host>``.
    - Duplicates are dropped while preserving first-seen order; dedup is
      applied both to the raw line and to its whitespace-normalized form,
      so ``"TYPE, value"`` and ``"TYPE,value"`` collapse to one rule.
    """
    rules: list[RuleLine] = []
    seen: set[str] = set()

    for original in content.splitlines():
        line = original.strip()
        if not line or line.startswith("#"):
            continue

        if line in seen:
            continue
        seen.add(line)

        # Domain-only files (e.g. *_Domain.list) may contain plain host suffixes
        # without a rule prefix. Normalize them to DOMAIN-SUFFIX.
        if "," not in line:
            domain = line.lstrip(".").strip()
            if not domain:
                continue
            normalized = f"DOMAIN-SUFFIX,{domain}"
            if normalized in seen:
                continue
            seen.add(normalized)
            rules.append(RuleLine(raw=normalized, rule_type="DOMAIN-SUFFIX"))
            continue

        parts = [part.strip() for part in line.split(",") if part.strip()]
        if not parts:
            continue

        # Fix: dedupe on the rejoined normalized form as well, so whitespace
        # variants of the same rule do not both survive.
        normalized = ",".join(parts)
        if normalized != line:
            if normalized in seen:
                continue
            seen.add(normalized)

        rules.append(RuleLine(raw=normalized, rule_type=parts[0].upper()))

    return rules
|
||||
|
||||
|
||||
def to_payload_line(
    rule: RuleLine,
    no_resolve: bool,
    unsupported_types: set[str],
    type_mapping: dict[str, str] | None = None,
) -> str | None:
    """Convert one RuleLine into a Clash/mihomo payload entry.

    Returns ``None`` when the rule is empty or its (pre-mapping) type is in
    *unsupported_types*. IP rules are reduced to ``TYPE,CIDR`` with an optional
    trailing ``no-resolve``; the ``no-resolve`` token is stripped from all
    other rule kinds.
    """
    fields = [f.strip() for f in rule.raw.split(",") if f.strip()]
    if not fields:
        return None

    original_type = fields[0].upper()
    # Unsupported check is on the original (unmapped) type, matching the
    # UNSUPPORTED_* constant sets.
    if original_type in unsupported_types:
        return None

    mapping = type_mapping or {}
    fields[0] = mapping.get(original_type, original_type)

    if fields[0] not in {"IP-CIDR", "IP-CIDR6"}:
        # Strip no-resolve from non-IP rules if present in source.
        return ",".join(f for f in fields if f.lower() != "no-resolve")

    # IP rules: keep only TYPE,CIDR and re-add no-resolve when requested.
    out = fields[:2] if len(fields) >= 2 else fields[:]
    if no_resolve:
        out.append("no-resolve")
    return ",".join(out)
|
||||
|
||||
|
||||
def format_surge(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render *rules* as a Surge .list document with a stats header."""
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    per_type = Counter(r.rule_type for r in rules)

    out = [
        f"# NAME: {name}",
        "# AUTHOR: gitea-shunt-rules",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
    ]
    # Per-type counts in sorted type order, then the grand total.
    out.extend(f"# {t}: {per_type[t]}" for t in sorted(per_type))
    out.append(f"# TOTAL: {len(rules)}")
    out.extend(r.raw for r in rules)
    return "\n".join(out) + "\n"
|
||||
|
||||
|
||||
def format_loon(name: str, rules: list[RuleLine], source_path: str) -> str:
    """Render *rules* as a Loon .list document with a stats header."""
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    type_counts = Counter(entry.rule_type for entry in rules)

    lines: list[str] = []
    lines.append(f"# NAME: {name}")
    lines.append("# AUTHOR: gitea-shunt-rules")
    lines.append(f"# SOURCE: {source_path}")
    lines.append(f"# UPDATED: {generated_at}")
    for rule_type in sorted(type_counts):
        lines.append(f"# {rule_type}: {type_counts[rule_type]}")
    lines.append(f"# TOTAL: {len(rules)}")
    for entry in rules:
        lines.append(entry.raw)
    return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def format_yaml_payload(
    name: str,
    rules: list[RuleLine],
    source_path: str,
    no_resolve: bool,
    unsupported_types: set[str],
    type_mapping: dict[str, str] | None = None,
    author_name: str = "gitea-shunt-rules",
) -> tuple[str, int]:
    """Render *rules* as a Clash-style YAML ``payload:`` document.

    Rules rejected by to_payload_line are tallied in ``# SKIPPED-*`` header
    lines instead of being emitted. Returns ``(document_text, emitted_count)``.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    emitted: list[str] = []
    kept: Counter[str] = Counter()
    dropped: Counter[str] = Counter()

    for rule in rules:
        line = to_payload_line(
            rule,
            no_resolve=no_resolve,
            unsupported_types=unsupported_types,
            type_mapping=type_mapping,
        )
        if line is None:
            dropped[rule.rule_type] += 1
        else:
            emitted.append(line)
            kept[rule.rule_type] += 1

    doc = [
        f"# NAME: {name}",
        f"# AUTHOR: {author_name}",
        f"# SOURCE: {source_path}",
        f"# UPDATED: {stamp}",
    ]
    doc.extend(f"# {t}: {kept[t]}" for t in sorted(kept))
    doc.extend(f"# SKIPPED-{t}: {dropped[t]}" for t in sorted(dropped))
    doc.append(f"# TOTAL: {len(emitted)}")
    doc.append("payload:")
    doc.extend(f"  - {entry}" for entry in emitted)

    return "\n".join(doc) + "\n", len(emitted)
|
||||
|
||||
|
||||
def should_include_category(name: str, cfg: Config, cli_names: set[str]) -> bool:
    """Decide whether category *name* passes the CLI and config filters.

    Included only if it matches the CLI selection (when any was given),
    matches the config include list (when non-empty), and is not excluded.
    """
    selected_by_cli = not cli_names or name in cli_names
    selected_by_cfg = not cfg.include_categories or name in cfg.include_categories
    return selected_by_cli and selected_by_cfg and name not in cfg.exclude_categories
|
||||
|
||||
|
||||
def local_abs_path(cfg: Config, relative_path: str) -> Path:
    """Resolve *relative_path* against the configured local source root."""
    root = Path(cfg.local_source_root).expanduser().resolve()
    return root / relative_path
|
||||
|
||||
|
||||
def list_dir_source(client: GiteaClient, cfg: Config, path: str) -> list[dict[str, Any]]:
    """List directory entries (``{"name", "type"}`` dicts) from local disk or Gitea."""
    if cfg.source_mode != "local":
        return client.list_dir(cfg.owner, cfg.repo, path, cfg.ref)

    base = local_abs_path(cfg, path)
    if not base.is_dir():
        raise RuntimeError(f"Local source path is not a directory: {base}")
    # Mirror the Gitea contents-API entry shape for the local filesystem.
    return [
        {"name": child.name, "type": "dir" if child.is_dir() else "file"}
        for child in base.iterdir()
    ]
|
||||
|
||||
|
||||
def read_source_file(client: GiteaClient, cfg: Config, path: str) -> str:
    """Read one source file's text from local disk or the Gitea API."""
    if cfg.source_mode != "local":
        return client.read_file(cfg.owner, cfg.repo, path, cfg.ref)

    local_path = local_abs_path(cfg, path)
    if not local_path.is_file():
        raise FileNotFoundError(str(local_path))
    return local_path.read_text(encoding="utf-8", errors="replace")
|
||||
|
||||
|
||||
def find_categories(client: GiteaClient, cfg: Config, cli_names: set[str]) -> list[str]:
    """Discover category names under the source root, honoring all filters.

    When the config pins ``include_categories`` that list is used directly;
    otherwise both nested (``<root>/<Name>/``) and flat (``<root>/<Name>.list``)
    layouts are scanned. Result is sorted alphabetically.
    """
    if cfg.include_categories:
        return sorted(
            n for n in cfg.include_categories
            if should_include_category(n, cfg, cli_names)
        )

    categories: list[str] = []
    for entry in list_dir_source(client, cfg, cfg.source_root):
        kind = entry.get("type")
        name = entry.get("name") or ""

        if kind == "dir":
            # Nested layout: rule/Surge/<Name>/<Name>.list
            if should_include_category(name, cfg, cli_names):
                categories.append(name)
        elif kind == "file" and name.endswith(".list"):
            # Flat layout: rule/Surge/<Name>.list
            stem = name[: -len(".list")]
            if stem and should_include_category(stem, cfg, cli_names):
                categories.append(stem)

    return sorted(categories)
|
||||
|
||||
|
||||
def read_file_optional(client: GiteaClient, cfg: Config, candidate_paths: list[str]) -> tuple[str | None, str | None]:
    """Return ``(path, content)`` for the first readable candidate, else ``(None, None)``.

    Read failures are deliberately swallowed: each candidate is best-effort
    and callers treat a miss as "file variant not present".
    """
    for candidate in candidate_paths:
        try:
            content = read_source_file(client, cfg, candidate)
        except Exception:
            continue
        return candidate, content
    return None, None
|
||||
|
||||
|
||||
def build_one_category(client: GiteaClient, cfg: Config, name: str, base_out: Path) -> tuple[int, int, int, int]:
    """Generate surge/loon/clash/mihomo output files for one category.

    Returns ``(surge_count, loon_count, clash_count, mihomo_count)``.
    Raises RuntimeError when no source list can be located for *name*.
    """
    filename_base = cfg.source_filename_pattern.format(name=name)
    # Preferred merge model:
    # 1) <Name>.list (keyword/ua/ip with no-resolve)
    # 2) <Name>_Domain.list (domain rules)
    # 3) <Name>_Resolve.list (keyword/ua/ip without no-resolve)
    # Merge then dedupe.
    merge_filenames = [
        filename_base,
        f"{name}_Domain.list",
        f"{name}_Resolve.list",
    ]

    merged_chunks: list[str] = []
    merged_sources: list[str] = []
    for fn in merge_filenames:
        # Each variant may live nested (<root>/<Name>/<fn>) or flat (<root>/<fn>).
        nested = f"{cfg.source_root}/{name}/{fn}"
        flat = f"{cfg.source_root}/{fn}"
        src_path, src_content = read_file_optional(client, cfg, [nested, flat])
        if src_path and src_content is not None:
            merged_sources.append(src_path)
            merged_chunks.append(src_content)

    if merged_chunks:
        source_rel_path = " + ".join(merged_sources)
        # parse_rules dedupes across the concatenated chunks.
        rules = parse_rules("\n".join(merged_chunks))
    else:
        # Fallback for categories that only provide *_All.list or other variants.
        fallback_filenames = [
            f"{name}_All.list",
            f"{name}_Domain.list",
            f"{name}_Resolve.list",
            filename_base,
        ]
        source_rel_path = ""
        source_content = ""
        for fn in fallback_filenames:
            nested = f"{cfg.source_root}/{name}/{fn}"
            flat = f"{cfg.source_root}/{fn}"
            src_path, src_content = read_file_optional(client, cfg, [nested, flat])
            if src_path and src_content is not None:
                # First hit wins; later fallback variants are not merged.
                source_rel_path = src_path
                source_content = src_content
                break
        if not source_rel_path:
            raise RuntimeError(f"unable to locate source list for category: {name}")
        rules = parse_rules(source_content)

    # One output file per target under <base_out>/<target>/.
    surge_out = base_out / "surge" / f"{name}.list"
    loon_out = base_out / "loon" / f"{name}.list"
    clash_out = base_out / "clash" / f"{name}.yaml"
    mihomo_out = base_out / "mihomo" / f"{name}.yaml"
    surge_out.parent.mkdir(parents=True, exist_ok=True)
    loon_out.parent.mkdir(parents=True, exist_ok=True)
    clash_out.parent.mkdir(parents=True, exist_ok=True)
    mihomo_out.parent.mkdir(parents=True, exist_ok=True)

    surge_out.write_text(format_surge(name, rules, source_rel_path), encoding="utf-8")
    loon_out.write_text(format_loon(name, rules, source_rel_path), encoding="utf-8")

    clash_text, clash_cnt = format_yaml_payload(
        name,
        rules,
        source_rel_path,
        no_resolve=cfg.clash_no_resolve,
        unsupported_types=UNSUPPORTED_CLASH_TYPES,
        type_mapping={"DEST-PORT": "DST-PORT"},
        author_name="gitea-shunt-rules",
    )
    clash_out.write_text(clash_text, encoding="utf-8")

    mihomo_text, mihomo_cnt = format_yaml_payload(
        name,
        rules,
        source_rel_path,
        no_resolve=cfg.mihomo_no_resolve,
        unsupported_types=UNSUPPORTED_MIHOMO_TYPES,
        type_mapping={"DEST-PORT": "DST-PORT"},
        author_name="gitea-shunt-rules-mihomo",
    )
    mihomo_out.write_text(
        mihomo_text,
        encoding="utf-8",
    )

    # source and loon keep the same parsed set
    return len(rules), len(rules), clash_cnt, mihomo_cnt
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments: config file path and optional category selection."""
    parser = argparse.ArgumentParser(
        description="Generate Surge/Loon/Clash/Mihomo rules from Gitea source repo."
    )
    parser.add_argument("--config", default="configs/config.toml", help="Path to config JSON/TOML file")
    parser.add_argument("--names", default="", help="Comma-separated category names, e.g. YouTube,Netflix")
    return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Entry point: build all selected categories; return a process exit code.

    Exit codes: 0 on completion (even with per-category failures), 2 when no
    category survives filtering.
    """
    args = parse_args()
    cfg = load_config(Path(args.config))
    # CLI --names is a comma-separated allow-list; empty set means no CLI filter.
    names = {x.strip() for x in args.names.split(",") if x.strip()}

    client = GiteaClient(cfg.base_url, cfg.token)
    categories = find_categories(client, cfg, names)

    if not categories:
        print("No categories found after filtering.", file=sys.stderr)
        return 2

    out_dir = Path(cfg.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Running totals across all categories, reported in the final summary.
    total_source = 0
    total_loon = 0
    total_clash = 0
    total_mihomo = 0

    print(f"Found {len(categories)} categories under {cfg.source_root}")
    for idx, name in enumerate(categories, start=1):
        try:
            s_cnt, l_cnt, c_cnt, m_cnt = build_one_category(client, cfg, name, out_dir)
            total_source += s_cnt
            total_loon += l_cnt
            total_clash += c_cnt
            total_mihomo += m_cnt
            print(f"[{idx}/{len(categories)}] {name}: source={s_cnt}, loon={l_cnt}, clash={c_cnt}, mihomo={m_cnt}")
        except Exception as exc:
            # Best-effort batch: one failed category must not abort the rest.
            print(f"[{idx}/{len(categories)}] {name}: failed: {exc}", file=sys.stderr)

    print(
        "Done. "
        f"source_rules={total_source}, "
        f"loon_rules={total_loon}, "
        f"clash_rules={total_clash}, "
        f"mihomo_rules={total_mihomo}, "
        f"output={out_dir.resolve()}"
    )
    return 0
|
||||
|
||||
|
||||
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user