관리-도구
편집 파일: wmt-api.py
#!/opt/cloudlinux/venv/bin/python3
"""Command-line API for the WMT scanner.

Exactly one action flag is handled per invocation (see ``run``).  Every
action reports its outcome as a JSON document on stdout and terminates the
process through ``print_result_and_exit``; with no flag the argparse help is
printed instead.
"""
import json
import fcntl
import os
import time
import sys
import signal
from argparse import ArgumentParser
from datetime import datetime, timedelta

import psutil

from wmt.common.const import (
    WMT_SCANNER_SERVICE,
    WMT_LOCK_FILE,
    CONFIG_PATH,
    WMT_DB
)
from wmt.common.report import generate_report, report_dict
from wmt.db import setup_database
from wmt.common.service import set_service_state
from wmt.common.notification import Notifier, SupportedNotificationTypes
from wmt.common import cfg
from cllicense import CloudlinuxLicenseLib
from wmt.common.utils import send_report_to_clickhouse, manage_crons


def print_result_and_exit(result='success', exit_code=0, **extra):
    """Print a JSON result document to stdout and exit the process.

    Args:
        result: Value stored under the ``result`` key.
        exit_code: Process exit status passed to ``sys.exit``.
        **extra: Additional top-level keys merged into the document.

    Raises:
        SystemExit: Always.
    """
    message = {
        'result': result,
        'timestamp': time.time()
    }
    message.update(extra)
    print(json.dumps(message, indent=2, sort_keys=True))
    sys.exit(exit_code)


def get_status():
    """Return ``'started'`` if the lock file is locked, else ``'stopped'``.

    A missing lock file means ``'stopped'``.  Otherwise a non-blocking
    exclusive ``flock`` is attempted: failing to obtain it means another
    process holds the lock, which is reported as ``'started'``.
    """
    # Open directly instead of testing os.path.exists() first, so a lock file
    # removed between the check and the open cannot raise.
    try:
        lock_file = open(WMT_LOCK_FILE)
    except FileNotFoundError:
        return 'stopped'
    with lock_file:
        try:
            # If we do get the lock it is released when the file is closed.
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # locked by somebody else
            return 'started'
    return 'stopped'


def get_scanner_pid():
    """Return the pid stored in the lock file, or ``None``.

    ``None`` is returned when the lock file is missing, does not contain a
    positive integer, or names a pid that does not currently exist.
    """
    try:
        with open(WMT_LOCK_FILE) as f:
            pid = int(f.read().strip())
    except (FileNotFoundError, ValueError):
        # No lock file, or it is empty / not a number.
        return None
    # Non-positive values are not real process ids; os.kill() would treat
    # them as "process group" / "broadcast" targets.
    if pid <= 0:
        return None
    return pid if psutil.pid_exists(pid) else None


def get_config():
    """Return the current configuration as a dict (``cfg.to_dict()``)."""
    return cfg.to_dict()


def change_config(new_json_config):
    """Apply *new_json_config* via ``cfg.modify`` and signal the scanner.

    If a scanner pid can be determined, ``SIGUSR1`` is sent to it after the
    configuration has been modified.

    Args:
        new_json_config: Configuration changes, passed through to
            ``cfg.modify`` unchanged.

    Returns:
        Whatever ``cfg.modify`` returns.
    """
    config = cfg.modify(new_json_config)
    scanner_pid = get_scanner_pid()
    if scanner_pid is not None:
        try:
            os.kill(scanner_pid, signal.SIGUSR1)
        except ProcessLookupError:
            # The process exited between the liveness check and the signal;
            # the configuration change itself has already been applied.
            pass
    return config


def _previous_day_range():
    """Return ``(yesterday, today)`` as dates, read from a single clock call."""
    today = datetime.now().date()
    return today - timedelta(days=1), today


def run():
    """Parse command-line flags and execute the first matching action."""
    parser = ArgumentParser()
    parser.add_argument('--config-get', action='store_true')
    parser.add_argument('--config-change', type=str)
    parser.add_argument('--report-get', action='store_true')
    parser.add_argument('--send-clickhouse', action='store_true')
    # just for 24h report by mail executed by cron
    parser.add_argument('--send-email', action='store_true')
    parser.add_argument('--status', action='store_true')
    parser.add_argument('--start', action='store_true')
    parser.add_argument('--stop', action='store_true')
    opts = parser.parse_args()

    if opts.config_get:
        config = get_config()
        config['default_report_email'] = cfg.default_report_email
        ignore_list = config.get("ignore_list")
        if ignore_list:
            # A non-empty ignore list is emitted as one comma-separated string.
            config['ignore_list'] = ",".join(ignore_list)
        print_result_and_exit(config=config)

    elif opts.config_change:
        config = change_config(opts.config_change)
        config['default_report_email'] = cfg.default_report_email
        print_result_and_exit(config=config)

    elif opts.report_get:
        engine = setup_database(readonly=os.path.exists(WMT_DB))
        # 24h back from NOW; one timestamp keeps the window and the printed
        # date consistent with each other.
        now = datetime.now()
        report = report_dict(generate_report(engine, now - timedelta(days=1), now))
        print_result_and_exit(report=report,
                              date=now.strftime('%Y-%m-%d %H:%M'))

    elif opts.send_clickhouse:
        engine = setup_database(readonly=True)
        # for the day before
        start, end = _previous_day_range()
        send_report_to_clickhouse(report_dict(generate_report(engine, start, end)))
        print_result_and_exit()

    elif opts.send_email:
        is_scanner_running = get_status() == 'started'
        if not (is_scanner_running and cfg.cfg.summary_notification_enabled):
            # NOTE(review): the message names "alert_notifications" while the
            # check above reads summary_notification_enabled - confirm which
            # option is intended.
            print_result_and_exit('Summary report email will not be sent! '
                                  f'Please, ensure "{WMT_SCANNER_SERVICE}" service is running and '
                                  f'alert_notifications is enabled in "{CONFIG_PATH}"',
                                  exit_code=1)
        if not CloudlinuxLicenseLib().get_license_status():
            print_result_and_exit('CloudLinux license is expired. '
                                  'You may buy new license here: https://lp.cloudlinux.com/cloudlinux-os-solo',
                                  exit_code=1)
        engine = setup_database()
        start, end = _previous_day_range()
        report = generate_report(engine, start, end)
        Notifier(
            target_email=cfg.target_email,
            from_email=cfg.from_email,
            report=report,
            notification_type=SupportedNotificationTypes.REPORT,
        ).notify()
        print_result_and_exit()

    elif opts.status:
        status = get_status()
        print_result_and_exit(status=status)

    elif opts.start:
        set_service_state(WMT_SCANNER_SERVICE, 'start')
        manage_crons(status=True)
        print_result_and_exit()

    elif opts.stop:
        set_service_state(WMT_SCANNER_SERVICE, 'stop')
        manage_crons(status=False)
        print_result_and_exit()

    else:
        parser.print_help()


def main():
    """Run the CLI, turning any unexpected exception into a JSON error.

    ``SystemExit`` raised by ``print_result_and_exit`` is not an ``Exception``
    subclass and therefore passes through untouched.
    """
    try:
        run()
    except Exception as e:
        print_result_and_exit(result='error', exit_code=1, context=str(e))


if __name__ == '__main__':
    main()