관리-도구
편집 파일: cledition.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
"""
Detection of the CloudLinux OS edition.

The edition is looked up in the JWT token, in edition marker files and in the
world-readable edition file (CL_EDITION_FILE_FOR_USERS).
"""
import os
import subprocess
import warnings
from enum import Enum
from typing import AnyStr  # NOQA  (no longer used here; kept so existing imports of this name keep working)

from jwt import exceptions

from clcommon.lib.consts import CLN_JWT_TOKEN_PATH, CL_EDITION_FILE_FOR_USERS
from clcommon.lib.jwt_token import read_jwt, decode_jwt, jwt_token_check
from clcommon.clexception import FormattedException
from clcommon.clcagefs import in_cagefs

# The functions below used to be in this module, but were moved to clcommon.utils
# So, importing them just for backward compatibility
from clcommon.utils import get_os_version, is_ubuntu, is_secureboot_enabled  # NOQA

# Marker files whose presence selects the Solo / Admin edition (see CLEditions.get_cl_edition)
CL_SOLO_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-solo'
CL_ADMIN_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-admin'

SHARED_PRO_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared Pro'
SHARED_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared'
SOLO_EDITION_HUMAN_READABLE = 'CloudLinux OS Solo'
ADMIN_EDITION_HUMAN_READABLE = 'CloudLinux OS Admin'


class SupportedEditions(Enum):
    """
    Keeps supported CloudLinux editions
    """
    SOLO = 'solo'
    SHARED = 'shared'
    SHARED_PRO = 'shared_pro'
    ADMIN = 'admin'


class CLEditionDetectionError(FormattedException):
    """
    Raised when the edition cannot be detected.

    The message and the keyword context are handed to FormattedException
    as a ``{'message': ..., 'context': ...}`` dict.
    """

    def __init__(self, message, **context):
        FormattedException.__init__(self, {
            'message': message,
            'context': context
        })


# Human-readable edition name -> edition value; used to translate the
# content of CL_EDITION_FILE_FOR_USERS (see user_cl_edition)
HUMAN_READABLE_TOKEN_EDITION_PAIRS = {
    SOLO_EDITION_HUMAN_READABLE: SupportedEditions.SOLO.value,
    SHARED_PRO_EDITION_HUMAN_READABLE: SupportedEditions.SHARED_PRO.value,
    SHARED_EDITION_HUMAN_READABLE: SupportedEditions.SHARED.value,
    ADMIN_EDITION_HUMAN_READABLE: SupportedEditions.ADMIN.value
}


class CLEditions:
    """
    Namespace for the edition detection routines.
    """

    @staticmethod
    def get_from_jwt(token=None, verify_exp: bool = True):
        """
        Get the edition from a JWT token.

        Note: be careful when modifying this method.
        Passing token in is used in X-Ray, ask @dkavchuk or someone else
        from C-Projects team for details

        :param token: raw token; when None it is read from CLN_JWT_TOKEN_PATH
        :param verify_exp: forwarded to decode_jwt
        :return: value of the 'edition' field; for tokens without that field,
                 'shared_pro' or 'shared' depending on the 'cl_plus' flag
        :raises CLEditionDetectionError: the token cannot be decoded or has
                 neither an 'edition' nor a 'cl_plus' field
        """
        if token is None:
            token = read_jwt(CLN_JWT_TOKEN_PATH)
        try:
            payload = decode_jwt(token, verify_exp=verify_exp)
        except exceptions.PyJWTError as e:
            raise CLEditionDetectionError(
                f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. '
                f'Please, make sure it is not broken, error: {e}'
            ) from e

        try:
            return payload['edition']
        except KeyError as e:
            # fallback for old format of tokens
            cl_plus = payload.get('cl_plus')
            if cl_plus is None:
                raise CLEditionDetectionError(
                    f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. '
                    f'Please, make sure it is not broken, error: not enough fields for detection'
                ) from e
            if cl_plus:
                return SupportedEditions.SHARED_PRO.value
            return SupportedEditions.SHARED.value

    @staticmethod
    def is_package_installed(package_name: str) -> bool:
        """
        Check with ``rpm -q`` whether a package is installed.

        :param package_name: name of the package to query
        :return: True if rpm exits with 0, False if it fails and its stdout
                 contains 'is not installed'
        :raises CLEditionDetectionError: rpm failed for any other reason
        """
        with subprocess.Popen(
            ['rpm', '-q', package_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # C locale, so that the 'is not installed' match below does not
            # depend on the system language
            env={**os.environ, 'LC_ALL': 'C'},
        ) as proc:
            out, err = proc.communicate()
            if proc.returncode == 0:
                return True
            if 'is not installed' in out:
                return False
            raise CLEditionDetectionError(
                f'Unable to check if package {package_name} is installed, '
                f'reason: {out}, {err}'
            )

    @classmethod
    def get_cl_edition(
        cls,
        skip_jwt_check: bool = False,
        skip_marker_check: bool = False,
        raw_jwt=None,
        verify_exp: bool = True,
    ) -> str | None:
        """
        Detect the CloudLinux OS edition.

        1. Try to detect CL edition from JWT token;
           If edition field is in token -> return edition
           If edition field is not present -> recheck via cl_plus flag (see get_from_jwt)
           If token is broken -> raise an error about it
        2. Try to detect edition from files.
           If the token could not be read because of missing permissions,
           or if we're in cagefs -> get edition from CL_EDITION_FILE_FOR_USERS
           If an edition marker file for Solo or Admin edition is present
           -> return corresponding edition
           Otherwise, fall back to CL_EDITION_FILE_FOR_USERS again

        Detection from marker files may be switched off using
        the `skip_marker_check` parameter.

        Note: be careful when modifying this method.
        It is used in X-Ray, ask @dkavchuk or someone else
        from C-Projects team for details.

        :param skip_jwt_check: if True, skip checking JWT token for edition.
        :param skip_marker_check: if True, skip checking marker files for edition.
        :param raw_jwt: raw JWT token to use for edition detection.
                        If None, token will be read from file.
        :param verify_exp: if True, verify JWT token expiration time.
        :return: Edition of CloudLinux OS or None if edition could not be determined.
        :raises CLEditionDetectionError: both checks are skipped, or the token is broken
        """
        # Skipping both jwt and file checks is not allowed
        if skip_jwt_check and skip_marker_check:
            raise CLEditionDetectionError(
                "Unable to detect edition: neither jwt token check, no file marker check enabled"
            )

        # If we have a JWT token, try to get edition from it.
        # Note that the token file must exist even when raw_jwt is passed in.
        if not skip_jwt_check and os.path.isfile(CLN_JWT_TOKEN_PATH):
            try:
                edition = cls.get_from_jwt(token=raw_jwt, verify_exp=verify_exp)
            except PermissionError:
                # presumably the token file is not readable by the current
                # user; use the world-readable edition file instead
                edition = user_cl_edition()
            if edition:
                return edition

        # If user is in cagefs, look at the CL_EDITION_FILE_FOR_USERS file first
        if in_cagefs():
            return user_cl_edition()

        # Otherwise, check for edition marker files
        if not skip_marker_check:
            if os.path.isfile(CL_SOLO_EDITION_FILE_MARKER):
                return SupportedEditions.SOLO.value
            if os.path.isfile(CL_ADMIN_EDITION_FILE_MARKER):
                return SupportedEditions.ADMIN.value
            # Finally, if no marker files are present, try CL_EDITION_FILE_FOR_USERS
            return user_cl_edition()

        # Couldn't detect edition through any means - return None
        return None


def is_cl_solo_edition(skip_jwt_check: bool = True, skip_marker_check: bool = False,
                       verify_exp: bool = True) -> bool:
    """
    Tell whether the detected edition is Solo.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
    and fallback to file is not applicable

    Unlike the other is_cl_*_edition helpers, skip_jwt_check defaults to True here.

    Note: be careful when modifying this method.
    It is used in X-Ray and SSA, ask @dkavchuk or someone else
    from C-Projects team for details
    """
    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    # a None edition (not detected) simply compares unequal
    return edition == SupportedEditions.SOLO.value


def is_cl_shared_edition(skip_jwt_check: bool = False, skip_marker_check: bool = False,
                         verify_exp: bool = True) -> bool:
    """
    Tell whether the detected edition is Shared.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
    and fallback to file is not applicable
    """
    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition == SupportedEditions.SHARED.value


def is_cl_shared_pro_edition(skip_jwt_check: bool = False, skip_marker_check: bool = False,
                             verify_exp: bool = True) -> bool:
    """
    Tell whether the detected edition is Shared Pro.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
    and fallback to file is not applicable
    """
    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition == SupportedEditions.SHARED_PRO.value


def is_cl_shared_pro_edition_safely(**kwargs) -> bool:
    """
    Same as is_cl_shared_pro_edition, but suppresses
    the edition detection error and returns False instead
    """
    try:
        return is_cl_shared_pro_edition(**kwargs)
    except CLEditionDetectionError:
        return False


def is_cl_admin_edition(skip_jwt_check: bool = False, skip_marker_check: bool = False,
                        verify_exp: bool = True) -> bool:
    """
    Tell whether the detected edition is Admin.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
    and fallback to file is not applicable
    """
    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition == SupportedEditions.ADMIN.value


def get_cl_edition_readable() -> str:
    """
    Function returns current edition of CL:
    - CloudLinux OS Shared
    - CloudLinux OS Shared Pro
    - CloudLinux OS Solo
    - CloudLinux OS Admin
    - or the stripped content of CL_EDITION_FILE_FOR_USERS when the token
      cannot be read by the current user
    """
    try:
        if is_cl_solo_edition(skip_jwt_check=True):
            return SOLO_EDITION_HUMAN_READABLE
        if is_cl_admin_edition(skip_jwt_check=True):
            return ADMIN_EDITION_HUMAN_READABLE
    except CLEditionDetectionError:
        return SHARED_EDITION_HUMAN_READABLE

    success_flag, error_message, _ = jwt_token_check()
    # This error message indicates that user could not read jwt.token because not enough privileges
    # We have to check existence of token file, to be sure that privilege error acquired
    if 'read error' in error_message and (os.path.isfile(CLN_JWT_TOKEN_PATH) or in_cagefs()):
        try:
            with open(CL_EDITION_FILE_FOR_USERS, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except FileNotFoundError:
            # The edition file has not been written yet. user_cl_edition()
            # tolerates that too, so do not crash here either and
            # use the token check result below.
            pass

    if success_flag:
        return SHARED_PRO_EDITION_HUMAN_READABLE
    return SHARED_EDITION_HUMAN_READABLE


def print_skip_message_on_solo():
    """Just print skip message"""
    warnings.warn(
        'as part of unification effort, features might not map directly to editions',
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not to this module
    )
    print('CloudLinux Solo edition detected! \n'
          'Command is skipped, because it is unsupported and unneeded on current edition')


# NOTE(vlebedev): To properly avoid circular imports, these 2 functions' implementations
#                 are moved to "utils" submodule of "clcommon.cpapi" as it is the level
#                 where all necessary panel stuff is imported and constructed and which
#                 uses "clcommon.lib" module as a dependency. In other words "clcommon.lib" lays
#                 lower in the modules hierarchy and should not import from "clcommon.cpapi".
def skip_without_lve():
    """
    Deprecated: delegates to clcommon.cpapi.utils.skip_without_lve
    """
    warnings.warn(
        'since v3.4.1 use "clcommon.cpapi.utils:skip_without_lve" instead',
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not to this module
    )
    # imported lazily, see the note above
    from clcommon.cpapi.utils import skip_without_lve as _skip_without_lve  # pylint: disable=cyclic-import,import-outside-toplevel
    return _skip_without_lve()


def lve_supported_or_exit(f):
    """
    Deprecated: delegates to clcommon.cpapi.utils.lve_supported_or_exit

    :param f: passed through unchanged to the new implementation
    """
    warnings.warn(
        'since v3.4.1 use "clcommon.cpapi.utils:lve_supported_or_exit" instead',
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not to this module
    )
    # imported lazily, see the note above
    from clcommon.cpapi.utils import lve_supported_or_exit as _lve_supported_or_exit  # pylint: disable=cyclic-import,import-outside-toplevel
    return _lve_supported_or_exit(f)


def user_cl_edition():
    """
    This function is used as a workaround for users to be able to get the CL Edition.
    Crontab will write output of `cldetect --detect-edition` command to
    CL_EDITION_FILE_FOR_USERS, with 0644 permission.

    :return: edition value matching the human-readable name stored in the file;
             None if the file does not exist or holds an unknown name
    """
    if not os.path.exists(CL_EDITION_FILE_FOR_USERS):
        return None
    with open(CL_EDITION_FILE_FOR_USERS, "r", encoding="utf-8") as f:
        readable_name = f.read().strip()
    return HUMAN_READABLE_TOKEN_EDITION_PAIRS.get(readable_name)


def is_container() -> bool:
    """
    Determines is this system running inside container
    (checks for the /etc/cloudlinux-container file)
    """
    return os.path.exists('/etc/cloudlinux-container')