관리-도구
편집 파일: notification.py
#!/opt/cloudlinux/venv/bin/python3 import html import os import base64 import time from dataclasses import dataclass from datetime import timedelta, datetime from email.header import Header from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from prettytable import PrettyTable, ALL from wmt.common.const import WMT_TEMPLATES_DIR, LICENSE_EXPIRED_FAREWELL_LETTER_MARKER, JWT_TOKEN from wmt.common.exceptions import WmtEmailException from clcommon import clemail from clcommon.mail_helper import MailHelper from clcommon.lib.network import get_hostname, get_ip_addr HEADERS_REPORT_MAPPING = { 'summary_report': ['All', 'Successful', 'Failed', 'Undone', 'Average time (ms)'], 'error_report': ['URL', 'Errors', 'Error codes'], 'duration_report': ['URL', 'Average time (ms)'] } class SupportedNotificationTypes: ALERT = 'alert' REPORT = 'report' FAREWELL = 'farewell' @dataclass class Email: subject: str text_sample: str html_sample: str @dataclass class EmailReport: text_option: str html_option: str @dataclass class EmailTemplate: target_mail: str from_mail: str summary_report: EmailReport = None error_report: EmailReport = None duration_report: EmailReport = None target_name: str = 'Administrator' locale: str = 'en_US' @staticmethod def _get_hostname(): return get_hostname() or 'UNKNOWN' @staticmethod def get_ip_addr(hostname): if not hostname: return 'UNKNOWN' return get_ip_addr(hostname) or 'UNKNOWN' @staticmethod def _get_logo(): logo_path = os.path.join(WMT_TEMPLATES_DIR, 'logo.png') with open(logo_path, 'rb') as logo: logo_img_encoded = base64.b64encode(logo.read()).decode('utf-8') return logo_img_encoded def to_text_template(self, date, notify_type): """ Convert to txt template keys see wmt_notify.txt """ hostname = self._get_hostname() template = { 'TONAME': self.target_name, 'DATE': date, 'HOSTNAME': hostname, 'IP_ADDR': self.get_ip_addr(hostname) } if notify_type in [SupportedNotificationTypes.REPORT, SupportedNotificationTypes.ALERT]: template.update({ 'ERROR_REPORT': self.error_report.text_option }) if notify_type == SupportedNotificationTypes.REPORT: template.update({ 'SUMMARY_REPORT': self.summary_report.text_option, 'DURATION_REPORT': self.duration_report.text_option, }) return template def to_html_template(self, date, notify_type): """ Convert to html template keys see wmt_notify.html """ hostname = self._get_hostname() template = { 'TONAME': self.target_name, 'DATE': date, 'LOGO': self._get_logo(), 'HOSTNAME': hostname, 'IP_ADDR': self.get_ip_addr(hostname) } if notify_type in [SupportedNotificationTypes.REPORT, SupportedNotificationTypes.ALERT]: template.update({ 'ERROR_HTML_REPORT': self.error_report.html_option }) if notify_type == SupportedNotificationTypes.REPORT: template.update({ 'SUMMARY_HTML_REPORT': self.summary_report.html_option, 'DURATION_HTML_REPORT': self.duration_report.html_option }) return template @dataclass class Notifier: target_email: str from_email: str report: dict notification_type: str @property def period(self) -> str: if self.notification_type == SupportedNotificationTypes.REPORT: return datetime.strftime(datetime.now() - timedelta(days=1), '%Y-%m-%d') elif self.notification_type == SupportedNotificationTypes.ALERT: return datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S') elif self.notification_type == SupportedNotificationTypes.FAREWELL: if not os.path.exists(JWT_TOKEN): return '' seconds = time.time() - os.path.getmtime(JWT_TOKEN) days = int(seconds // (3600 * 24)) if days > 0: return f'{str(days)} day(s) ago' else: hours = int(seconds // 3600) return f'{str(hours)} hour(s) ago' else: raise NotImplementedError(f'unexpected notifier type: {self.notification_type}') @staticmethod def create_farewell_letter_marker(): open(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER, 'w').close() def notify(self): mail_manager = MailHelper() template = self._generate_template_data() message = self._generate_final_message(template) mail_manager.sendmail(template.from_mail, [template.target_mail], message) if self.notification_type == SupportedNotificationTypes.FAREWELL: self.create_farewell_letter_marker() def _get_templates(self): if self.notification_type == SupportedNotificationTypes.REPORT: return os.path.join(WMT_TEMPLATES_DIR, 'wmt_notify.txt'), \ os.path.join(WMT_TEMPLATES_DIR, 'wmt_notify.html') elif self.notification_type == SupportedNotificationTypes.ALERT: return os.path.join(WMT_TEMPLATES_DIR, 'wmt_alert.txt'), \ os.path.join(WMT_TEMPLATES_DIR, 'wmt_alert.html') elif self.notification_type == SupportedNotificationTypes.FAREWELL: return os.path.join(WMT_TEMPLATES_DIR, 'wmt_farewell_solo.txt'), \ os.path.join(WMT_TEMPLATES_DIR, 'wmt_farewell_solo.html') else: raise NotImplementedError(f'unexpected notifier type: {self.notification_type}') @staticmethod def _get_table_headers(section): return HEADERS_REPORT_MAPPING[section] def _generate_tables(self, section) -> EmailReport: no_domains_message = f'No domains for {self.period}' text_table = self._get_table('text', section) html_table = self._get_table('html', section) text_final_str = text_table.get_string() if text_table else no_domains_message html_final_str = html_table.get_html_string(format=True, border=True, hrules=ALL, vrules=ALL) if html_table else no_domains_message return EmailReport( text_option=text_final_str, html_option=html.unescape(html_final_str.replace('text-align: center', 'text-align: left')) ) def _get_table(self, alternative, section): table_data = self.report[section] if not table_data: return None table = PrettyTable(self._get_table_headers(section)) table.align = 'l' if not isinstance(table_data, list): table_data = [table_data] for row in table_data: table.add_row(row.to_template(alternative)) return table def _generate_template_data(self) -> EmailTemplate: template = { 'target_mail': self.target_email, 'from_mail': self.from_email } if self.notification_type in [SupportedNotificationTypes.REPORT, SupportedNotificationTypes.ALERT]: template.update({ 'error_report': self._generate_tables('error_report'), }) if self.notification_type == SupportedNotificationTypes.REPORT: template.update({ 'summary_report': self._generate_tables('summary_report'), 'duration_report': self._generate_tables('duration_report') }) return EmailTemplate(**template) def _generate_email(self, report: EmailTemplate): text_template, html_template = self._get_templates() if not os.path.exists(text_template) or not os.path.exists(html_template): raise WmtEmailException(f'unable to find email templates: {text_template}, {html_template}. ' 'Try to reinstall package') try: period = self.period subject = f'Web Monitoring Tool {self.notification_type} for {period} from server {get_hostname()}' if self.notification_type == SupportedNotificationTypes.ALERT: subject = f'[{self.notification_type.upper()}] {subject}' if self.notification_type == SupportedNotificationTypes.FAREWELL: subject = f'[EXPIRED CloudLinux LICENSE] Web Monitoring Tool {self.notification_type}' subject, text_body = clemail.ClEmail.generate_mail_jinja2( text_template, templ_data=report.to_text_template(period, self.notification_type), subject=subject) _, html_body = clemail.ClEmail.generate_mail_jinja2( html_template, templ_data=report.to_html_template(period, self.notification_type), subject=subject) except clemail.jinja2.exceptions.TemplateError as e: raise WmtEmailException(f'cannot generate email, reason: {e}') return Email(subject=subject, text_sample=text_body, html_sample=html_body) def _generate_final_message(self, template_data: EmailTemplate): email = self._generate_email(template_data) text_body = email.text_sample.encode('utf-8', 'xmlcharrefreplace').decode('utf-8') html_body = email.html_sample.encode('utf-8', 'xmlcharrefreplace').decode('utf-8') message = MIMEMultipart('alternative') message.attach(MIMEText(text_body, 'plain', 'utf-8')) message.attach(MIMEText(html_body, 'html', 'utf-8')) message['Subject'] = Header(email.subject, 'utf-8').encode() message['From'] = template_data.from_mail message['To'] = template_data.target_mail return message