⚝
One Hat Cyber Team
⚝
Your IP:
216.73.216.177
Server IP:
50.6.168.112
Server:
Linux server-617809.webnetzimbabwe.com 5.14.0-570.25.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jul 9 04:57:09 EDT 2025 x86_64
Server Software:
Apache
PHP Version:
8.4.10
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
usr
/
libexec
/
kcare
/
python
/
kcarectl
/
View File Name :
platform_utils.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
"""Helpers for inspecting the host platform.

Covers distro and kernel identification, interpreter/agent info, UEFI and
Secure Boot state, hostname, uptime, and virtualization/container detection.
"""

import base64
import json
import os
import platform
import socket
import sys

from . import constants
from . import config
from . import log_utils
from . import utils
from . import process_utils
from . import selinux

if False:  # pragma: no cover
    from typing import Optional, Any  # noqa: F401

# Path of the virt-what helper executed by get_virt().
VIRTWHAT = '/usr/libexec/kcare/virt-what'


def get_distro():
    """Return the Linux distribution tuple for the running system.

    Interpreters older than 3.6 use ``platform.linux_distribution()``; newer
    ones use the ``distro`` package, imported lazily so it is only required
    on that branch.
    """
    if sys.version_info[:2] < (3, 6):  # pragma: no py3 cover
        return platform.linux_distribution()
    else:  # pragma: no distro cover
        import distro

        return distro.linux_distribution(full_distribution_name=False)


@utils.cached
def get_system_uname():
    """Return the third field of ``platform.uname()`` (the release string)."""
    return platform.uname()[2]


def get_python_version():
    # type: () -> str
    """Return the interpreter version as ``'<major>.<minor>'``."""
    return '%s.%s' % (sys.version_info[0], sys.version_info[1])


def app_info(is_json=False):
    # type: (bool) -> str
    """Describe the running agent.

    The report always holds ``python_version`` and ``agent_version``; when
    SELinux is enabled it also holds ``selinux_context`` for this process,
    taken from ``ps -Z`` (or an ``'error: ...'`` string if ``ps`` fails).

    :param is_json: return a JSON object instead of ``key: value`` lines.
    :return: JSON text, or newline-separated ``key: value`` pairs sorted by key.
    """
    info = {
        'python_version': get_python_version(),
        'agent_version': constants.VERSION,
    }

    if selinux.is_selinux_enabled():
        rc, stdout, stderr = process_utils.run_command(
            ['ps', '-Z', '--no-headers', '--pid', str(os.getpid())],
            catch_stdout=True,
        )
        if not rc:
            # First whitespace-separated column of the `ps -Z` output.
            selinux_context = stdout.split()[0]
        else:
            selinux_context = 'error: %s' % stderr
        info['selinux_context'] = selinux_context

    if is_json:
        return json.dumps(info)

    # rstrip() is kept so trailing whitespace in the last value (e.g. a
    # newline at the end of stderr) is still trimmed, as before.
    return '\n'.join('%s: %s' % (key, info[key]) for key in sorted(info)).rstrip()


EFIVARS_PATH = '/sys/firmware/efi/efivars'
# Vendor GUIDs used as the suffix of variable file names under EFIVARS_PATH.
EFI_VENDORS = {
    'global': '8be4df61-93ca-11d2-aa0d-00e098032b8c',
    'shim': '605dab50-e046-4300-abb6-3dd810dd8b23',
}


def _read_uefi_var(name, vendor, max_bytes=256):
    # type: (str, str, Optional[int]) -> Optional[bytes]
    """Read the raw content of ``<EFIVARS_PATH>/<name>-<vendor>``.

    :param name: variable name.
    :param vendor: vendor GUID string.
    :param max_bytes: upper bound on the number of bytes read.
    :return: the bytes read, or ``None`` when the file does not exist.
    """
    var_path = os.path.join(EFIVARS_PATH, '%s-%s' % (name, vendor))
    if not os.path.exists(var_path):
        return None
    with open(var_path, 'rb') as var:
        return var.read(max_bytes)


def is_secure_boot():  # mocked: tests/unit/test_load_kmod.py
    # type: () -> bool
    """Return True when the global ``SecureBoot`` variable's last byte is 0x01.

    Best-effort: a missing/empty variable or any read error yields ``False``.
    """
    try:
        secure_boot_var = _read_uefi_var('SecureBoot', EFI_VENDORS['global'])
        if secure_boot_var:
            return secure_boot_var[-1:] == b'\x01'  # Get last byte
    except Exception:  # pragma: no cover
        pass
    return False


def _get_uefi_var_encoded(name, vendor):
    # type: (str, str) -> Optional[str]
    """Return a UEFI variable's content as URL-safe base64 text.

    :return: ``None`` when the variable does not exist. If reading raises,
        the exception message is encoded in place of the value, so the
        failure is reported rather than propagated.
    """
    try:
        value_bytes = _read_uefi_var(name, vendor)
        if value_bytes is None:
            return None
    except Exception as e:
        value_bytes = str(e).encode()
    return utils.nstr(base64.urlsafe_b64encode(value_bytes))


def secure_boot_info():
    # type: () -> dict[str, Any]
    """Collect boot/UEFI details into a dict.

    Keys:
      * ``cmdline`` - content of ``/proc/cmdline`` (cut to 1024 characters).
      * ``has_efi`` - whether the parent directory of ``EFIVARS_PATH`` exists.
      * ``global`` - encoded ``SecureBoot`` and ``SetupMode`` values
        (only when ``has_efi``).
      * ``shim`` - ``vars`` (sorted names of variables whose file name ends
        with the shim vendor GUID) plus encoded values for selected ones
        (only when ``has_efi``).

    Errors while gathering the EFI part are logged as warnings and whatever
    was collected so far is returned.
    """
    cmdline = utils.try_to_read('/proc/cmdline')
    if cmdline and len(cmdline) > 1024:  # pragma: no cover
        cmdline = cmdline[:1024]

    info = {'cmdline': cmdline, 'has_efi': os.path.exists(os.path.dirname(EFIVARS_PATH))}  # type: dict[str, Any]
    if not info['has_efi']:
        return info

    try:
        # NOTE(review): dict(...)/set([...]) calls are kept instead of
        # comprehension/literal syntax - presumably for old interpreters.
        info['global'] = dict((var, _get_uefi_var_encoded(var, EFI_VENDORS['global'])) for var in ('SecureBoot', 'SetupMode'))

        # Strip the trailing "-<guid>" from each matching file name.
        shim_vars = sorted(
            [var[0 : -len(EFI_VENDORS['shim']) - 1] for var in os.listdir(EFIVARS_PATH) if var.endswith(EFI_VENDORS['shim'])]
        )
        info['shim'] = {'vars': shim_vars}

        shim_exclude_vars = set(['MokListRT', 'MokListXRT', 'MokListTrustedRT', 'SbatLevelRT'])
        for var in shim_vars:
            # Values are read for two named variables and for every *RT
            # variable that is not explicitly excluded.
            if var in ('HSIStatus', 'MokIgnoreDB') or (var.endswith('RT') and var not in shim_exclude_vars):
                info['shim'][var] = _get_uefi_var_encoded(var, EFI_VENDORS['shim'])
    except Exception as err:
        log_utils.logwarn(err)

    return info


@utils.cached
def get_hostname():
    # type: () -> str
    """Return the host name to report.

    With ``config.REPORT_FQDN`` set, the canonical name from
    ``socket.getaddrinfo`` is used; a ``socket.gaierror`` is logged and
    ``platform.node()`` is used instead. Otherwise ``platform.node()``.
    """
    # KCARE-1165 If fqdn gathering is forced
    if config.REPORT_FQDN:
        try:
            # getaddrinfo() -> [(family, socktype, proto, canonname, sockaddr), ...]
            hostname = socket.getaddrinfo(socket.gethostname(), 0, 0, 0, 0, socket.AI_CANONNAME)[0][3]
        except socket.gaierror as ge:
            log_utils.logerror(ge)
            hostname = platform.node()
    else:
        hostname = platform.node()

    return hostname


@utils.cached
def get_uptime():
    # type: () -> str
    """Return the first field of ``/proc/uptime`` truncated to an integer.

    :return: the value as a decimal string, or ``'-1'`` when ``/proc/uptime``
        is not a regular file.
    """
    if os.path.isfile('/proc/uptime'):
        # Context manager closes the file even if parsing below raises.
        with open('/proc/uptime', 'r') as f:
            line = f.readline()
        return str(int(float(line.split()[0])))
    return '-1'


@utils.cached
def get_virt():
    """Return the stripped output of the virt-what helper.

    :return: helper output, or ``'no-virt-what'`` when ``VIRTWHAT`` is absent.
    """
    if os.path.isfile(VIRTWHAT):
        return process_utils.check_output([VIRTWHAT]).strip()
    return 'no-virt-what'  # pragma: no cover


def is_cpanel():
    """Return True when ``/usr/local/cpanel/cpanel`` is a regular file."""
    return os.path.isfile('/usr/local/cpanel/cpanel')


def inside_vz_container():  # mocked: tests/unit/test_load_kmod.py
    """Return True when ``/proc/vz/veinfo`` exists but ``/proc/vz/version`` does not."""
    return os.path.exists('/proc/vz/veinfo') and not os.path.exists('/proc/vz/version')


def inside_lxc_container():  # mocked: tests/unit/test_load_kmod.py
    """Return True when ``/proc/1/cgroup`` contains the substring ``/lxc/``."""
    # Context manager so the file handle is closed deterministically.
    with open('/proc/1/cgroup') as cgroup_file:
        return '/lxc/' in cgroup_file.read()


def inside_docker_container():  # mocked: tests/unit/test_load_kmod.py
    """Return True when ``/.dockerenv`` is a regular file."""
    return os.path.isfile('/.dockerenv')