summarylogtreecommitdiffstats
path: root/vultr-hook.py
blob: 41a11090566870986216cee161fe7d622b9d06a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
import os
import requests
import sys
from typing import Iterator

# Inputs read from the environment; each one is None when the variable
# is not set.
DOMAIN = os.getenv("CERTBOT_DOMAIN")
TOKEN = os.getenv("CERTBOT_VALIDATION")
# main() runs the pre hook when this is unset and the post hook otherwise.
AUTH_OUTPUT = os.getenv("CERTBOT_AUTH_OUTPUT")
API_KEY = os.getenv("VULTR_API_KEY")

# Vultr API endpoints used by the hooks, all built on the same base URL.
API_EP = "https://api.vultr.com/v1/"
DOMAIN_LIST_EP = API_EP + "dns/list"
RECORD_LIST_EP = API_EP + "dns/records"
RECORD_ADD_EP = API_EP + "dns/create_record"
RECORD_DEL_EP = API_EP + "dns/delete_record"

# One HTTP session shared by every request, carrying the API key header.
SESSION = requests.Session()
SESSION.headers["API-Key"] = API_KEY


def main():
    """Validate the environment and run the matching hook.

    Writes an explanation to stderr and exits with status 1 when any of
    CERTBOT_DOMAIN, CERTBOT_VALIDATION or VULTR_API_KEY is unset.
    Otherwise runs pre_hook() when CERTBOT_AUTH_OUTPUT is unset and
    post_hook() when it is set.
    """
    if None in [DOMAIN, TOKEN, API_KEY]:
        # Adjacent string literals are joined verbatim, so only the first
        # one carries the separating space.
        sys.stderr.write("CERTBOT_DOMAIN, CERTBOT_VALIDATION "
                         "and VULTR_API_KEY must all be defined\n")
        sys.exit(1)

    if AUTH_OUTPUT is None:
        pre_hook()
    else:
        post_hook()


def subdomains(domain: str) -> Iterator[str]:
    """Yield every dot-delimited suffix of *domain*, longest first.

    The first value is the domain itself; each following value drops one
    more leading label.

    Ex: "a.b.c" -> "a.b.c", "b.c", "c"
    """
    labels = domain.split(".")
    for start in range(len(labels)):
        # Re-join the labels from position `start` to the end.
        yield ".".join(labels[start:])


def pre_hook():
    """Add a new TXT record.

    Fetches the domains managed through the Vultr API, selects the longest
    suffix of DOMAIN among them, writes that managed domain to stdout (the
    post hook reads it back through CERTBOT_AUTH_OUTPUT) and creates an
    "_acme-challenge" TXT record whose data is TOKEN wrapped in double
    quotes.

    Raises:
        requests.HTTPError: if either API call returns an error status.
        ValueError: if no suffix of DOMAIN is a managed domain.
    """

    # extract all domains managed by the Vultr API; check the status first
    # so an API error is reported as such instead of as a decoding failure
    response = SESSION.get(DOMAIN_LIST_EP)
    response.raise_for_status()
    managed_domains = {o["domain"] for o in response.json()}

    # find the subdomain of DOMAIN managed by Vultr (longest match first)
    managed_domain = next(
        (candidate for candidate in subdomains(DOMAIN)
         if candidate in managed_domains),
        None)
    if managed_domain is None:
        raise ValueError(
            "no suitable managed domain found for {}".format(DOMAIN))

    # echo managed domain for post hook to pick up
    sys.stdout.write(managed_domain)

    # get the subdomain part relative to the managed domain; the slice
    # drops the "." that separated it from the managed domain
    subdomain = DOMAIN.rpartition(managed_domain)[0][:-1]

    # add the record
    if subdomain == "":
        name = "_acme-challenge"
    else:
        name = "_acme-challenge.{}".format(subdomain)
    SESSION.post(
        RECORD_ADD_EP,
        data={
            "domain": managed_domain,
            "name": name,
            "type": "TXT",
            "data": '"{}"'.format(TOKEN),
        }).raise_for_status()


def post_hook():
    """Remove the added TXT record.

    Lists the records of the managed domain (taken from AUTH_OUTPUT, i.e.
    what the pre hook wrote to stdout), finds the first TXT record whose
    data is TOKEN wrapped in double quotes, and deletes it by RECORDID.

    Raises:
        requests.HTTPError: if either API call returns an error status.
        ValueError: if no TXT record holding TOKEN is found.
    """

    managed_domain = AUTH_OUTPUT

    # check the status before decoding so an API error is reported as such
    response = SESSION.get(
        RECORD_LIST_EP, params={
            "domain": managed_domain,
        })
    response.raise_for_status()

    # the pre hook stored the token wrapped in double quotes
    expected_data = '"{}"'.format(TOKEN)
    record = next(
        (r for r in response.json()
         if r["type"] == "TXT" and r["data"] == expected_data),
        None)
    if record is None:
        raise ValueError(
            "no TXT record holding the validation token found in {}".format(
                managed_domain))

    SESSION.post(
        RECORD_DEL_EP,
        data={
            "domain": managed_domain,
            "RECORDID": record["RECORDID"]
        }).raise_for_status()


# Run the hook only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()