관리-도구
편집 파일: utils.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import configparser import io import os import json import fcntl import struct import time from typing import Optional, Dict, AnyStr, Union from jwt import decode, exceptions from clcommon.lib.consts import DEFAULT_JWT_ES_TOKEN_PATH from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError from clcommon.clcaptain import mkdir as mkdir_p from clcommon.utils import process_is_running from clconfig.cagefs_statistics_config import check_cagefs_initialized from cldetectlib import CL_CONFIG_FILE from secureio import write_file_via_tempfile def dummy_none_function(*a, **kw): return None try: from clselect.clselectctl import interpreter_versions_short_summary from clselector.clpassenger_detectlib import is_clpassenger_active except ImportError: interpreter_versions_short_summary = dummy_none_function is_clpassenger_active = dummy_none_function _CL_STATISTICS_SECTION = "license_check" _CL_STATISTICS_COLLECT_STATE_OPTION = "cl_statistics_enabled" _CL_STATISTICS_COLLECT_RPM_STATE_OPTION = "cl_statistics_rpm_enabled" _CL_STATISTICS_DIR = '/var/lve' _CL_STATISTICS_SEND_STATUS_FILE = os.path.join(_CL_STATISTICS_DIR, 'summary_status.json') _CL_STATISTICS_LOCK_PATH = '/var/run/cloudlinux_summary.send.lock' _CL_STATISTICS_LOCK_FILE = None CL_PLUS_SENDER_FILE_PATH = '/usr/share/cloudlinux/cl_plus/clplus_sender.py' ALT_PYTHON_VIRTUALENV_BIN = '/opt/cloudlinux/venv/bin/virtualenv' class SummaryStatus: """ Status of both, collecting and sending statistics If process still collects statistics -> IN_PROGRESS If statistics collected and sent correctly -> SUCCESS If any error during collecting or sending -> FAILED """ SUCCESS = 'success' IN_PROGRESS = 'in_progress' FAILED = 'failed' def is_virtualenv_installed(): """ Checks is virtualenv installed :return: True/False - installed or not """ return os.path.exists(ALT_PYTHON_VIRTUALENV_BIN) def is_locked(lock_file): """ Check if file is locked by another process without acquiring lock. IMPORTANT! This function should NOT be used to check lock acquired by the same process that executes the is_locked() function. For example, when process executes fcntl.lockf(LOCK_FILE), and then the same process executes is_locked(LOCK_FILE), the is_locked(LOCK_FILE) call returns False. Use is_locked() function to check lock acquired by another process only. :param lock_file: file to check lock on :type lock_file: file object or descriptor """ lock_data = struct.pack("hhllhh", fcntl.F_WRLCK, 0, 0, 0, 0, 0) try: lock_query = fcntl.fcntl(lock_file, fcntl.F_GETLK, lock_data) lock_status = struct.unpack("hhllhh", lock_query)[0] except (OSError, IOError): # should never happen return False return lock_status != fcntl.F_UNLCK def is_sending_process_running(acquire_lock=False): """ Check if processes collecting stats are running already (with --send option in command line) :param acquire_lock: True = acquire lock when possible :type acquire_lock: bool :return bool: True = Processes are running """ global _CL_STATISTICS_LOCK_FILE _CL_STATISTICS_LOCK_FILE = open( # pylint: disable=consider-using-with _CL_STATISTICS_LOCK_PATH, 'w', encoding='utf-8' ) if not acquire_lock: return is_locked(_CL_STATISTICS_LOCK_FILE) try: fcntl.lockf(_CL_STATISTICS_LOCK_FILE, fcntl.LOCK_EX | fcntl.LOCK_NB) except (OSError, IOError): return True return False def _get_config(): """ Retrieves ConfigParser object for /etc/sysconfig/cloudlinux file :return: """ config = configparser.ConfigParser(interpolation=None, strict=True) config.optionxform = str # make config case sensitive config.read(CL_CONFIG_FILE) return config def _write_config(config): """ write config to /etc/sysconfig/cloudlinux file :param config: configParser object to write return boolean: True|False """ try: string_fp = io.StringIO() config.write(string_fp) content = string_fp.getvalue() write_file_via_tempfile(content, CL_CONFIG_FILE, 0o644, prefix='cloudlinux_') except (OSError, IOError): return False return True def _get_config_value(parameter: str, default: bool = True) -> bool: """ Retrieves parameter's value from /etc/sysconfig/cloudlinux file, _CL_STATISTICS_SECTION section """ config = _get_config() res = default try: res = config.getboolean(_CL_STATISTICS_SECTION, parameter) except (configparser.NoSectionError, configparser.NoOptionError, ValueError): # Treat absent/missing value as default pass return res def _set_config_value(parameter: str, value: bool) -> None: """ Sets parameter's value to /etc/sysconfig/cloudlinux file, _CL_STATISTICS_SECTION section """ config = _get_config() config.set(_CL_STATISTICS_SECTION, parameter, str(int(value))) _write_config(config) def is_statistic_enabled() -> bool: """ Retrieves statistic collection status from /etc/sysconfig/cloudlinux file :return: True/False - enabled/disabled """ return _get_config_value(_CL_STATISTICS_COLLECT_STATE_OPTION) def is_statistic_rpm_enabled() -> bool: """ Retrieves rpm statistic collection status from /etc/sysconfig/cloudlinux file :return: True/False - enabled/disabled """ return _get_config_value(_CL_STATISTICS_COLLECT_RPM_STATE_OPTION) def set_statistic_collection_enabled(is_enabled: bool) -> None: """ Set statistic collection status to /etc/sysconfig/cloudlinux file :param is_enabled: True/False - enabled/disabled """ _set_config_value(_CL_STATISTICS_COLLECT_STATE_OPTION, is_enabled) def set_statistic_rpm_collection_enabled(is_enabled: bool) -> None: """ Set rpm statistic collection status to /etc/sysconfig/cloudlinux file :param is_enabled: True/False - enabled/disabled """ _set_config_value(_CL_STATISTICS_COLLECT_RPM_STATE_OPTION, is_enabled) def write_statistics_send_status_to_file(status_dict): """ Writes statistics send status to file /var/lve/summary_status.json :param status_dict: status dictionary for write to file :return: """ try: if not os.path.exists(_CL_STATISTICS_DIR): mkdir_p(_CL_STATISTICS_DIR) content = json.dumps(status_dict) # Write to file readable only for root write_file_via_tempfile(content, _CL_STATISTICS_SEND_STATUS_FILE, 0o600, prefix='cloudlinux_') except (OSError, IOError): pass def get_statistics_send_status_from_file(): """ Retrieves statistics send status from file /var/lve/summary_status.json :return: Dictionary with last send status. None if any error """ status_dict = None try: with open(_CL_STATISTICS_SEND_STATUS_FILE, encoding='utf-8') as f: s_content = f.read() status_dict = json.loads(s_content) if status_dict['result'] == SummaryStatus.IN_PROGRESS \ and not is_sending_process_running(): # something went wrong during collection status_dict['result'] = SummaryStatus.FAILED status_dict['reason'] = 'Collecting statistics was failed. Error ' \ 'report has been sent to developers and will be fixed soon' except (OSError, IOError, ValueError, AttributeError, TypeError): pass return status_dict def installed_interpreters_list(interpreter): """ Returns list of installed interpreters :param interpreter: str - name of interpreter :rtype: List of InterpreterSummary """ return [i for i in interpreter_versions_short_summary(interpreter) if i.installed] def is_python_selector_installed(): """ Checks that python selector is installed Installed if: - ea-apache24-mod-alt-passenger or alt-mod-passenger is installed - alt-python-virtualenv is installed :rtype: bool """ return is_clpassenger_active() and is_virtualenv_installed() def is_ruby_selector_installed(): """ Checks that ruby selector is installed Installed if: - ea-apache24-mod-alt-passenger or alt-mod-passenger is installed - alt-python-virtualenv is installed :rtype: bool """ return is_clpassenger_active() and is_virtualenv_installed() def is_nodejs_selector_installed(): """ Checks that nodejs selector is installed Installed if: - ea-apache24-mod-alt-passenger or alt-mod-passenger is installed - At least one version is installed :rtype: bool """ return is_clpassenger_active() and bool(installed_interpreters_list('nodejs')) def is_php_selector_installed(): """ Checks that php selector is installed Installed if: - CageFS is initialized :rtype: bool """ return bool(check_cagefs_initialized()) def get_packages_with_lve_extensions(): """ Gets packages with set lve limits via extension """ try: result = WhmApiRequest('listpkgs').call() except WhmApiError: return [] lve_extensions_packages = [item['name'] for item in result['pkg'] if '_PACKAGE_EXTENSIONS' in item and item['_PACKAGE_EXTENSIONS'] == 'lve'] return lve_extensions_packages def get_client_data_from_jwt_token(check_expiration=True) -> Optional[Dict[AnyStr, Union[AnyStr, bool]]]: """ Gets (if any) fields cl_plus and client_id from jwt token :return: decoded jwt_token value, None if error jwt_token: result of the successful decoding """ try: with open(DEFAULT_JWT_ES_TOKEN_PATH, mode='rb') as file: file_content = file.read().strip() except (OSError, IOError): return None # JWT read success try: jwt_token = decode(file_content, algorithms=['HS256'], options={'require_exp': True, "verify_exp": check_expiration, "verify_iss": True, 'verify_signature': False}, issuer='CloudLinux') return jwt_token # JWT format error except exceptions.PyJWTError: return None def is_active_cloudlinux_license(token_data): """ Checks whether license ts is expired """ if not token_data: return None if not token_data.get('exp'): return None return int(time.time()) < int(token_data.get('exp')) def get_cl_plus_sender_status() -> Optional[AnyStr]: """ Retrieves data from status of cl_plus_sender service :return: status of service, Optional[AnyStr] - 'active' - 'inactive' """ try: result = process_is_running(CL_PLUS_SENDER_FILE_PATH, False) except FileNotFoundError: result = False return 'active' if result else 'inactive'