관리-도구
편집 파일: pluginlib.py
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ Library to work with plug-ins """ import os import py_compile import subprocess import sys import pwd import glob import shutil import syslog from importlib import import_module from configparser import ConfigParser from clcommon.clcustomscript import lvectl_custompanel_script, CLSYSCONFIG from .const import CACHE_CPNAME, UNKNOWN_CP_NAME, UNKNOWN_CP_IMPORT from .cpapiexceptions import PluginImportError clsysconfig = ConfigParser(interpolation=None, strict=False) clsysconfig.read(CLSYSCONFIG) CONTROLPANELNAME_VAR = '__cpname__' DETECTFUNCNAME_VAR = 'detect' PLUGINS_PKG_PREFIX = 'clcommon.cpapi' PLUGINS_DIR = None if clsysconfig.has_option(section='cpapi', option='plugindir'): PLUGINS_DIR = clsysconfig.get(section='cpapi', option='plugindir') if not os.path.exists(PLUGINS_DIR): print("WARNING: plugindir is configured, but doesn't exists!") API_LINK_PATH = os.path.join(os.path.dirname(__file__), 'apilink.py') CPANEL_NAME = 'cPanel' DIRECTADMIN_NAME = 'DirectAdmin' PLESK_NAME = 'Plesk' ISPMANAGER_NAME = 'ISPManager' INTERWORX_NAME = 'InterWorx' UNKNOWN_NAME = 'Unknown' INTEGRATED_NAME = 'Integrated' OFFICIAL_PLUGINS = { CPANEL_NAME: 'cpanel', DIRECTADMIN_NAME: 'directadmin', PLESK_NAME: 'plesk', ISPMANAGER_NAME: 'ispmanager', INTERWORX_NAME: 'interworx', UNKNOWN_NAME: 'backward_plugin', INTEGRATED_NAME: 'vendors' } def detect_panel_fast(): """ This function will try to detect our officially supported control panels :return: tuple of CP name and CP plugin name to import """ panel_name, official_plugin_name = None, None if os.path.isfile('/opt/cpvendor/etc/integration.ini'): panel_name, official_plugin_name = INTEGRATED_NAME, OFFICIAL_PLUGINS[INTEGRATED_NAME] elif os.path.isfile('/usr/local/cpanel/cpanel'): panel_name, official_plugin_name = CPANEL_NAME, OFFICIAL_PLUGINS[CPANEL_NAME] elif os.path.isfile('/usr/local/directadmin/directadmin') or\ os.path.isfile('/usr/local/directadmin/custombuild/build'): panel_name, official_plugin_name = DIRECTADMIN_NAME, OFFICIAL_PLUGINS[DIRECTADMIN_NAME] elif os.path.isfile('/usr/local/psa/version'): panel_name, official_plugin_name = PLESK_NAME, OFFICIAL_PLUGINS[PLESK_NAME] # ispmanager must have: # v5: /usr/local/mgr5/ directory, # v4: /usr/local/ispmgr/bin/ispmgr file elif os.path.isfile('/usr/local/ispmgr/bin/ispmgr') or os.path.isdir('/usr/local/mgr5'): panel_name, official_plugin_name = ISPMANAGER_NAME, OFFICIAL_PLUGINS[ISPMANAGER_NAME] elif os.path.isdir('/usr/local/interworx'): panel_name, official_plugin_name = INTERWORX_NAME, OFFICIAL_PLUGINS[INTERWORX_NAME] elif lvectl_custompanel_script() is not None: # Check custom panel script to use backward plugin panel_name, official_plugin_name = UNKNOWN_NAME, OFFICIAL_PLUGINS[UNKNOWN_NAME] return panel_name, official_plugin_name def get_cp_plugin_module(plugin_name=None): if plugin_name: # Case when some "official" plugin was already detected return import_module(f'{PLUGINS_PKG_PREFIX}.plugins.{plugin_name}') # Use cached module if detect_panel_fast() can't detect anything # Actual import statement in file is updated on each rebuild_cache() from .apilink import api as _apiplugin # pylint: disable=import-outside-toplevel return _apiplugin def detectplugin(plugin_pack, ignore_errors=False): """ Scan directory for presence of plugins. Plugin is a file that has the extension ".py" and a non-empty variable "__cpname__" (the name of the control panel). The file must contain a function "detect" which returns True in case of presence of the corresponding control panel. >>> detectplugin('plugins') ('from .plugins import cpanel as api', 'cPanel') :param plugin_pack: package name or the name of the plug-ins directory ('cache' - cache plugins users; 'plugins' - officially supported plug-ins) :rtype: tuple :return: a pair of values: (line to import the package, the name of the control panel) """ plugin_dir = os.path.join(os.path.dirname(__file__), plugin_pack) plugin_path_pattern = os.path.join(plugin_dir, '*.py') modules = [ os.path.splitext(os.path.basename(py_full_path))[0] for py_full_path in glob.glob(plugin_path_pattern) ] for mod_name in modules: if mod_name == '__init__': continue absolute_module_name = f'{PLUGINS_PKG_PREFIX}.{plugin_pack}.{mod_name}' import_string = f'from .{plugin_pack} import {mod_name} as api' try: api = import_module(absolute_module_name) except ImportError as e: if ignore_errors: continue raise PluginImportError(f'Can not import {absolute_module_name} plugin') from e try: panel_class = api.PanelPlugin() # Try to detect panel panel_data = panel_class.get_cp_description() return import_string, panel_data['name'] except (AttributeError, TypeError, KeyError) as exc: syslog.syslog( syslog.LOG_WARNING, f'cpapi: Plugin {mod_name} does not satisfy cpapi requirements:\n{exc}', ) return None, None def getuser(): """ Get current user's username based on os.getuid() :rtype: str """ uid = os.getuid() if uid == 0: return 'root' return pwd.getpwuid(uid)[0] def _reinit_module(init_path): """create (or rewrite) an empty __init__.py and it's .pyc/.pyo parts""" with open(init_path, 'w', encoding='utf-8'): pass py_compile.compile(init_path) # In Py2 ".pyo" files can be generated only like this: subprocess.run( f'{sys.executable} -O -m py_compile "{init_path}"', shell=True, executable='/bin/bash', check=False, ) def rebuild_cache(plugins_dir=PLUGINS_DIR): CACHE_DIR = 'cache' PLUGINS_PATH = 'plugins' # directory with official supported plugins cache_dir = os.path.join(os.path.dirname(__file__), CACHE_DIR) # delete all in cache dir shutil.rmtree(cache_dir) if plugins_dir and os.path.isdir(plugins_dir): # copy all from plugins dir to cache shutil.copytree(plugins_dir, cache_dir) else: os.mkdir(cache_dir) os.chmod(cache_dir, 0o755) init_path = os.path.join(cache_dir, '__init__.py') _reinit_module(init_path) try: import_string, cpname = detectplugin(CACHE_DIR) except PluginImportError as exc: import_string, cpname = None, None print(f'WARNING: {exc}') if cpname is None: shutil.rmtree(cache_dir) os.mkdir(cache_dir) _reinit_module(init_path) import_string, cpname = detectplugin(PLUGINS_PATH, ignore_errors=True) if cpname is None: import_string, cpname = (UNKNOWN_CP_IMPORT, UNKNOWN_CP_NAME) print(f'WARNING: can not detect control panel; the control panel is set to "{cpname}"') # write control panel name to cache file if cpname: # write control panel name to /var/cache/controlpanelname with open(CACHE_CPNAME, 'w', encoding='utf-8') as f: f.write(cpname) # Write plugin import string for fast loading. # Example: # from .plugins import nopanel as api with open(API_LINK_PATH, 'w', encoding='utf-8') as f: f.write(import_string + '\n')