관리-도구
편집 파일: directadmin.py
# -*- coding: utf-8 -*- """ CloudLinux API for DirectAdmin control panel """ import glob import os import re import subprocess import sys import syslog from traceback import format_exc from typing import Dict, List, Tuple # NOQA from urllib.parse import urlparse import requests from clcommon.clconfpars import ( WebConfigMissing, WebConfigParsingError, apache_conf_parser, load_fast, nginx_conf_parser, read_unicode_file_with_decode_fallback, ) from clcommon.clconfpars import load as loadconfig from clcommon.clpwd import ClPwd from clcommon.cpapi.cpapicustombin import ( _docroot_under_user_via_custom_bin, get_domains_via_custom_binary, ) from clcommon.cpapi.cpapiexceptions import ( CpApiTypeError, NoDBAccessData, NoDomain, NoPanelUser, ParsingError, ReadFileError, ) from clcommon.cpapi.GeneralPanel import ( DomainDescription, GeneralPanelPluginV1, PHPDescription, ) from clcommon.cpapi.plugins.universal import ( get_admin_email as universal_get_admin_email, ) from clcommon.features import Feature from clcommon.utils import ( ExternalProgramFailed, find_module_param_in_config, get_file_lines, grep, ) __cpname__ = 'DirectAdmin' DA_DIR = '/usr/local/directadmin' DA_CONF = os.path.join(DA_DIR, 'conf/directadmin.conf') DA_DATA_DIR = os.path.join(DA_DIR, 'data') DA_DB_CONF = os.path.join(DA_DIR, 'conf/mysql.conf') DA_USERS_PATH = os.path.join(DA_DATA_DIR, 'users') DA_OPT_PATH = os.path.join(DA_DIR, 'custombuild', 'options.conf') USER_CONF = 'user.conf' DOMAINOWNERS = '/etc/virtual/domainowners' ADMIN_DIR = os.path.join(DA_DATA_DIR, 'admin') RESELLERS_LIST = os.path.join(ADMIN_DIR, 'reseller.list') ADMINS_LIST = os.path.join(ADMIN_DIR, 'admin.list') USER_PATTERN = re.compile(rf'.+/(.+)/{re.escape(USER_CONF)}') # WARN: Probably will be deprecated for our "official" plugins. # See pluginlib.detect_panel_fast() def detect(): return os.path.isfile('/usr/local/directadmin/directadmin') or \ os.path.isfile('/usr/local/directadmin/custombuild/build') def db_access(): access = {} try: login_data = loadconfig(DA_DB_CONF) access['login'] = login_data['user'] access['pass'] = login_data['passwd'] except IOError as err: raise NoDBAccessData( 'Can not open file with data to database access; ' + str(err) ) from err except KeyError as err: raise NoDBAccessData( f'Can not get database access data from file {DA_DB_CONF}' ) from err return access def cpusers(): match_list = [USER_PATTERN.match(path) for path in glob.glob(os.path.join(DA_USERS_PATH, '*', USER_CONF))] users_list = [match.group(1) for match in match_list if match] return tuple(users_list) def resellers(): with open(RESELLERS_LIST, encoding='utf-8') as f: resellers_list = [line.strip() for line in f] return tuple(resellers_list) def admins(): with open(ADMINS_LIST, encoding='utf-8') as f: admins_list = [line.strip() for line in f] return set(admins_list) def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False): from clcommon.cpapi.plugins.universal import _dblogin_cplogin_pairs # pylint: disable=import-outside-toplevel access = db_access() data = _dblogin_cplogin_pairs(cplogin_lst=cplogin_lst, access=access) if with_system_users: data += tuple(get_da_user(DA_USERS_PATH).items()) return data def get_da_user(path, quiet=True): users = {} cur_dir = os.getcwd() os.chdir(path) dir_list = glob.glob('./*') for user_dir in dir_list: if os.path.isdir(user_dir): file_domains = path + '/' + user_dir + '/domains.list' try: with open(file_domains, encoding='utf-8') as f: if len(f.readline()) > 0: user_name = user_dir[2:] users[user_name] = user_name except IOError: if not quiet: sys.stderr.write("No file " + file_domains) os.chdir(cur_dir) return users def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'), search_sys_users=True): returned = [] if isinstance(cpuser, str): cpusers_list = [cpuser] elif isinstance(cpuser, (list, tuple)): cpusers_list = tuple(cpuser) elif cpuser is None: cpusers_list = cpusers() else: raise CpApiTypeError(funcname='cpinfo', supportedtypes='str|unicode|list|tuple', received_type=type(cpuser).__name__) def _get_reseller(config): if config.get('usertype') == 'reseller': return config.get('username') return config.get('creator') _user_conf_map = {'cplogin': lambda config: config.get('username'), 'package': lambda config: config.get('package'), 'mail': lambda config: config.get('email'), 'reseller': lambda config: _get_reseller(config), 'dns': lambda config: config.get('domain'), 'locale': lambda config: config.get('language')} keyls_ = [_user_conf_map[key] for key in keyls] for username in cpusers_list: user_conf_file = os.path.join(DA_USERS_PATH, username, USER_CONF) if os.path.exists(user_conf_file): user_config = load_fast(user_conf_file) returned.append([key(user_config) for key in keyls_]) return returned def _docroot_under_root(domain): # type: (str) -> Tuple[str, str] """ Old method for getting doc_root for domain under root Method reads DA config :return: (doc_root, username) cortege """ user_name = None # Load /etc/virtual/domainowners _domain_to_user_map = _load_domains_owners() # Find supposed owner of domain for main_domain in list(_domain_to_user_map.keys()): if domain == main_domain or domain.endswith(f'.{main_domain}'): # Parent domain found user_name = _domain_to_user_map[main_domain] break if user_name is None: domains_list = [] else: domains_list = userdomains(user_name) for d in domains_list: if domain in d: return d[1], user_name def _docroot_under_user_old_mechanism(domain): # type: (str) -> Tuple[str, str] """ Old method for getting doc_root for domain under user Method parses /home/<username>/domains directory :return: (doc_root, username) cortege """ clpwd = ClPwd() user_pw = clpwd.get_pw_by_uid(os.getuid())[0] list_domains_and_doc_roots = _get_domains_list_as_user(user_pw.pw_dir) for domain_data in list_domains_and_doc_roots: if domain_data['server_name'] == domain: return domain_data['document_root'], user_pw.pw_name def docroot(domain): # type: (str) -> Tuple[str, str] """ Retrieves document root for domain :param domain: Domain to determine doc_root :return: Cortege: (doc_root, domain_user) """ res = None domain = domain.strip() uid = os.getuid() euid = os.geteuid() if euid == 0 and uid == 0: res = _docroot_under_root(domain) else: res = _docroot_under_user_via_custom_bin(domain) # If there was successful result, res object will have # (doc_root, domain_user) format. If there wasn't found any correct # doc_roots, res will be None. if res is not None: return res raise NoDomain(f"Can't obtain document root for domain '{domain}'") def _is_nginx_installed(): """ Check if nginx is installed via custombuild; """ config = loadconfig(DA_CONF) return bool(int(config.get('nginx', 0)) or int(config.get('nginx_proxy', 0))) def _get_domains_list_as_root(user_path): """ Get domains list for user from httpd or nginx config as root :param user_path: path to DA directory of user's profile :return: parsed httpd or nginx config :rtype: list """ try: if _is_nginx_installed(): httpd_conf = nginx_conf_parser(os.path.join(user_path, 'nginx.conf')) else: httpd_conf = apache_conf_parser(os.path.join(user_path, 'httpd.conf')) except WebConfigParsingError as e: raise ParsingError(e.message) from e except WebConfigMissing: return [] return httpd_conf def _get_domains_list_as_user(user_home): # type: (str) -> List[Dict[str, str, bool]] """ Get domains list for user from ~/domains directory as user. Method DOESN'T search subdomains, because it's almost impossible detect by user's folders without privileges escalation :param user_home: path to user home :return: list of dictionaries {'server_name': 'domain', 'document_root': 'doc_root', 'ssl': False} """ domains_dir = 'domains' doc_root_dir = 'public_html' domains_list = [] domains_path = os.path.join(user_home, domains_dir) # Searching main domains # All directories of main domains are saved in ~/domains directory for domain_dir in os.listdir(domains_path): domain_path = os.path.join(domains_path, domain_dir) doc_root_path = os.path.join(domains_path, domain_dir, doc_root_dir) if os.path.isdir(domain_path) and os.path.isdir(doc_root_path): domains_list.append({ 'server_name': domain_dir, 'document_root': doc_root_path, 'ssl': False, }) else: continue return domains_list def userdomains(cpuser, as_root=False): # type: (str, bool) -> List[Tuple[str, str]] """ Get user's domains list :return list: domain names Example: [('cltest1.com', '/home/cltest1/domains/cltest1.com/public_html'), ('mk.cltest1.com', '/home/cltest1/domains/cltest1.com/public_html/mk'), ('cltest11.com', '/home/cltest1/domains/cltest11.com/public_html') ] """ domains_list = [] user_path = os.path.join(DA_USERS_PATH, cpuser) euid = os.geteuid() # old method to get list of user's domains main_domain_path = '' if not os.path.exists(user_path): return [] user_home = os.path.expanduser('~' + cpuser) public_path = os.path.join(user_home, 'public_html') if os.path.exists(public_path) and os.path.islink(public_path): main_domain_path = os.path.realpath(public_path) if euid == 0 or as_root: httpd_conf = _get_domains_list_as_root(user_path) for domain in httpd_conf: if domain['ssl'] is True: continue # Put main domain in start of list if domain['server_name'] in main_domain_path: domains_list.insert(0, (domain['server_name'], domain['document_root'])) else: domains_list.append((domain['server_name'], domain['document_root'])) return domains_list # this case works the same as above but through the rights escalation binary wrapper # call path: here -> binary -> python(diradmin euid) -> userdomains(as_root=True) -> print json result to stdout rc, res = get_domains_via_custom_binary() if rc == 0: return res elif rc == 11: raise NoPanelUser(f'User {cpuser} not found in the database') else: raise ExternalProgramFailed(f'Failed to get userdomains: {res}') def homedirs(): """ Detects and returns list of folders contained the home dirs of users of the DirectAdmin :return: list of folders, which are parent of home dirs of users of the panel """ home_dirs = set() clpwd = ClPwd() users_dict = clpwd.get_user_dict() for user_name, pw_user in list(users_dict.items()): conf_file = os.path.join(DA_USERS_PATH, user_name, USER_CONF) if os.path.exists(conf_file): home_dir = os.path.dirname(pw_user.pw_dir) home_dirs.add(home_dir) return list(home_dirs) def domain_owner(domain): """ Return domain's owner :param domain: Domain/sub-domain/add-domain name :return: user name or None if domain not found """ return _load_domains_owners().get(domain, None) @GeneralPanelPluginV1.cache_call(panel_parker=[DOMAINOWNERS]) def _load_domains_owners() -> Dict[str, str]: """ Get domain<->user map from /etc/virtual/domainowners file """ # 1. Load DA data file try: domains_lines = read_unicode_file_with_decode_fallback(DOMAINOWNERS).splitlines() except (OSError, IOError) as e: raise ReadFileError(str(e)) from e # 2. File loaded successfully, parse data and fill dictionaries _domain_to_user_map = {} for line_ in domains_lines: line_ = line_.strip() # pass empty line if not line_: continue domain_, user_ = line_.split(':') domain_ = domain_.strip() user_ = user_.strip() # Fill domain to user map _domain_to_user_map[domain_] = user_ return _domain_to_user_map def reseller_users(resellername): """ Return list of reseller users :param resellername: reseller name; return empty list if None :return list[str]: user names list """ if resellername is None: return [] all_users_dict = ClPwd().get_user_dict() users_list_file = os.path.join(DA_USERS_PATH, resellername, 'users.list') try: with open(users_list_file, encoding='utf-8') as users_list: users_list = [item.strip() for item in users_list] users_list.append(resellername) # performing intersection to avoid returning non-existing users # that are still present in config file for some reason return list(set(all_users_dict) & set(users_list)) except (IOError, OSError): return [] def reseller_domains(resellername=None): """ Get pairs user <=> domain for given reseller; Empty list if cannot get or no users found; :type resellername: str :return list[tuple[str, str]]: tuple[username, main_domain] """ if resellername is None: return [] users = reseller_users(resellername) return dict(cpinfo(users, keyls=('cplogin', 'dns'))) def get_admin_email(): admin_user_file = os.path.join(DA_USERS_PATH, 'admin', USER_CONF) cnf = loadconfig(admin_user_file) return cnf.get('email', universal_get_admin_email()) def is_reseller(username): """ Check if given user is reseller; :type username: str :rtype: bool :raise: ParsingError, ReadFileError """ user_config = os.path.join(DA_USERS_PATH, username, USER_CONF) if os.path.exists(user_config): try: return loadconfig(user_config)['usertype'] == 'reseller' except IndexError as e: raise ParsingError('User config exists, but no usertype given') from e return False def get_user_login_url(domain): return f'http://{domain}:2222' def _get_da_php_config(): """ Return map (PHP_DA_CODE:{PHP_HANDLER, PHP_VERSION}) :return: """ _php_da_map = {} try: php_cfg = loadconfig(DA_OPT_PATH) except (IOError, OSError): return None # iterate through custombuild options.conf php_mode and php_release options i = 1 while f'php{i}_mode' in php_cfg and f'php{i}_release' in php_cfg: _php_da_map[str(i)] = {} _php_da_map[str(i)]['handler_type'] = php_cfg[f'php{i}_mode'] _php_da_map[str(i)]['php_version_id'] = php_cfg[f'php{i}_release'] i += 1 return _php_da_map def _get_php_code_info_for_domain(domain, owner): """ Return php code from domain config :param domain: :param owner: :return: string '1' or '2' - php code in DA """ domain_config_file = os.path.join(DA_USERS_PATH, str(owner), 'domains', str(domain) + '.conf') try: domain_config = loadconfig(domain_config_file) except (IOError, OSError): return '1' domain_php = domain_config.get('php1_select') # None - DA custombuild has only one php version # '0' - it means that user selected default version PHP of DA custombuild if domain_php is None or domain_php == '0': domain_php = '1' return domain_php def _get_subdomains(all_domains, mapped_all_domains): subdomains = [] for domain in all_domains: if domain[0] in mapped_all_domains.keys(): continue subdomains.append(domain[0]) return subdomains def get_domains_php_info(): """ Return php version information for each domain :return: domain to php info mapping Example output: {'cltest.com': {'handler_type': 'mod_php', 'php_version_id': '7.1', 'username': 'cltest'}, 'cltest2.com': {'handler_type': 'fastcgi', 'php_version_id': '7.3', 'username': 'kek_2'}, 'cltest3.com': {'handler_type': 'suphp', 'php_version_id': '5.5', 'username': 'cltest3'}, 'omg.kek': {'handler_type': 'php-fpm', 'php_version_id': '5.2', 'username': 'cltest'}} :rtype: dict[str, dict] """ # returns only main domains map_domain_user = _load_domains_owners() result_map = {} php_da_map = _get_da_php_config() if php_da_map is None: return result_map owner_to_domains: dict[str, list[str]] = {} for domain, owner in map_domain_user.items(): owner_to_domains.setdefault(owner, []).append(domain) for owner, domains in owner_to_domains.items(): all_domains_in_httpd_file = userdomains(owner) # get safely to not break something to other teams try: subdomains = _get_subdomains(all_domains_in_httpd_file, map_domain_user) except Exception: subdomains = [] for domain in domains: php_info_code = _get_php_code_info_for_domain(domain, owner) if php_info_code not in php_da_map \ or php_da_map[php_info_code]['php_version_id'] == 'no': # 'no' means that php_release specified in user's config # does not exist in custombuild options.conf php_info_code = '1' php_info = php_da_map[php_info_code] try: domain_aliases = _useraliases(owner, domain) except Exception: domain_aliases = [] # https://forum.directadmin.com/threads/sub-domain-different-php-version.58426/ # subdomain version should be the same as main domain for domain_entity in [domain] + subdomains + domain_aliases: result_map[domain_entity] = DomainDescription( username=owner, php_version_id=php_info['php_version_id'], handler_type=php_info['handler_type'], display_version=f'php{php_info["php_version_id"].replace(".", "")}' ) return result_map def _get_installed_alt_php_versions(): """ Gets installed alt-phpXY - could be chosen via CloudLinux PHP Selector w/o being compiled via custombuild """ installed_list = [] alt_phps_directory = '/opt/alt/' pattern = re.compile(r'^php\d+$') for item in os.listdir(alt_phps_directory): item_path = os.path.join(alt_phps_directory, item) # Check if the item is a directory and its name matches the pattern if os.path.isdir(item_path) and pattern.match(item) and os.path.exists(f'{item_path}/usr/bin/php'): version = item.replace('php', '') installed_list.append(PHPDescription( identifier=f'alt-{item}', version=f'{version[:1]}.{version[1:]}', dir=f'{item_path}/', modules_dir=os.path.join(item_path, 'usr/lib64/php/modules/'), bin=os.path.join(item_path, 'usr/bin/php'), ini=os.path.join(item_path, 'link/conf/default.ini'), )) return installed_list def _get_da_php_extension_dir(directadmin_php_dir): return subprocess.run( [f'{directadmin_php_dir}bin/php-config', '--extension-dir'], text=True, capture_output=True, check=False, ).stdout def _get_compiled_custombuild_versions(): """ Gets compiled phpXY - could be chosen via DirectAdmin PHP Selector """ php_da_map = _get_da_php_config() if php_da_map is None: return [] # {'1': {'handler_type': 'php-fpm', 'php_version_id': '7.4'}, # '2': {'handler_type': 'php-fpm', 'php_version_id': '8.0'}, # '3': {'handler_type': 'php-fpm', 'php_version_id': 'no'}, # '4': {'handler_type': 'php-fpm', 'php_version_id': 'no'}} installed_php_data = php_da_map.values() installed_list = [] # obtain php version compiled via custombuild: phpXY for version_info in installed_php_data: version = version_info['php_version_id'] if version == 'no': continue directadmin_php_dir = f'/usr/local/php{version.replace(".", "")}/' if not os.path.exists(directadmin_php_dir): continue modules_dir_path = _get_da_php_extension_dir(directadmin_php_dir) if modules_dir_path: modules_dir_path = modules_dir_path.strip() installed_list.append(PHPDescription( identifier=f'php{version.replace(".", "")}', version=version, dir=os.path.join(directadmin_php_dir), modules_dir=modules_dir_path, bin=os.path.join(directadmin_php_dir, 'bin/php'), ini=os.path.join(directadmin_php_dir, 'lib/php.ini'), )) return installed_list def _get_aliases(path): """ Parse user aliases file and return data """ if not os.path.exists(path): return [] data = [] try: with open(path, encoding='utf-8') as f: data = f.readlines() except IOError as e: syslog.syslog(syslog.LOG_WARNING, f'Can`t open file "{path}" due to : "{e}"') return [record.strip().split('=')[0] for record in data] def _useraliases(cpuser, domain): """ Return aliases from user domain :param str|unicode cpuser: user login :param str|unicode domain: :return list of aliases """ path = f'/usr/local/directadmin/data/users/{cpuser}/domains/{domain}.pointers' data = _get_aliases(path) return data class PanelPlugin(GeneralPanelPluginV1): HTTPD_CONFIG_FILE = '/etc/httpd/conf/httpd.conf' HTTPD_MPM_CONFIG = '/etc/httpd/conf/extra/httpd-mpm.conf' HTTPD_INFO_CONFIG = '/etc/httpd/conf/extra/httpd-info.conf' def __init__(self): super().__init__() self.ADMINS_LIST = os.path.join(ADMIN_DIR, 'admin.list') def getCPName(self): """ Return panel name :return: """ return __cpname__ def get_cp_description(self): """ Retrieve panel name and it's version :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'} or None if can't get info """ try: with subprocess.Popen( ['/usr/local/directadmin/directadmin', 'v'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) as p: out, _ = p.communicate() # output may differ (depending on version): # 'Version: DirectAdmin v.1.642' # 'DirectAdmin v.1.643 55acaa256ec6ed99b9aaec1050de793b298f62b0' # 'DirectAdmin 1.644 55acaa256ec6ed99b9aaec1050de793b298f62b0' version_words = (word.lstrip('v.') for word in out.split()) def _is_float(s): return s.replace('.', '').isdigit() version = next(filter(_is_float, version_words), '') return {'name': __cpname__, 'version': version, 'additional_info': None} except Exception: return None def db_access(self): """ Getting root access to mysql database. For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'} :return: root access to mysql database :rtype: dict :raises: NoDBAccessData """ return db_access() def cpusers(self): """ Generates a list of cpusers registered in the control panel :return: list of cpusers registered in the control panel :rtype: tuple """ return cpusers() def resellers(self): """ Generates a list of resellers in the control panel :return: list of cpusers registered in the control panel :rtype: tuple """ return resellers() def is_reseller(self, username): """ Check if given user is reseller; :type username: str :rtype: bool """ return is_reseller(username) # unlike admins(), this method works fine in post_create_user # hook; looks like directadmin updates admins.list a little bit later # then calls post_create_user.sh def is_admin(self, username): """ Return True if username is in admin names :param str username: user to check :return: bool """ user_conf_file = os.path.join(DA_USERS_PATH, username, USER_CONF) if not os.path.exists(user_conf_file): return False user_config = load_fast(user_conf_file) return user_config['usertype'] == 'admin' def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False): """ Get mapping between system and DB users @param cplogin_lst :list: list with usernames for generate mapping @param with_system_users :bool: add system users to result list or no. default: False """ return dblogin_cplogin_pairs(cplogin_lst, with_system_users) def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'), search_sys_users=True): """ Retrieves info about panel user(s) :param str|unicode|list|tuple|None cpuser: user login :param keyls: list of data which is necessary to obtain the user, the valuescan be: cplogin - name/login user control panel mail - Email users reseller - name reseller/owner users locale - localization of the user account package - User name of the package dns - domain of the user :param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk) :return: returns a tuple of tuples of data in the same sequence as specified keys in keylst :rtype: tuple """ return cpinfo(cpuser, keyls, search_sys_users=search_sys_users) def get_admin_email(self): """ Retrieve admin email address :return: Host admin's email """ return get_admin_email() def docroot(self, domain): """ Return document root for domain :param str|unicode domain: :return Cortege: (document_root, owner) """ return docroot(domain) @staticmethod def useraliases(cpuser, domain): return _useraliases(cpuser, domain) def userdomains(self, cpuser): """ Return domain and document root pairs for control panel user first domain is main domain :param str|unicode cpuser: user login :return list of tuples (domain_name, documen_root) """ return userdomains(cpuser) def homedirs(self): """ Detects and returns list of folders contained the home dirs of users of the cPanel :return: list of folders, which are parent of home dirs of users of the panel """ return homedirs() def reseller_users(self, resellername=None): """ Return reseller users :param resellername: reseller name; autodetect name if None :return list[str]: user names list """ return reseller_users(resellername) def reseller_domains(self, resellername=None): """ Get dict[user, domain] :param reseller_name: reseller's name :rtype: dict[str, str|None] :raises DomainException: if cannot obtain domains """ return reseller_domains(resellername) def get_user_login_url(self, domain): """ Get login url for current panel; :type domain: str :rtype: str """ return get_user_login_url(domain) def admins(self): """ List all admins names in given control panel :return: list of strings """ return admins() def domain_owner(self, domain): """ Return domain's owner :param domain: Domain/sub-domain/add-domain name :rtype: str :return: user name or None if domain not found """ return domain_owner(domain) def get_domains_php_info(self): """ Return php version information for each domain :return: domain to php info mapping :rtype: dict[str, dict] """ return get_domains_php_info() @staticmethod def _get_da_skin_name(): """ Retrieve current DA skin name :return: Current DA skin name. None if unknown """ config = loadconfig(DA_CONF) # starting from DA 1.664 `docsroot` option was replaced by `system_skin` if 'system_skin' in config: return config['system_skin'] # grep '^docsroot=' /usr/local/directadmin/conf/directadmin.conf | cut -d/ -f4 docsroot = config.get('docsroot', None) # docsroot like './data/skins/evolution' if docsroot is None: return None return docsroot.split('/')[-1] @staticmethod def get_encoding_name(): """ Retrieve encoding name, used for package/reseller names :return: """ enhanced_skin_config = os.path.join(DA_DIR, "data/skins/enhanced/lang/en/lf_standard.html") default_encoding = 'utf8' current_skin = PanelPlugin._get_da_skin_name() if current_skin == 'enhanced': # For enchanced skin we read encoding from its config # :LANG_ENCODING=iso-8859-1 see LU-99 for more info skin_config = loadconfig(enhanced_skin_config) # Option in file is 'LANG_ENCODING', but key is lowercase return skin_config.get('lang_encoding', default_encoding) return default_encoding def get_unsupported_cl_features(self) -> tuple[Feature, ...]: return ( Feature.RUBY_SELECTOR, ) @staticmethod def get_apache_ports_list() -> List[int]: """ Retrieves active httpd's ports from httpd's config :return: list of apache's ports """ # cat /etc/apache2/conf/httpd.conf | grep Listen _httpd_ports_list = [] try: lines = get_file_lines(PanelPlugin.HTTPD_CONFIG_FILE) except (OSError, IOError): return None lines = [line.strip() for line in lines] for line in grep('Listen', match_any_position=False, multiple_search=True, data_from_file=lines): # line examples: # Listen 0.0.0.0:80 # Listen [::]:80 try: value = int(line.split(' ')[1]) if value not in _httpd_ports_list: _httpd_ports_list.append(value) except (IndexError, ValueError): pass if not _httpd_ports_list: _httpd_ports_list.append(80) return _httpd_ports_list @staticmethod def _get_active_web_server_params() -> Tuple[str, str]: """ Determines active web server from options.conf, directive 'webserver' :return: tuple (active_web_server_name, apache_active_module_name) active_web_server_name: 'apache', 'nginx', 'nginx_apache', 'litespeed', 'openlitespeed', etc apache_active_module_name: 'prefork', 'event', 'worker' (None, None) if DA options.conf read/parse error """ web_server_name = None apache_active_module_name = None try: # cat /usr/local/directadmin/custombuild/options.conf | grep webserver # webserver=apache # webserver can be: apache, nginx, nginx_apache, litespeed, openlitespeed. options_lines = get_file_lines(DA_OPT_PATH) grep_result_list = list(grep('^apache_mpm|^webserver', fixed_string=False, match_any_position=False, multiple_search=True, data_from_file=options_lines)) # grep_result_list example: ['webserver=apache\n', 'apache_mpm=auto\n'] for line in grep_result_list: line_parts = line.strip().split('=') if line_parts[0] == 'webserver': web_server_name = line_parts[1] if line_parts[0] == 'apache_mpm': apache_active_module_name = line_parts[1] # modules are 'prefork', 'event', 'worker'. 'auto' == 'worker' if apache_active_module_name == 'auto': apache_active_module_name = 'worker' except (OSError, IOError, IndexError): pass return web_server_name, apache_active_module_name def _get_max_request_workers_for_module(self, apache_module_name: str) -> Tuple[int, str]: """ Determine MaxRequestWorkers directive value for specified apache module. Reads config file /etc/httpd/conf/extra/httpd-mpm.conf :param apache_module_name: Current apache's module name: 'prefork', 'event', 'worker' :return: tuple (max_req_num, message) max_req_num - Maximum request apache workers number or 0 if error message - OK/Error message """ try: return find_module_param_in_config(self.HTTPD_MPM_CONFIG, apache_module_name, 'MaxRequestWorkers') except (OSError, IOError, IndexError, ValueError): return 0, format_exc() def get_apache_max_request_workers(self) -> Tuple[int, str]: """ Get current maximum request apache workers from httpd's config :return: tuple (max_req_num, message) max_req_num - Maximum request apache workers number or 0 if error message - OK/Error message """ web_server_name, apache_active_module_name = self._get_active_web_server_params() if web_server_name is None or apache_active_module_name is None: return 0, f"There was error during read/parse {DA_OPT_PATH}. Apache collector will not work" if web_server_name != "apache": return 0, f"DA is configured for web server '{web_server_name}'; but 'apache' is needed. " \ "Apache collector will not work" return self._get_max_request_workers_for_module(apache_active_module_name) def _get_httpd_status_uri(self) -> str: """ Determine apache mod_status URI from /etc/httpd/conf/extra/httpd-info.conf config :return Apache mod_status URI or None if error/not found """ location_uri = None try: # # grep -B 2 'SetHandler server-status' /etc/httpd/conf/extra/httpd-info.conf # # <Location /server-status> # SetHandler server-status info_lines = get_file_lines(self.HTTPD_INFO_CONFIG) location_directive = '<Location' location_line = None for line in info_lines: line = line.strip() if line.startswith(location_directive): # Location directive found, save it location_line = line continue if line.startswith('SetHandler server-status') and location_line: # server-status found, Extract URI from Location directive start tag location_uri = location_line.replace(location_directive, '').replace('>', '').strip() break except (OSError, IOError): pass return location_uri def get_apache_connections_number(self): """ Retrieves Apache's connections number (from apache's mod_status) :return: tuple (conn_num, message) conn_num - current connections number, 0 if error message - OK/Trace """ web_server_name, _ = self._get_active_web_server_params() if web_server_name is None: return 0, f"There was error during read/parse {DA_OPT_PATH}. Apache collector will not work" if web_server_name != "apache": return 0, f"DA is configured for web server '{web_server_name}'; but 'apache' is needed. " \ "Apache collector will not work" try: # curl localhost/server-status?auto | grep "Total Accesses" # Total Accesses: 25 location_uri = self._get_httpd_status_uri() if location_uri is None: return 0, "Can't found mod_status URI in configs" url = f'http://127.0.0.1{location_uri}?auto' response = requests.get(url, timeout=5) if response.status_code != 200: return 0, f"GET {url} response code is {response.status_code}" s_response = response.content.decode('utf-8') s_response_list = s_response.split('\n') out_list = list(grep("Total Accesses", data_from_file=s_response_list)) # out_list example: ['Total Accesses: 200'] s_total_accesses = out_list[0].split(':')[1].strip() return int(s_total_accesses), 'OK' except Exception: return 0, format_exc() @staticmethod def get_installed_php_versions(): """ Returns installed alt-php(s) on the server compiled phpXY via custombuild and alt-phpXY has different paths also user could choose version via PHP selector which was not compiled with custombuild (will be absent in DA configs) """ return _get_installed_alt_php_versions() + _get_compiled_custombuild_versions() def get_server_ip(self): ip_list_file = '/usr/local/directadmin/data/admin/ip.list' if not os.path.exists(ip_list_file): return '' with open(ip_list_file, encoding='utf-8') as f: ips = f.readlines() if not ips: return '' return ips[0].strip() @staticmethod def get_user_emails_list(username: str, domain: str): user_conf = f'/usr/local/directadmin/data/users/{username}/user.conf' if not os.path.exists(user_conf): return '' user_conf = load_fast(user_conf) return user_conf.get('email', '') @staticmethod def panel_login_link(username): generated_login = subprocess.run(['/usr/local/directadmin/directadmin', '--create-login-url', f'user={username}'], capture_output=True, text=True, check=False).stdout # http://server-206-252-237-2.da.direct:2222/api/login/url?key=0SrJm1CNAIh34w4Fk8Kp4ohypUFp_pMm if len(generated_login) == 0: return '' parsed = urlparse(generated_login) return f'{parsed.scheme}://{parsed.netloc}/' @staticmethod def panel_awp_link(username): link = PanelPlugin.panel_login_link(username).rstrip("/") if len(link) == 0: return '' return f'{link}/evo/user/plugins/awp#/' def suspended_users_list(self): all_users = cpusers() suspended_users = [] for user in all_users: user_conf_file = os.path.join(DA_USERS_PATH, user, USER_CONF) if not os.path.exists(user_conf_file): continue user_config = load_fast(user_conf_file) if user_config.get('suspended') == 'yes': suspended_users.append(user) return suspended_users