# _*_coding=utf-8 _*_
# @author 云深沾衣
# @date 2021/11/5 16:33
import re
import subprocess
from datetime import datetime, timezone

import requests
class DomainCheck:
    """Check TLS certificate expiry for a list of domains and alert via DingTalk.

    Domains are read one per line from ``domain_file``; blank lines and lines
    starting with ``#`` are skipped. Every result is appended to ``log_file``,
    and a DingTalk robot message is sent for each domain whose certificate
    has ``check_days`` or fewer days left.
    """

    # Alert threshold: notify when the remaining days are <= this value.
    check_days = 5
    # DingTalk robot access token; it is appended to ``dingding_url``.
    dingding_token = 'your_dingding_token'
    dingding_url = 'https://oapi.dingtalk.com/robot/send?access_token='
    # Seconds to wait for the DingTalk webhook before giving up.
    request_timeout = 10
    # Expiry used when a check fails. It lies in the past, so the failed
    # domain also trips the ``check_days`` alert in check_domain_ssl_status.
    fallback_expire_date = "Aug 7 00:28:51 2020 GMT"

    def __init__(self, domain_file='domain_file.txt', log_file='log.txt'):
        """Store the path of the domain list and of the log file.

        Args:
            domain_file: Text file with one domain per line.
            log_file: File that check results are appended to.
        """
        self.domain_file = domain_file
        self.log_file = log_file

    def check_domain_ssl_status(self):
        """Check every domain in ``domain_file``, log it, and alert if needed.

        A timestamp header is written to the log first. For each domain the
        remaining certificate lifetime in days is logged as
        ``"<domain> : <days>"``; the same text is sent to DingTalk when the
        remaining days are at or below ``check_days``.
        """
        with open(self.domain_file, encoding='utf-8') as f:
            self.write_log_to_file(
                '<------ ' + str(datetime.now()) + ' ------>' + '\n'
            )
            for line in f:
                domain = line.strip()
                # Skip blank lines and commented-out domains.
                if not domain or domain.startswith("#"):
                    continue
                days_left = self.get_cert_expire_date(domain)
                content = domain + ' : ' + str(days_left) + '\n'
                self.write_log_to_file(content)
                if days_left <= self.check_days:
                    self.send_to_dingding(content)

    def send_to_dingding(self, content):
        """Send ``content`` as a text message to the DingTalk robot webhook.

        A network failure is recorded in the log file instead of being
        raised, so one failed notification does not abort the checks of the
        remaining domains.
        """
        content_data = {
            "msgtype": "text",
            "text": {
                "content": content,
            },
        }
        try:
            requests.post(
                self.dingding_url + self.dingding_token,
                json=content_data,
                timeout=self.request_timeout,
            )
        except requests.RequestException as exc:
            self.write_log_to_file(f'DingTalk notification failed: {exc}\n')

    def write_log_to_file(self, content):
        """Append ``content`` verbatim to ``log_file``."""
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(content)

    @staticmethod
    def parse_time(date_str):
        """Parse a date such as ``"Aug 7 00:28:51 2020 GMT"``.

        Returns:
            A naive ``datetime`` holding the GMT wall-clock time.

        Raises:
            ValueError: If ``date_str`` does not match the expected format.
        """
        return datetime.strptime(date_str, "%b %d %H:%M:%S %Y GMT")

    @staticmethod
    def get_re_match_result(pattern, string):
        """Return capture group 1 of the first match of ``pattern`` in ``string``.

        Raises:
            AttributeError: If ``pattern`` does not match anywhere.
        """
        match = re.search(pattern, string)
        return match.group(1)

    @classmethod
    def _extract_expire_date(cls, output):
        """Return the expiry ``datetime`` found in curl's output, or ``None``.

        ``None`` means that no ``expire date:`` line was present or that its
        value could not be parsed.
        """
        try:
            raw_date = cls.get_re_match_result(r'expire date: (.*)', output)
            # Strip stray trailing whitespace so strptime does not reject it.
            return cls.parse_time(raw_date.strip())
        except (AttributeError, ValueError):
            return None

    def get_cert_info(self, domain):
        """Get certificate info: the expiry time of ``domain``'s certificate.

        Runs ``curl -Ivs`` against ``https://<domain>`` and reads the
        ``expire date:`` line from its verbose output. When the expiry
        cannot be determined, a failure message is sent to DingTalk and
        ``fallback_expire_date`` is returned instead.

        Returns:
            A naive ``datetime`` holding the expiry in GMT.
        """
        # The curl used here should be the latest version.
        # An argument list (no shell) keeps text from the domain file from
        # being interpreted as shell syntax.
        cmd = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
        try:
            completed = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                # Merged into stdout so curl's verbose lines are captured.
                stderr=subprocess.STDOUT,
                text=True,
                errors="replace",
            )
            output = completed.stdout
        except OSError:
            # curl could not be started; handled as a failed check below.
            output = ""

        expire_date = self._extract_expire_date(output)
        if expire_date is None:
            content = domain + ": This domain check failed , please check reason"
            self.send_to_dingding(content)
            expire_date = self.parse_time(self.fallback_expire_date)
        return expire_date

    def get_cert_expire_date(self, domain):
        """Get the remaining certificate lifetime of ``domain`` in whole days.

        The result is negative when the certificate has already expired.
        """
        expire_date = self.get_cert_info(domain)
        # The expiry is expressed in GMT, so compare against the current UTC
        # time rather than local time to avoid an offset-sized error.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        # Remaining days.
        return (expire_date - now_utc).days
# Script entry: check the domains listed in /opt/sh/text.txt and append the
# results to /opt/sh/log.txt.
domain_check = DomainCheck(
    domain_file='/opt/sh/text.txt',
    log_file='/opt/sh/log.txt',
)
domain_check.check_domain_ssl_status()