관리-도구
편집 파일: dom_collector.py
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT

import base64
import os
import subprocess
import sys

import lvectllib
from clcommon.clpwd import ClPwd
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported, list_users, userdomains


class DomainCollector:
    """
    Prints the domains and document roots of panel users.
    See LU-1751 for details
    """

    def __init__(self):
        # Both flags are derived from environment variables that the parent
        # invocation sets on the child it spawns (see _run_self_in_lve and
        # _run_by_nice).
        self.is_not_in_lve = not os.environ.get('DOM_COLL_RUNNING_IN_LVE')
        self.is_not_limited_process = self.is_not_limited()
        self._clpwd = ClPwd()

    @staticmethod
    def is_not_limited():
        """
        Tell whether the current process is the unrestricted (parent) run.
        :return: True when neither DOM_COLL_RUNNING_IN_LVE nor
                 DOM_COLL_RUNNING_IN_NICE holds a non-empty value
        """
        env = os.environ
        limited = (
            env.get('DOM_COLL_RUNNING_IN_LVE')
            or env.get('DOM_COLL_RUNNING_IN_NICE')
        )
        return not limited

    def _get_panel_user_names_list(self):
        """
        Resolve every uid reported by list_users() to a user name.
        Uids for which ClPwd raises NoSuchUserException are skipped.
        :return: list of user names
        """
        user_names = []
        for uid in list(list_users().keys()):
            try:
                names_for_uid = self._clpwd.get_names(uid)
            except ClPwd.NoSuchUserException:
                continue
            # Only the first name returned for the uid is used.
            user_names.append(names_for_uid[0])
        return user_names

    def _print_users_doc_roots_and_domains(self):
        """
        Print one `domain,doc_root` line per domain of every panel user.
        The domain is printed as returned by userdomains();
        doc_root is always base64-encoded (from its UTF-8 bytes).
        """
        for username in self._get_panel_user_names_list():
            try:
                for domain, doc_root in userdomains(username):
                    raw_doc_root = doc_root.encode('utf-8')
                    encoded_doc_root = base64.b64encode(raw_doc_root).decode('utf-8')
                    print(f'{domain},{encoded_doc_root}')
            except Exception:
                # userdomains is implemented differently by the cpapi plugin
                # of each panel, so a generic Exception is caught here rather
                # than a list of specific ones. A failure ends the output for
                # this user only; the next user is still processed.
                pass

    def run(self):
        """
        Entry point.
        In an already limited process (one of the DOM_COLL_RUNNING_IN_*
        variables is set) the domain list is printed. Otherwise the collector
        is re-run in a limited child process - inside LVE when the panel
        supports Feature.LVE, under nice when it does not - and this process
        exits with the child's return code. If entering LVE raises
        PyLveError, the error is printed and the exit status is -1.
        """
        if not self.is_not_limited_process:
            # We are the limited child: collect and print the docroots list
            self._print_users_doc_roots_and_domains()
            return

        if not is_panel_feature_supported(Feature.LVE):
            sys.exit(self._run_by_nice())

        try:
            sys.exit(self._run_self_in_lve())
        except lvectllib.PyLveError as e:
            print(f'failed to run task in lve, error: {e}')
            sys.exit(-1)

    # TODO: this workaround is needed because pylve.lve_enter_pid does not work properly:
    #  when lve_enter_pid is called, lve limits the process only by cpu usage,
    #  other parameters are unlimited
    @staticmethod
    def _run_self_in_lve():
        """
        Run the same command inside a temporary LVE, with
        DOM_COLL_RUNNING_IN_LVE=1 added to the environment so that the child
        process can detect it.
        :return: return code of the child process
        """
        limits = lvectllib.create_liblve_settings(
            ls_cpu=15,  # 15 percents of CPU (NOT core)
            ls_cpus=0,
            ls_memory_phy=1024 ** 3,  # 1gb
        )
        settings = lvectllib.make_liblve_settings(limits)
        with lvectllib.temporary_lve(settings) as lve_id:
            command = [
                '/bin/lve_suwrapper',
                '-n',
                str(lve_id),
                '/usr/bin/cloudlinux_domains_collector',
            ]
            child_env = dict(os.environ, DOM_COLL_RUNNING_IN_LVE='1')
            return subprocess.call(command, env=child_env)

    @staticmethod
    def _run_by_nice():
        """
        Run the same command through /usr/bin/nice at niceness 19, with
        DOM_COLL_RUNNING_IN_NICE=1 added to the environment so that the child
        process can detect it.
        :return: return code of the child process
        """
        command = [
            '/usr/bin/nice',
            '-n',
            '19',
            '/usr/bin/cloudlinux_domains_collector',
        ]
        child_env = dict(os.environ, DOM_COLL_RUNNING_IN_NICE='1')
        return subprocess.call(command, env=child_env)