Skip to content

Commit

Permalink
add public suffix list for computing default alt_names
Browse files Browse the repository at this point in the history
  • Loading branch information
plinss committed May 7, 2019
1 parent 37f5db1 commit b419786
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
4 changes: 3 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,9 @@ The name of each certificate is used as the name of the certificate files.
so that associated DNS updates happen in the correct zone.
The zone name may be used directly by specifying ``"@"`` for the host name.
Multiple zones may be specified.
The default value is ``{ common_name: ["@"] }``.
The default value is the common name of the certificate in the zone of the first registered domain name according to the [Public Suffix List](https://publicsuffix.org/).
For example, if the common name is "example.com", the default ``alt_names`` will be: ``{"example.com": ["@"] }``;
if the common name is "foo.bar.example.com", the default ``alt_names`` will be: ``{ "example.com": ["foo.bar"] }``.
* ``services`` specifies the list of services to be reloaded when the certificate is issued, renewed, or modified.
This may be omitted.
* ``dhparam_size`` specifies the number of bits to use for custom Diffie-Hellman paramaters for the certificate.
Expand Down
82 changes: 80 additions & 2 deletions acmebot
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class AcmeManager(object):
def __init__(self):
self.script_dir = os.path.dirname(os.path.realpath(__file__))
self.script_name = os.path.basename(__file__)
self.script_version = '2.3.1'
self.script_version = '2.3.2'

self._color_codes = {
'black': 30,
Expand Down Expand Up @@ -297,6 +297,7 @@ class AcmeManager(object):
'acme_directory_url': 'https://acme-v02.api.letsencrypt.org/directory',
'reload_zone_command': '/etc/bind/reload-zone.sh',
'nsupdate_command': '/usr/bin/nsupdate',
'public_suffix_list_url': 'https://publicsuffix.org/list/public_suffix_list.dat',
'verify': None
},
'directories': {
Expand Down Expand Up @@ -1488,6 +1489,8 @@ class AcmeManager(object):
self._error('Certificate ', certificate_name, ' already configured with private key\n')
del self.config['certificates']

self._load_public_suffix_list()

for private_key_name in private_keys:
key_certificates = private_keys[private_key_name].get('certificates', {})
if (not key_certificates):
Expand All @@ -1496,7 +1499,8 @@ class AcmeManager(object):
for certificate_name in key_certificates:
common_name = key_certificates[certificate_name].get('common_name', certificate_name)
if ('alt_names' not in key_certificates[certificate_name]):
private_keys[private_key_name]['certificates'][certificate_name]['alt_names'] = {common_name: ['@']}
registered_name, host_name = self._split_registered_domain(common_name)
private_keys[private_key_name]['certificates'][certificate_name]['alt_names'] = {registered_name: [host_name]}
elif ('@' in key_certificates[certificate_name]['alt_names']):
key_certificates[certificate_name]['alt_names'][common_name] = key_certificates[certificate_name]['alt_names']['@']
del key_certificates[certificate_name]['alt_names']['@']
Expand Down Expand Up @@ -1635,6 +1639,80 @@ class AcmeManager(object):
if (self.acme_client):
del self.acme_client

def _load_public_suffix_list(self):
resource_dir = os.path.join(self.script_dir, self._directory('resource'))
self._makedir(resource_dir, 0o600)
public_suffix_list_path = os.path.join(resource_dir, 'public_suffix_list.dat')
fetch = True
last_update = None
if (os.path.isfile(public_suffix_list_path)):
last_update = datetime.datetime.utcfromtimestamp(os.path.getmtime(public_suffix_list_path))
if ((datetime.datetime.utcnow() - last_update).days < 1):
fetch = False
self._detail('Cached public suffix list less than one day old\n')
if (fetch):
public_suffix_list_url = self._setting('public_suffix_list_url')
self._detail('Fetching public suffix list from ', public_suffix_list_url, '\n')
request = urllib.request.Request(url=public_suffix_list_url)
if (last_update):
request.add_header('If-Modified-Since', last_update.strftime('%a, %d %b %Y %H:%M:%S GMT'))
try:
with urllib.request.urlopen(request) as response:
with open(public_suffix_list_path, 'w') as public_suffix_list_file:
public_suffix_list_file.write(response.read().decode('utf-8'))
except urllib.error.HTTPError as error:
if ((400 <= error.code) and (error.code < 500)):
self._warn('Unable to retrieve public suffix list from ', public_suffix_list_url, ' HTTP error: ', error.code, ' ', error.reason, '\n',
self._indent(error.read()), '\n')
elif (304 == error.code):
self._detail('Public suffix list not modified\n')
else:
self._warn('Unable to retrieve public suffix list from ', public_suffix_list_url, ' HTTP error: ', error.code, ' ', error.reason, '\n')

self._public_suffixes = {}
self._public_suffix_exceptions = {}
if (os.path.isfile(public_suffix_list_path)):
with open(public_suffix_list_path) as public_suffix_list_file:
lines = public_suffix_list_file.read().splitlines()
for line in lines:
line = line.strip()
if ((not line) or line.startswith('//')):
continue
entry = line.split()[:1][0]
list = self._public_suffixes
if (entry.startswith('!')):
entry = entry[1:]
list = self._public_suffix_exceptions
entry_parts = entry.split('.')
for part in reversed(entry_parts):
if part not in list:
list[part] = {}
list = list[part]

def _split_registered_domain(self, domain_name):
registered_parts = []
parts = domain_name.split('.')
list = self._public_suffixes
exceptions = self._public_suffix_exceptions
for part in reversed(parts):
registered_parts.append(part)
if ((exceptions is not None) and (part in exceptions)):
exceptions = exceptions[part]
else:
exceptions = None
if ((exceptions is not None) and (not exceptions)):
break
if ('*' in list):
list = list['*']
elif (part in list):
list = list[part]
else:
break
registered_name = '.'.join(reversed(registered_parts))
if (registered_name == domain_name):
return (registered_name, '@')
return (registered_name, domain_name[:-(len(registered_name) + 1)])

def _is_wildcard_auth(self, authorization_resource):
if (hasattr(authorization_resource.body, 'wildcard')):
return authorization_resource.body.wildcard
Expand Down

0 comments on commit b419786

Please sign in to comment.