Python3 监控域名证书到期时间，并实现钉钉的及时报警

# _*_coding=utf-8 _*_
# @author 云深沾衣
# @date 2021/11/5 16:33

import re
import subprocess
from datetime import datetime, timezone

import requests


class DomainCheck:
    """Check TLS certificate expiry for a list of domains and alert via DingTalk.

    Domains are read one per line from ``domain_file``; blank lines and lines
    starting with ``#`` are skipped. Every result is appended to ``log_file``,
    and a DingTalk robot message is sent for each domain whose certificate has
    ``check_days`` or fewer days left.
    """

    # Alert when a certificate has this many days (or fewer) remaining.
    check_days = 5
    # DingTalk robot access token; appended to ``dingding_url`` when posting.
    dingding_token = 'your_dingding_token'
    dingding_url = 'https://oapi.dingtalk.com/robot/send?access_token='
    # Seconds to wait for the DingTalk webhook before giving up.
    request_timeout = 10
    # Hard upper bound (seconds) for a single curl invocation.
    curl_timeout = 60
    # Already-expired date substituted when a certificate cannot be read, so
    # the failed domain shows up with a negative day count and is alerted on.
    failed_check_expire_date = "Aug  7 00:28:51 2020 GMT"

    def __init__(self, domain_file='domain_file.txt', log_file='log.txt'):
        """Remember the paths of the domain list and of the result log.

        Args:
            domain_file: Path of the text file listing one domain per line.
            log_file: Path of the file that results are appended to.
        """
        self.domain_file = domain_file
        self.log_file = log_file

    def check_domain_ssl_status(self):
        """Check every listed domain, log the result and alert on short expiry.

        A timestamp header is written to the log first; then, for each domain,
        a ``"<domain> :    <days>"`` line is logged and, when the remaining
        days are at most ``check_days``, the same line is sent to DingTalk.
        """
        with open(self.domain_file) as f:
            self.write_log_to_file('<------  ' + str(datetime.now()) + '    ------>' + '\n')
            for line in f:
                domain = line.replace('\n', '').replace('\r', '').strip()
                # Skip blank lines and commented-out entries.
                if not domain or domain.startswith("#"):
                    continue
                days_left = self.get_cert_expire_date(domain)
                content = domain + ' :    ' + str(days_left) + '\n'
                self.write_log_to_file(content)
                if days_left <= self.check_days:
                    self.send_to_dingding(content)

    def send_to_dingding(self, content):
        """Post ``content`` as a text message to the DingTalk robot webhook.

        Network errors raised by ``requests`` (including a timeout after
        ``request_timeout`` seconds) propagate to the caller.
        """
        content_data = {
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
        # Without a timeout a stalled webhook would hang the whole run.
        requests.post(
            self.dingding_url + self.dingding_token,
            json=content_data,
            timeout=self.request_timeout,
        )

    def write_log_to_file(self, content):
        """Append ``content`` verbatim to ``log_file``."""
        with open(self.log_file, mode='a') as f:
            f.write(content)

    @staticmethod
    def parse_time(date_str):
        """Parse a date such as ``"Aug  7 00:28:51 2020 GMT"`` into a naive datetime.

        Raises:
            ValueError: If ``date_str`` does not match that format.
        """
        return datetime.strptime(date_str, "%b %d %H:%M:%S %Y GMT")

    @staticmethod
    def get_re_match_result(pattern, string):
        """Return the first capture group of the first ``pattern`` match in ``string``.

        Raises:
            AttributeError: If ``pattern`` does not match (``re.search``
                returned ``None``).
        """
        match = re.search(pattern, string)
        return match.group(1)

    @staticmethod
    def _days_left(expire_date, now=None):
        """Return the whole days from ``now`` (default: current UTC) to ``expire_date``.

        Both values are naive datetimes; ``expire_date`` is expressed in GMT,
        so the default reference time is the current time in UTC.
        """
        if now is None:
            now = datetime.now(timezone.utc).replace(tzinfo=None)
        return (expire_date - now).days

    def get_cert_info(self, domain):
        """Return the certificate expiry time of ``domain`` as a naive datetime.

        Runs ``curl -Ivs`` against ``https://<domain>`` and extracts the
        ``expire date:`` line from its verbose output. If curl cannot be run,
        times out, prints no expiry line, or prints one in an unexpected
        format, a failure message is sent to DingTalk and the already-expired
        ``failed_check_expire_date`` is returned instead.
        """
        # Argument list, no shell: the domain comes from a file and must not
        # be interpreted by a shell. A recent curl version should be used here.
        cmd = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
        try:
            completed = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                # Merge stderr into stdout so the verbose output is captured,
                # matching what subprocess.getstatusoutput returned.
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                errors="replace",
                timeout=self.curl_timeout,
            )
            raw_date = self.get_re_match_result(r'expire date: (.*)', completed.stdout)
            return self.parse_time(raw_date.strip())
        except (OSError, subprocess.SubprocessError, AttributeError, ValueError):
            # OSError: curl missing or not executable; SubprocessError: timeout;
            # AttributeError: no "expire date" line; ValueError: bad date format.
            content = domain + ": This domain check failed , please check reason"
            self.send_to_dingding(content)
            return self.parse_time(self.failed_check_expire_date)

    def get_cert_expire_date(self, domain):
        """Return the number of whole days until the certificate of ``domain`` expires.

        The value is negative when the certificate has already expired or the
        check failed (see ``get_cert_info``).
        """
        expire_date = self.get_cert_info(domain)
        # Remaining days, measured against UTC because the expiry is in GMT.
        return self._days_left(expire_date)


# Script entry: check every domain listed in the domain file and append the
# results to the log file.
domain_check = DomainCheck(domain_file='/opt/sh/text.txt', log_file='/opt/sh/log.txt')
domain_check.check_domain_ssl_status()