#!/usr/bin/env python3
"""Build blocklist by downloading sources, extracting and validating domains."""

import re
import socket
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen

URL_SOURCE = "https://github.com/filterpaper/blocklists/raw/main/src.txt"
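# A domain is one or more dot-terminated labels (letters, digits and inner
# hyphens, at most 63 characters each) ending in an alphabetic TLD of 2-63
# characters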
DOMAIN_RE = re.compile(r'(([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,63})')
MAX_ENTRIES = 330000
URL_WORKERS = 8
DNS_WORKERS = 100  # resolution is latency-bound, so use a large pool

def fetch_url(url: str) -> str:
    """Fetch URL content as text, returning an empty string on any failure."""
    try:
        with urlopen(url, timeout=30) as r:
            return r.read().decode('utf-8', errors='ignore')
    except Exception:
        # A single dead or unreachable source should not abort the build
        return ""

def extract_domains(text: str) -> set[str]:
    """Extract valid domain patterns from text."""
    domains = set()
    for line in text.splitlines():
        # Allow lines starting with alphanumeric or * (for RPZ wildcards)
        if line and (line[0].isalnum() or line[0] == '*'):
            if match := DOMAIN_RE.search(line):
                domains.add(match.group(0).lower())
    return domains
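
# Illustrative extract_domains() inputs (hypothetical hosts-file and
# RPZ-style lines) and the domain each yields:
#   "0.0.0.0 ads.example.com"    -> "ads.example.com"
#   "*.tracker.example CNAME ."  -> "tracker.example"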

def resolve_domain(domain: str) -> str | None:
    """Return the domain if it resolves to an IPv4 or IPv6 address, else None."""
    try:
        # getaddrinfo() uses the system resolver, so lookup timeouts are
        # governed by the OS resolver configuration, not by Python;
        # socket.setdefaulttimeout() only affects socket objects and would
        # have no effect here
        socket.getaddrinfo(domain, None)
        return domain
    except OSError:
        # gaierror (a subclass of OSError) and friends all mean "does not resolve"
        return None

def main():
    # Fetch the list of blocklist source URLs
    sources = fetch_url(URL_SOURCE)
    src_urls = [u.strip() for u in sources.splitlines() if u.strip()]
    if not src_urls:
        print("Failed to fetch source list", file=sys.stderr)
        sys.exit(1)

    # Download all sources in parallel and extract domains
    domains = set()
    with ThreadPoolExecutor(max_workers=URL_WORKERS) as pool:
        for url_content in pool.map(fetch_url, src_urls):
            domains.update(extract_domains(url_content))

    total = len(domains)
    print(f"Extracted {total} unique domains", file=sys.stderr)

    # Validate domains in parallel (removing invalid ones)
    completed = 0
    with ThreadPoolExecutor(max_workers=DNS_WORKERS) as pool:
        futures = {pool.submit(resolve_domain, d): d for d in domains}
        for future in as_completed(futures):
            if not future.result():
                domains.discard(futures[future])
            completed += 1
            if completed % 10000 == 0:
                print(f"Validated {completed}/{total} ({completed*100//total}%)", file=sys.stderr)

    # Sanity cap: abort rather than publish a suspiciously large list
    count = len(domains)
    if count > MAX_ENTRIES:
        print(f"Too many valid entries: {count}", file=sys.stderr)
        sys.exit(1)
    print(f"Valid entries: {count}", file=sys.stderr)

    # Emit Unbound local-zone directives, sorted for reproducible output
    for entry in sorted(domains):
        print(f'local-zone: "{entry}" always_null')

if __name__ == "__main__":
    main()
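
# Example usage (paths are illustrative, adjust for your setup):
#   ./build_blocklist.py > blocklist.conf
# The generated file can be pulled into the server: clause of unbound.conf
# with `include: "/etc/unbound/blocklist.conf"` and sanity-checked with
# `unbound-checkconf` before reloading Unbound.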