관리-도구
편집 파일: cleanup.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ from defence360agent.contracts.messages import MessageType from defence360agent.feature_management.constants import AV, FULL from defence360agent.feature_management.lookup import feature from defence360agent.rpc_tools.lookup import ( CommonEndpoints, RootEndpoints, bind, ) from defence360agent.contracts.permissions import ( MS_CLEAN_REQUIRES_MYIMUNIFY_PROTECTION, check_permission, ) from defence360agent.rpc_tools.validate import validate_av_plus_license from imav.malwarelib.cleanup.cleaner import MalwareCleanupProxy from imav.malwarelib.cleanup.storage import restore_hits from imav.malwarelib.model import MalwareHit from imav.malwarelib.utils.endpoints import MaliciousEndpointStatus @feature(AV, [FULL]) class CleanupEndpoints(CommonEndpoints): @bind("malware", "malicious", "cleanup") @validate_av_plus_license async def cleanup(self, ids, standard_only=False, user=None): check_permission(MS_CLEAN_REQUIRES_MYIMUNIFY_PROTECTION, user) hits = MalwareHit.malicious_select(ids, user=user, cleanup=True) if hits: await self._sink.process_message( MessageType.MalwareCleanupTask( hits=hits, initiator=user, standard_only=standard_only, ) ) @bind("malware", "malicious", "cleanup-all") @validate_av_plus_license async def cleanup_all(self, standard_only=False, user=None): check_permission(MS_CLEAN_REQUIRES_MYIMUNIFY_PROTECTION, user) hits = MalwareHit.malicious_select(user=user, cleanup=True) if hits: await self._sink.process_message( MessageType.MalwareCleanupTask( hits=hits, initiator=user, standard_only=standard_only, ) ) @bind("malware", "malicious", "restore-original") @validate_av_plus_license async def restore_original(self, ids, user=None): hits = MalwareHit.malicious_select(ids, user=user, restore=True) succeded, failed = await restore_hits(hits, self._sink) return MaliciousEndpointStatus(succeded, failed) class CleanupRootEndpoints(RootEndpoints): def __init__(self, sink): super().__init__(sink) self._proxy = MalwareCleanupProxy() @bind("malware", "cleanup", "status") async def cleanup_status(self): progress = self._proxy.get_progress() if progress is None: status = {"status": "stopped"} else: status = {"status": "running", "progress": progress} return {"items": status} @bind("malware", "user", "cleanup") @validate_av_plus_license async def user_cleanup(self, user, standard_only=False): users = user # according to schema 'user' is actually a list of users hits = MalwareHit.malicious_select(user=users, cleanup=True) if hits: for user in users: user_hits = [hit for hit in hits if hit.user == user] if user_hits: await self._sink.process_message( MessageType.MalwareCleanupTask( hits=user_hits, initiator=user, standard_only=standard_only, ) ) @bind("malware", "user", "restore-original") @validate_av_plus_license async def user_restore_original(self, user): hits = MalwareHit.malicious_select(user=user, restore=True) succeded, failed = await restore_hits(hits, self._sink, user) return MaliciousEndpointStatus(succeded, failed)