From df505fa2b408b4c2ea65db8282fe7f3f21095774 Mon Sep 17 00:00:00 2001
From: Peter Linss
Date: Mon, 14 Dec 2020 23:35:04 -0800
Subject: [PATCH] Select CT log based on certificate notAfter date
---
 acmebot | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/acmebot b/acmebot
index f2ad501..cb5e62f 100755
--- a/acmebot
+++ b/acmebot
@@ -1474,20 +1474,20 @@ class AcmeManager(object):
 def _sct_datetime(self, sct_timestamp):
 return datetime.datetime.utcfromtimestamp(sct_timestamp / 1000)
- def _get_ct_log(self, ct_log_name):
+ def _get_ct_log(self, ct_log_name, certificate):
 ct_log = self._config('ct_logs', ct_log_name)
 if (isinstance(ct_log, list)):
- now = datetime.datetime.utcnow()
+ not_after = self._datetime_from_asn1_generaltime(certificate.get_notAfter())
 for log in ct_log:
 start = datetime.datetime.strptime(log.get('start', '2000-01-01T00:00:00Z'), '%Y-%m-%dT%H:%M:%SZ')
 end = datetime.datetime.strptime(log.get('end', '2999-01-01T00:00:00Z'), '%Y-%m-%dT%H:%M:%SZ')
- if ((start <= now) and (now < end)):
+ if ((start <= not_after) and (not_after < end)):
 return log
 return None
 return ct_log
 def fetch_sct(self, ct_log_name, certificate, chain):
- ct_log = self._get_ct_log(ct_log_name)
+ ct_log = self._get_ct_log(ct_log_name, certificate)
 if (ct_log and ('url' in ct_log)):
 certificates = ([base64.b64encode(self._certificate_bytes(certificate)).decode('ascii')] + [base64.b64encode(self._certificate_bytes(chain_certificate)).decode('ascii') for chain_certificate in chain])
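Review bot: When the certificate's notAfter falls outside every configured shard's [start, end) window, the new `_get_ct_log` still returns None from the `isinstance(ct_log, list)` branch, and the caller appears to report that as `Unknown CT log` (the `_error` context at the top of the next hunk), which misattributes a shard-coverage gap to a misconfigured log name and gives the operator nothing to act on. Consider emitting the unmatched value before that `return None`, e.g. `self._detail('No shard of ', ct_log_name, ' covers notAfter ', not_after.isoformat(), '\n')`, using the same helper style the class already uses. The shard bounds are parsed by strptime as naive datetimes, so the comparison assumes `_datetime_from_asn1_generaltime` also returns a naive UTC value; a tz-aware return would raise TypeError here, worth confirming since that helper is not in the hunk. The new `certificate` parameter is undocumented here and in `load_sct`/`save_sct` in the following hunks; a one-line docstring such as `"""Return the CT log config for ct_log_name; for temporally sharded logs, pick the shard whose [start, end) window covers the certificate's notAfter."""` would make the selection rule explicit. fetch_sct's body continues past the end of the hunk.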
@@ -1512,9 +1512,9 @@ class AcmeManager(object):
 self._error('Unknown CT log: ', ct_log_name, '\n')
 return None
- def load_sct(self, file_name, key_type, ct_log_name):
+ def load_sct(self, file_name, key_type, ct_log_name, certificate):
 try:
- ct_log = self._get_ct_log(ct_log_name)
+ ct_log = self._get_ct_log(ct_log_name, certificate)
 if (ct_log and ('id' in ct_log)):
 sct_file_path = self._file_path('sct', file_name, key_type, ct_log_name=ct_log_name)
 with open(sct_file_path, 'rb') as sct_file:
@@ -1532,8 +1532,8 @@ class AcmeManager(object):
 pass
 return None
- def save_sct(self, file_name, key_type, ct_log_name, sct_data):
- ct_log = self._get_ct_log(ct_log_name)
+ def save_sct(self, file_name, key_type, ct_log_name, sct_data, certificate):
+ ct_log = self._get_ct_log(ct_log_name, certificate)
 if (ct_log):
 with FileTransaction('sct', self._file_path('sct', file_name, key_type, ct_log_name=ct_log_name), chmod=0o640, mode='wb') as transaction:
 extensions = base64.b64decode(sct_data.extensions)
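Review bot: In load_sct the shard lookup `ct_log = self._get_ct_log(ct_log_name, certificate)` sits inside the `try`, so a failure in the certificate-derived selection (an unparsable notAfter, or a None `certificate` passed by a caller) would be caught by the method's exception handler, which from the `pass` / `return None` context at the start of the next hunk appears to swallow it; the result is indistinguishable from "no SCT on disk", so a new SCT is fetched and saved while the underlying error is never surfaced. Consider moving that assignment above the `try` so only the file read is guarded, or at least narrowing the handler to the file-access errors the read can raise. The rest of load_sct, including its except clause, lies outside this hunk, so this is inferred from the visible context lines.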
@@ -2892,11 +2892,11 @@ class AcmeManager(object):
 if (sct_data):
 self._detail(ct_log_name, ' has SCT for ', key_type.upper(), ' certificate ', certificate_name, ' at ', self._sct_datetime(sct_data.timestamp).isoformat(), '\n')
- existing_sct_data = self.load_sct(certificate_name, key_type, ct_log_name)
+ existing_sct_data = self.load_sct(certificate_name, key_type, ct_log_name, certificate)
 if (sct_data and ((not existing_sct_data) or (sct_data != existing_sct_data))):
 self._info('Saving Signed Certificate Timestamp for ', key_type.upper(), ' certificate ', certificate_name, ' from ', ct_log_name, '\n')
- transactions.append(self.save_sct(certificate_name, key_type, ct_log_name, sct_data))
+ transactions.append(self.save_sct(certificate_name, key_type, ct_log_name, sct_data, certificate))
 self._add_hook('sct_installed', key_name=private_key_name, key_type=key_type, certificate_name=certificate_name, ct_log_name=ct_log_name, sct_file=self._file_path('sct', certificate_name, key_type, ct_log_name=ct_log_name))