관리-도구
편집 파일: schedule_watcher.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from contextlib import suppress
from logging import getLogger
from pathlib import Path

from defence360agent import utils
from defence360agent.contracts.config import ANTIVIRUS_MODE
from defence360agent.contracts.config import MalwareScanSchedule as Config
from defence360agent.contracts.config import (
    MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.config import SystemConfig
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import (
    MessageSink,
    MessageSource,
    expect,
)
from imav.malwarelib.utils import reset_malware_schedule

logger = getLogger(__name__)

# Every interval value this module knows how to turn into a cron job.
AVAILABLE_INTERVALS = [
    Interval.NONE,
    Interval.DAY,
    Interval.WEEK,
    Interval.MONTH,
]
# Reduced interval set returned by allowed_schedule_interval() when running
# in ANTIVIRUS_MODE without a valid AV+ license.
AVP_INTERVALS = [
    Interval.NONE,
    Interval.MONTH,
]


def allowed_schedule_interval():
    """Return the list of schedule intervals permitted right now.

    ``AVAILABLE_INTERVALS`` is returned unless the agent runs in
    ``ANTIVIRUS_MODE`` and the license check reports no valid AV+ license,
    in which case ``AVP_INTERVALS`` is returned.
    """
    # The license is queried unconditionally, even outside ANTIVIRUS_MODE.
    has_av_plus = LicenseCLN.is_valid_av_plus()
    if ANTIVIRUS_MODE and not has_av_plus:
        return AVP_INTERVALS
    return AVAILABLE_INTERVALS


class ScheduleWatcher(MessageSink, MessageSource):
    """Keep the cron file at ``Config.CRON_PATH`` in sync with the schedule.

    The cron file content is cached in ``self._cron``. On construction and
    on every ``ConfigUpdate`` message carrying a ``SystemConfig``, the
    expected job is recomputed from ``Config`` and the file is rewritten
    when it differs from the cached content.
    """

    def __init__(self):
        # Start from whatever is on disk, then reconcile with the config.
        self._cron = self._read_cron()
        self._update_cron()
        self._sink = None

    async def create_sink(self, loop):
        """No sink-side setup is performed."""

    async def create_source(self, loop, sink):
        """Remember *sink* so messages can be emitted later."""
        self._sink = sink

    @staticmethod
    def _read_cron():
        """Return the text of the cron file, or ``None`` if it is missing."""
        try:
            return Path(Config.CRON_PATH).read_text()
        except FileNotFoundError:
            return None

    @staticmethod
    def _write_cron(job):
        """Replace the cron file with *job*.

        Any existing file is removed first. When *job* is falsy nothing is
        written, so the file simply stays removed.
        """
        cron_file = Path(Config.CRON_PATH)
        with suppress(FileNotFoundError):
            cron_file.unlink()
        if not job:
            return
        # Create the file with restricted permissions before writing to it.
        cron_file.touch(mode=0o600)
        cron_file.write_text(job)

    def _get_job(self):
        """Build the cron job text for the currently configured schedule.

        Returns ``None`` when the interval is ``Interval.NONE``. Returns the
        cached cron content unchanged when the interval is not one of
        ``AVAILABLE_INTERVALS``. Otherwise returns ``Config.CRON_STRING``
        formatted with the cron fields of the configured interval.
        """
        if Config.INTERVAL == Interval.NONE:
            return None

        if Config.INTERVAL not in AVAILABLE_INTERVALS:
            logger.error("Unsupported interval value: %s", Config.INTERVAL)
            return self._cron

        if Config.INTERVAL not in allowed_schedule_interval():
            logger.info("Malware schedule interval is being reset to defaults")
            reset_malware_schedule()

        # Config.INTERVAL is deliberately read again below rather than
        # cached above. NOTE(review): presumably reset_malware_schedule()
        # changes the configured interval to one present in this table --
        # confirm against its implementation.
        cron_fields_by_interval = {
            Interval.DAY: (Config.HOUR, "*", "*"),
            Interval.WEEK: (Config.HOUR, "*", Config.DAY_OF_WEEK),
            Interval.MONTH: (Config.HOUR, Config.DAY_OF_MONTH, "*"),
        }
        cron_fields = cron_fields_by_interval[Config.INTERVAL]
        return Config.CRON_STRING.format(*cron_fields, cmd=Config.CMD)

    def _update_cron(self):
        """Rewrite the cron file if the expected job changed.

        Returns ``True`` when the file was rewritten, ``False`` otherwise.
        """
        expected_job = self._get_job()
        if self._cron == expected_job:
            return False

        logger.info("Update background scan schedule")
        self._write_cron(expected_job)
        self._cron = expected_job
        return True

    async def _stop_background_scan(self):
        """Send a ``MalwareScanQueueStopBackground`` message to the sink."""
        stop_message = MessageType.MalwareScanQueueStopBackground()
        await self._sink.process_message(stop_message)

    @expect(MessageType.ConfigUpdate)
    @utils.log_error_and_ignore()
    async def schedule_config_updated(self, message):
        """React to a config update by refreshing the cron file.

        Only messages whose ``"conf"`` entry is a ``SystemConfig`` are
        handled. If the cron file was rewritten and the resulting job is
        empty, the background scan is stopped as well.
        """
        if not isinstance(message["conf"], SystemConfig):
            return

        schedule_changed = self._update_cron()
        if schedule_changed and not self._cron:
            await self._stop_background_scan()