관리-도구
편집 파일: snapshot.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import json
import logging
import os
import pwd
import shutil
import sys
import zlib
from contextlib import contextmanager
from typing import List, Optional  # NOQA

SNAPSHOT_PATH = '/var/lve/snapshots'
SNAPSHOT_EXT = '.snapshot'
SNAPSHOT_EXT_LEN = len(SNAPSHOT_EXT)


class Snapshot(object):
    """
    Load and save snapshot files for a single incident.

    Files are stored as:
        /var/lve/snapshots/[uid]/dump_time.snapshot
    where dump_time is an integer timestamp and uid is taken from
    ``incident["uid"]``.

    The per-user directory is chown'ed to the user (mode 0o751) and each
    dump file is created with the user's effective uid and mode 0o400,
    so dumps are readable only by their owner.
    """

    def __init__(self, incident, compresslevel=1):
        # type: (dict, int) -> None
        """
        :param incident: dict describing the incident; ``uid`` is read by
            most methods, ``incident_start_time`` / ``incident_end_time``
            by :meth:`get_incident_snapshots`
        :param compresslevel: zlib compression level used when writing
        """
        self.compresslevel = compresslevel
        self.incident = incident
        self.log = logging.getLogger('lib-snapshot')

    def save(self, data):
        # type: (dict) -> str
        """
        Serialize ``data`` to zlib-compressed JSON and write it to a new
        snapshot file named after ``data['dump_time']``.

        Note: ``data`` is modified in place (see _replace_unicode_data).

        :param data: snapshot content; must contain a non-None ``dump_time``
        :return: path of the written file
        """
        dump_date = data['dump_time']
        assert dump_date is not None
        # convert possible non-ascii data to unicode
        self._replace_unicode_data(data)
        json_compressed = zlib.compress(json.dumps(data).encode(),
                                        self.compresslevel)
        with self.create_file(dump_date) as f:
            f.write(json_compressed)
            self.log.debug('Snapshot dumped to file %s', f.name)
            return f.name

    @staticmethod
    def _to_unicode(obj):
        """Decode bytes as UTF-8 (invalid bytes replaced); pass others through."""
        if isinstance(obj, bytes):
            return obj.decode('utf-8', 'replace')
        return obj

    def _replace_unicode_data(self, data):
        # type: (dict) -> None
        """
        Replace bytes items inside ``snap_sql`` and ``snap_http`` rows with
        text so the dict can be dumped as JSON.

        Both keys are always (re)assigned; a missing key becomes ``[]``.
        """
        data['snap_sql'] = [
            list(map(self._to_unicode, query))
            for query in data.get('snap_sql', [])
        ]
        data['snap_http'] = [
            list(map(self._to_unicode, http))
            for http in data.get('snap_http', [])
        ]

    def get_file_list(self):
        # type: () -> List[str]
        """Return entry names of the user's snapshot dir, or [] if it is absent."""
        dir_ = self.get_dir()
        if os.path.isdir(dir_):
            return os.listdir(dir_)
        return []

    def get_ts_list(self, from_ts, to_ts):
        # type: (Optional[int], Optional[int]) -> List[int]
        """
        Return ordered list of timestamps when snapshots for this user
        were created

        :param from_ts: lower bound (inclusive); None means no lower bound
        :param to_ts: upper bound (inclusive); None means no upper bound
        :return: list of timestamps, ordered, for that period
        """
        return self.snapshot_filter(self.get_file_list(), from_ts, to_ts)

    def get_snapshots(self, from_ts, to_ts):
        # type: (Optional[int], Optional[int]) -> list
        """
        Get all snapshots for a period. Files that cannot be read or parsed
        are logged and skipped.

        :param from_ts: lower bound (inclusive) or None
        :param to_ts: upper bound (inclusive) or None
        :return: list of snapshots, ordered by date
        """
        result = []
        for ts in self.get_ts_list(from_ts, to_ts):
            filename = self.get_file_name(self.ts_to_name(ts))
            try:
                if os.geteuid() == 0:
                    # running as root: read the file as its owner
                    with drop_privileges(self.incident["uid"]):
                        content = self.read_file_content(filename)
                else:
                    content = self.read_file_content(filename)
                result.append(json.loads(content))
            except (IOError, ValueError, OSError) as ve:
                self.log.warning(
                    "Corrupted file: %s (%s)",
                    filename,
                    str(ve),
                )
        return result

    def read_file_content(self, filename):
        # type: (str) -> str
        """
        Read a snapshot file and return its decoded text.

        If the content is not valid zlib data it is treated as an
        uncompressed snapshot: the raw content is returned and the file
        is rewritten in compressed form. The rewrite is best-effort; a
        failure to write is logged and does not prevent returning the
        content that was already read.

        :param filename: path of the snapshot file
        :return: decoded file content
        """
        with open(filename, 'rb') as f:
            raw = f.read()
        try:
            content = zlib.decompress(raw)
        except zlib.error:
            content = raw
            # Compress before opening for write so that a compression
            # failure cannot leave a truncated file behind.
            compressed_content = zlib.compress(content, self.compresslevel)
            try:
                with open(filename, 'wb') as f:
                    f.write(compressed_content)
            except (IOError, OSError) as e:
                self.log.warning('Unable to recompress snapshot %s (%s)',
                                 filename, str(e))
        return content.decode()

    def get_incident_snapshots(self):
        # type: () -> list
        """
        Load all snapshots for given incident

        :return: list of snapshots
        """
        return self.get_snapshots(self.incident["incident_start_time"],
                                  self.incident["incident_end_time"])

    def get_dir(self):
        # type: () -> str
        """Return the snapshot directory for the incident's uid."""
        return os.path.join(SNAPSHOT_PATH, str(self.incident["uid"]))

    def get_file_name(self, name):
        # type: (str) -> str
        """Return the full path of ``name`` inside the snapshot directory."""
        return os.path.join(self.get_dir(), name)

    def create_file(self, dump_date):
        # type: (int) -> open()
        """
        Create file, change its ownership & permissions if needed.
        Create directories if needed as well

        :param dump_date: int timestamp used as file name
        :return: open File object (binary, write mode); caller closes it
        """
        dir_ = self.get_dir()
        if not os.path.exists(dir_):
            try:
                # sacrifice security if we cannot setup ownership properly
                os.makedirs(dir_)
                os.chmod(dir_, 0o751)
                os.chown(dir_, self.incident["uid"], 0)
            except (IOError, OSError) as e:
                self.log.error('Unable to create dir %s (%s)', dir_, str(e))
        file_name = self.get_file_name(self.ts_to_name(dump_date))
        # open with the user's effective uid so the file is owned by the user
        with drop_privileges(self.incident["uid"]):
            file_ = open(file_name, 'wb')  # pylint: disable=consider-using-with
            try:
                os.fchmod(file_.fileno(), 0o400)
            except (IOError, OSError) as e:
                self.log.error('Unable to set file permissions %s (%s)',
                               file_name, str(e))
        return file_

    def delete_old(self, to_ts):
        # type: (int) -> None
        """
        Delete old snapshots. If no snapshots would remain afterwards, the
        whole per-user directory is removed. Does nothing when the
        directory does not exist.

        :param to_ts: up to which timestamp (inclusive) to remove snapshots
        :return: None
        """
        _dir = self.get_dir()
        if not os.path.isdir(_dir):
            # nothing was ever dumped for this user
            return
        files = os.listdir(_dir)
        all_snapshots = self.snapshot_filter(files)
        ts_to_remove = self.snapshot_filter(files, to_ts=to_ts)
        if all_snapshots == ts_to_remove:
            shutil.rmtree(_dir, ignore_errors=True)
        else:
            for ts in ts_to_remove:
                os.remove(self.get_file_name(self.ts_to_name(ts)))

    @staticmethod
    def get_ts(file_):
        # type: (str) -> Optional[int]
        """
        Extract the timestamp from a snapshot file name.

        :param file_: file name, e.g. ``'1500000000.snapshot'``
        :return: the integer timestamp, or None if the name does not have
            the snapshot extension or its stem is not a number
        """
        if not file_.endswith(SNAPSHOT_EXT):
            return None
        ts = file_[0:-SNAPSHOT_EXT_LEN]
        if not ts.isdigit():
            return None
        try:
            return int(ts)
        except ValueError:
            # str.isdigit() accepts some characters that int() rejects
            return None

    @staticmethod
    def snapshot_filter(files, from_ts=None, to_ts=None):
        # type: (List[str], Optional[int], Optional[int]) -> List[int]
        """
        Pick snapshot timestamps out of a list of file names.

        :param files: file names to inspect
        :param from_ts: lower bound (inclusive); None means 0
        :param to_ts: upper bound (inclusive); None means sys.maxsize
        :return: sorted timestamps within [from_ts, to_ts]
        """
        if from_ts is None:
            from_ts = 0
        if to_ts is None:
            to_ts = sys.maxsize
        stamps = (Snapshot.get_ts(filename) for filename in files)
        return sorted(
            ts for ts in stamps
            if ts is not None and from_ts <= ts <= to_ts
        )

    @staticmethod
    def ts_to_name(ts):
        # type: (int) -> str
        """Return the snapshot file name for timestamp ``ts``."""
        return str(ts) + SNAPSHOT_EXT


@contextmanager
def drop_privileges(uid):
    """
    Context manager that temporarily switches the effective identity of
    the process: supplementary groups are cleared, the effective gid is
    set to the gid of user ``nobody`` and the effective uid to ``uid``.

    On exit (normal or via exception) the effective uid/gid are set back
    to the real uid/gid recorded on entry and the supplementary groups
    are restored. The restore also runs if switching the gid or uid fails
    part-way, so the process is never left in a half-switched state.

    :param uid: numeric user id to run the block as
    """
    old_uid, old_gid, old_groups = os.getuid(), os.getgid(), os.getgroups()
    gid = pwd.getpwnam('nobody').pw_gid
    # If this first call fails nothing has been changed yet, so there is
    # nothing to undo and the error simply propagates.
    os.setgroups([])
    try:
        os.setegid(gid)
        os.seteuid(uid)
        yield
    finally:
        # uid first: the privilege to change gid/groups must be regained
        # before the other two calls.
        os.seteuid(old_uid)
        os.setegid(old_gid)
        os.setgroups(old_groups)