comparison host/acme_tiny.py @ 2034:d718511fc69f acme-tiny

Begin work on moving to tiny-acme.
author Violet7
date Tue, 04 Nov 2025 20:28:50 -0800
parents
children
comparison
equal deleted inserted replaced
2033:905a6ade55f2 2034:d718511fc69f
1 #!/usr/bin/env python3
2 # Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny
3 import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
4 try:
5 from urllib.request import urlopen, Request # Python 3
6 except ImportError: # pragma: no cover
7 from urllib2 import urlopen, Request # Python 2
8
# Default ACME directory endpoint (Let's Encrypt).
DEFAULT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
# Base URL kept only for callers that still pass the old CA argument.
DEFAULT_CA = "https://acme-v02.api.letsencrypt.org" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD

# Module-wide logger: INFO level, with a default StreamHandler attached.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
LOGGER.addHandler(logging.StreamHandler())
15
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None):
    """Obtain a signed certificate for a CSR from an ACME server via http-01 challenges.

    Shells out to the ``openssl`` binary to read the account key and the CSR and
    to sign every request, writes one challenge file per pending authorization
    into ``acme_dir``, and talks to the ACME server with ``urlopen``.

    Args:
        account_key: path to the account private key (read with ``openssl rsa``).
        csr: path to the certificate signing request.
        acme_dir: directory the challenge token files are written to; the
            self-check fetches them from ``/.well-known/acme-challenge/``.
        log: logger that receives progress messages.
        CA: DEPRECATED. When it differs from ``DEFAULT_CA`` the directory URL
            becomes ``CA + "/directory"`` and ``directory_url`` is ignored.
        disable_check: when true, skip downloading each challenge file locally
            before telling the CA it is ready.
        directory_url: URL of the ACME directory resource.
        contact: optional list of contact strings to register/update on the account.
        check_port: optional port used only by the local challenge-file check.

    Returns:
        The body of the certificate download response, as text.

    Raises:
        IOError: an ``openssl`` command exited with a non-zero status.
        ValueError: the server answered with an unexpected status code, the
            challenge file could not be downloaded, a challenge did not pass,
            or the order did not become valid.
        AssertionError: polling an authorization or order took over an hour.
    """
    directory, acct_headers, alg, jwk = None, None, None, None # shared with the nested helpers below

    # helper functions - base64 encode for jose spec
    def _b64(b):
        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")

    # helper function - run external commands
    def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
        proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate(cmd_input)
        if proc.returncode != 0:
            raise IOError("{0}\n{1}".format(err_msg, err))
        return out

    # helper function - make request and automatically parse json response
    def _do_request(url, data=None, err_msg="Error", depth=0):
        try:
            resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}))
            resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
        except IOError as e:
            resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
            code, headers = getattr(e, "code", None), {}
        try:
            resp_data = json.loads(resp_data) # try to parse json results
        except ValueError:
            pass # ignore json parsing errors
        # The body is only a dict when it parsed as a JSON object; a plain-text or
        # HTML 400 page must fall through to the ValueError below rather than
        # blow up with TypeError/KeyError while looking for the error type.
        bad_nonce = (
            code == 400
            and isinstance(resp_data, dict)
            and resp_data.get('type') == "urn:ietf:params:acme:error:badNonce"
        )
        if depth < 100 and bad_nonce:
            raise IndexError(resp_data) # allow 100 retrys for bad nonces
        if code not in [200, 201, 204]:
            raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data))
        return resp_data, code, headers

    # helper function - make signed requests
    def _send_signed_request(url, payload, err_msg, depth=0):
        payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8'))
        new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce']
        protected = {"url": url, "alg": alg, "nonce": new_nonce}
        # until the account is registered the request carries the full JWK, afterwards the account URL
        protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']})
        protected64 = _b64(json.dumps(protected).encode('utf8'))
        protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8')
        out = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error")
        data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)})
        try:
            return _do_request(url, data=data.encode('utf8'), err_msg=err_msg, depth=depth)
        except IndexError: # retry bad nonces (they raise IndexError)
            return _send_signed_request(url, payload, err_msg, depth=(depth + 1))

    # helper function - poll until complete
    def _poll_until_not(url, pending_statuses, err_msg):
        result, t0 = None, time.time()
        while result is None or result['status'] in pending_statuses:
            # 1 hour timeout; raised explicitly (same exception type as before)
            # so the limit is still enforced when asserts are stripped by -O
            if time.time() - t0 >= 3600:
                raise AssertionError("Polling timeout")
            time.sleep(0 if result is None else 2)
            result, _, _ = _send_signed_request(url, None, err_msg)
        return result

    # parse account key to get public key
    log.info("Parsing account key...")
    out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error")
    pub_pattern = r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)"
    pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp # unhexlify needs an even number of digits
    alg, jwk = "RS256", {
        "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
        "kty": "RSA",
        "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
    }
    accountkey_json = json.dumps(jwk, sort_keys=True, separators=(',', ':'))
    thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())

    # find domains
    log.info("Parsing CSR...")
    out = _cmd(["openssl", "req", "-in", csr, "-noout", "-text"], err_msg="Error loading {0}".format(csr))
    csr_text = out.decode('utf8')
    domains = set()
    common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", csr_text)
    if common_name is not None:
        domains.add(common_name.group(1))
    subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", csr_text, re.MULTILINE|re.DOTALL)
    if subject_alt_names is not None:
        for san in subject_alt_names.group(1).split(", "):
            if san.startswith("DNS:"):
                domains.add(san[4:])
    log.info(u"Found domains: {0}".format(", ".join(domains)))

    # get the ACME directory of urls
    log.info("Getting directory...")
    directory_url = CA + "/directory" if CA != DEFAULT_CA else directory_url # backwards compatibility with deprecated CA kwarg
    directory, _, _ = _do_request(directory_url, err_msg="Error getting directory")
    log.info("Directory found!")

    # create account, update contact details (if any), and set the global key identifier
    log.info("Registering account...")
    reg_payload = {"termsOfServiceAgreed": True} if contact is None else {"termsOfServiceAgreed": True, "contact": contact}
    account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
    log.info("{0} Account ID: {1}".format("Registered!" if code == 201 else "Already registered!", acct_headers['Location']))
    if contact is not None:
        account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details")
        log.info("Updated contact details:\n{0}".format("\n".join(account.get('contact') or [])))

    # create a new order
    log.info("Creating new order...")
    order_payload = {"identifiers": [{"type": "dns", "value": d} for d in domains]}
    order, _, order_headers = _send_signed_request(directory['newOrder'], order_payload, "Error creating new order")
    log.info("Order created!")

    # get the authorizations that need to be completed
    for auth_url in order['authorizations']:
        authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges")
        domain = authorization['identifier']['value']

        # skip if already valid
        if authorization['status'] == "valid":
            log.info("Already verified: {0}, skipping...".format(domain))
            continue
        log.info("Verifying {0}...".format(domain))

        # find the http-01 challenge and write the challenge file
        challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0]
        token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) # keep the token safe to use as a file name
        keyauthorization = "{0}.{1}".format(token, thumbprint)
        wellknown_path = os.path.join(acme_dir, token)
        with open(wellknown_path, "w") as wellknown_file:
            wellknown_file.write(keyauthorization)

        # check that the file is in place (explicit test instead of assert, so -O cannot skip it)
        wellknown_url = "http://{0}{1}/.well-known/acme-challenge/{2}".format(domain, "" if check_port is None else ":{0}".format(check_port), token)
        if not disable_check:
            try:
                served_ok, problem = (_do_request(wellknown_url)[0] == keyauthorization), ""
            except ValueError as e:
                served_ok, problem = False, e
            if not served_ok:
                raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, problem))

        # say the challenge is done
        _send_signed_request(challenge['url'], {}, "Error submitting challenges: {0}".format(domain))
        authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain))
        if authorization['status'] != "valid":
            raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization))
        os.remove(wellknown_path)
        log.info("{0} verified!".format(domain))

    # finalize the order with the csr
    log.info("Signing certificate...")
    csr_der = _cmd(["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error")
    _send_signed_request(order['finalize'], {"csr": _b64(csr_der)}, "Error finalizing order")

    # poll the order to monitor when it's done
    order = _poll_until_not(order_headers['Location'], ["pending", "processing"], "Error checking order status")
    if order['status'] != "valid":
        raise ValueError("Order failed: {0}".format(order))

    # download the certificate
    certificate_pem, _, _ = _send_signed_request(order['certificate'], None, "Certificate download failed")
    log.info("Certificate signed!")
    return certificate_pem
171
def main(argv=None):
    """Command-line entry point: parse ``argv``, fetch the certificate, write it to stdout."""
    usage_text = textwrap.dedent("""\
        This script automates the process of getting a signed TLS certificate from Let's Encrypt using the ACME protocol.
        It will need to be run on your server and have access to your private account key, so PLEASE READ THROUGH IT!
        It's only ~200 lines, so it won't take long.

        Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt
        """)
    cli = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=usage_text)

    # (flag, add_argument keyword arguments), registered in this order
    option_table = [
        ("--account-key", dict(required=True, help="path to your Let's Encrypt account private key")),
        ("--csr", dict(required=True, help="path to your certificate signing request")),
        ("--acme-dir", dict(required=True, help="path to the .well-known/acme-challenge/ directory")),
        ("--quiet", dict(action="store_const", const=logging.ERROR, help="suppress output except for errors")),
        ("--disable-check", dict(default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA")),
        ("--directory-url", dict(default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt")),
        ("--ca", dict(default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!")),
        ("--contact", dict(metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key")),
        ("--check-port", dict(metavar="PORT", default=None, help="what port to use when self-checking the challenge file, default is port 80")),
    ]
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    opts = cli.parse_args(argv)
    # --quiet stores logging.ERROR; when absent the attribute is None and the current level is re-applied
    LOGGER.setLevel(opts.quiet if opts.quiet else LOGGER.level)
    certificate = get_crt(
        opts.account_key,
        opts.csr,
        opts.acme_dir,
        log=LOGGER,
        CA=opts.ca,
        disable_check=opts.disable_check,
        directory_url=opts.directory_url,
        contact=opts.contact,
        check_port=opts.check_port,
    )
    sys.stdout.write(certificate)


if __name__ == "__main__": # pragma: no cover
    main(sys.argv[1:])