1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172import shutil
import subprocess
import atexit
from pathlib import Path
from urllib.request import urlopen
from tqdm.auto import tqdm
from pycloudflared.try_cloudflare import Urls, url_pattern, metrics_pattern
from pycloudflared.util import Info, get_info
def download(info: Info | None = None) -> str:
if info is None:
info = get_info()
if info.system == "darwin" and info.machine == "arm64":
print(
"* On a MacOS system with an Apple Silicon chip, "
"Rosetta 2 needs to be installed, "
"refer to this guide to learn more: "
"https://support.apple.com/en-us/HT211861"
) # noqa: E501
dest = Path('/tmp') / info.url.split("/")[-1]
with urlopen(info.url) as resp:
total = int(resp.headers.get("Content-Length", 0))
with tqdm.wrapattr(
resp, "read", total=total, desc="Download cloudflared..."
) as src:
with dest.open("wb") as dst:
shutil.copyfileobj(src, dst)
if info.system == "darwin":
# macOS file is a tgz archive
shutil.unpack_archive(dest, dest.parent)
dest.unlink()
excutable = dest.parent / "cloudflared"
else:
excutable = dest
excutable.chmod(0o777)
return str(excutable)
def patch_dns(api_url, api_key: str, target: str) -> None:
import json
import http.client
from urllib.parse import urlparse
fqdn = target.replace("https://", "")
parsed_url = urlparse(api_url)
body = json.dumps({"content": fqdn})
if parsed_url.scheme == "https":
conn = http.client.HTTPSConnection(parsed_url.hostname, parsed_url.port or 443)
else:
conn = http.client.HTTPConnection(parsed_url.hostname, parsed_url.port or 80)
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
conn.request(
"PATCH",
parsed_url.path or "/",
body=body,
headers=headers
)
response = conn.getresponse()
response.read() # Consume response to close the connection
conn.close()
print(f'DNS updated to {fqdn}')
class TryCloudflare:
def __init__(self):
self.running: dict[int, Urls] = {}
def __call__(
self,
port: int | str,
metrics_port: int | str | None = None,
verbose: bool = False,
update_dns: bool = False,
secrets: dict = None,
) -> Urls:
port = int(port)
if port in self.running:
urls = self.running[port]
if verbose:
self._print(urls.tunnel, urls.metrics)
return urls
self.running[port] = Urls('running', '', None)
info = get_info()
info.executable = Path('/tmp') / info.url.split("/")[-1]
if not Path(info.executable).exists():
info.executable = download(info)
args = [
info.executable,
"tunnel",
"--url",
f"http://127.0.0.1:{port}",
]
if metrics_port is not None:
args += [
"--metrics",
f"127.0.0.1:{metrics_port}",
]
if info.system == "darwin" and info.machine == "arm64":
args = ["arch", "-x86_64"] + args
cloudflared = subprocess.Popen(
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
encoding="utf-8",
)
atexit.register(cloudflared.terminate)
tunnel_url = metrics_url = ""
lines = 20
for _ in range(lines):
line = cloudflared.stderr.readline()
url_match = url_pattern.search(line)
metric_match = metrics_pattern.search(line)
if url_match:
tunnel_url = url_match.group("url")
if metric_match:
metrics_url = "http://" + metric_match.group("url")
if tunnel_url:
break
else:
raise RuntimeError("Cloudflared failed to start")
urls = Urls(tunnel_url, metrics_url, cloudflared)
if verbose:
self._print(urls.tunnel, urls.metrics)
self.running[port] = urls
if update_dns:
patch_dns(secrets['dns_api_url'], secrets['dns_api_key'], tunnel_url)
return urls
@staticmethod
def _print(tunnel_url: str, metrics_url: str) -> None:
print(f" * Running on {tunnel_url}")
print(f" * Traffic stats available on {metrics_url}")
def terminate(self, port: int | str) -> None:
port = int(port)
if port in self.running:
self.running[port].process.terminate()
atexit.unregister(self.running[port].process.terminate)
del self.running[port]
else:
raise ValueError(f"port {port!r} is not running.")
cloudflared = TryCloudflare()