관리-도구
편집 파일: client.py
import getpass import json import os import socket import time from typing import Union from .exceptions import InvalidArgumentError, JsonRpcClientError, MethodNotFoundError from .payload import Params, ParamsType, Payload DEFAULT_ROOT_SOCKET_PATH = "/var/run/hc_mds/root_mds.socket" DEFAULT_USER_SOCKET_PATH = "/var/run/hc_mds/user_mds.socket" DEFAULT_USER_TOKEN_FILE_FMT = ( "/var/cpanel/userdata/{}/hc_mds.token" # nosec bandit B105 ) RPC_VERSION = "2.0" class Client: def __init__( self, socket_path: Union[str, None] = None, token: Union[str, None] = None ): self._socket_path = socket_path self._token = token self._transport = None @classmethod def is_root(cls) -> bool: return os.geteuid() == 0 @property def token(self): if not self._token and not self.is_root(): with open( DEFAULT_USER_TOKEN_FILE_FMT.format(getpass.getuser()), "r", encoding="utf-8", ) as f: self._token = f.read().strip() return self._token @classmethod def _get_socket_path(cls) -> str: if cls.is_root(): return DEFAULT_ROOT_SOCKET_PATH return DEFAULT_USER_SOCKET_PATH @property def socket_path(self) -> str: if not self._socket_path: self._socket_path = self._get_socket_path() return self._socket_path def _prepare_payload( self, method: str, params: Union[ParamsType, "Params"] ) -> dict: if not isinstance(params, Params): _params = Params.from_dict(params) else: _params = params.model_copy(deep=True) _params.metadata.token = self.token # type: ignore return Payload( method=method, params=_params, # type: ignore jsonrpc=RPC_VERSION, id=int(time.time()), ).model_dump() @staticmethod def _encode_json(payload: dict) -> bytes: return json.dumps(payload).encode("utf-8") @staticmethod def _decode_json(payload: bytes) -> dict: return json.loads(payload.decode("utf-8")) @property def transport(self): if not self._transport: self._transport = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._transport.connect(self.socket_path) return self._transport def _execute(self, payload: dict) -> dict: self.transport.sendall(self._encode_json(payload)) response = self.transport.recv(1024) return self._decode_json(response) @staticmethod def _validate_response(response: dict) -> None: if error := response.get("error", None): if error["code"] == -32601: raise MethodNotFoundError(error["message"]) if error["code"] == -32602: raise InvalidArgumentError(error["message"]) raise JsonRpcClientError(response["error"]["message"]) def send(self, params: Union[ParamsType, "Params"], method: str = "event") -> dict: response = self._execute(self._prepare_payload(method, params)) self._validate_response(response) return response