$32 GRAYBYTE WORDPRESS FILE MANAGER $21

SERVER : premium201.web-hosting.com #1 SMP Wed Mar 26 12:08:09 UTC 2025
SERVER IP : 104.21.15.130 | ADMIN IP 216.73.216.238
OPTIONS : CRL = ON | WGT = ON | SDO = OFF | PKEX = OFF
DEACTIVATED : NONE

/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/burster/

HOME
Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/burster//profiler.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import enum
import os
import contextlib
import gc
import time
import statistics
import typing
from pathlib import Path
from contextlib import ExitStack
from collections import defaultdict
from enum import StrEnum
from dataclasses import dataclass
from typing import NamedTuple, Generator, Callable, TypeAlias, ContextManager, Self, Sequence, Iterable, Protocol

import sqlalchemy as sa

import psutil
import pytest
from _pytest.terminal import TerminalReporter

# Schema of the table that stores saved measurements.
# Apart from `sequence_id`, the columns carry the same names as the fields of
# `Measurement`, so a row can be built from `Measurement._asdict()`.
_db_metadata = sa.MetaData()
_measurements_table = sa.Table(
    'measurements',
    _db_metadata,
    # Composite primary key: the sequence identifier plus the sample timestamp.
    sa.Column('sequence_id', sa.String, primary_key=True),
    sa.Column('timestamp', sa.Float, primary_key=True),
    sa.Column('cpu_usage', sa.Float, nullable=False),
    sa.Column('run_time_seconds', sa.Float, nullable=False),
    sa.Column('memory_allocated_bytes', sa.Integer, nullable=False),
    sa.Column('total_memory_bytes', sa.Integer, nullable=False),
)


class Measurement(NamedTuple):
    """A single profiling sample taken around one profiled code section."""

    # Moment the sample was recorded.
    timestamp: float
    # CPU usage reported for the profiled section, in percent.
    cpu_usage: float
    # Duration of the profiled section, in seconds.
    run_time_seconds: float
    # Difference in process memory between the end and the start of the section.
    memory_allocated_bytes: int
    # Process memory at the end of the section.
    total_memory_bytes: int

    def __str__(self) -> str:
        """Render the sample as a four-line, human-readable report."""
        allocated_mb = _bytes_to_mb(self.memory_allocated_bytes)
        total_mb = _bytes_to_mb(self.total_memory_bytes)
        report_lines = (
            f'Time             : {self.run_time_seconds} sec',
            f'CPU usage        : {self.cpu_usage:.2f}%',
            f'Memory allocated : {allocated_mb:.2f} mb',
            f'Total memory     : {total_mb:.2f} mb',
        )
        return '\n'.join(report_lines)


# Import-time sanity check: every `Measurement` field needs a same-named column
# in the measurements table.
assert set(Measurement._fields) <= {column.name for column in _measurements_table.columns}


@dataclass
class Statistic:
    @classmethod
    def from_data(cls, data: Sequence[float]) -> Self:
        return Statistic(
            mean=statistics.mean(data),
            std_dev=statistics.stdev(data) if len(data) > 1 else 0,
            max_value=max(data),
            min_value=min(data)
        )

    mean: float
    std_dev: float
    max_value: float
    min_value: float


@dataclass(frozen=True, slots=True)
class StatisticSummarySection:
    """Display-ready text of one section of the measurement summary."""

    # Heading of the section.
    title: str
    # Already formatted values, shown verbatim in the summary.
    mean: str
    std_dev: str
    max_value: str
    min_value: str

    @classmethod
    def from_statistic(cls, title: str, statistic: Statistic, units: str) -> Self:
        """Format every value of *statistic* with two decimals followed by *units*."""
        def render(value: float) -> str:
            return f'{value:.2f}{units}'

        return cls(
            title,
            render(statistic.mean),
            render(statistic.std_dev),
            render(statistic.max_value),
            render(statistic.min_value),
        )

    @classmethod
    def empty(cls, title: str) -> Self:
        """Create a section whose every value is the ``N/A`` placeholder."""
        placeholder = 'N/A'
        return cls(title, placeholder, placeholder, placeholder, placeholder)


@enum.unique
class _SummarySection(StrEnum):
    """Sections of the measurement summary.

    The members are the keys under which `_measurements_to_summary` groups
    the measured values; it also passes them on as the section titles.
    """
    CPU_USAGE = 'CPU Usage'
    RUN_TIME = 'Run Time'
    MEMORY_ALLOCATED = 'Memory Allocated'
    TOTAL_MEMORY = 'Total Memory'


def _create_summary(sections: Iterable[StatisticSummarySection]) -> str:
    """Join the header and the given sections into one multi-line summary.

    The result starts with a two-line header; every section contributes its
    title followed by indented ``Mean``, ``Std Dev``, ``Max`` and ``Min`` lines.
    """
    lines = [
        'Measurement Statistics Summary',
        '----------------------------',
    ]
    for section in sections:
        lines.extend((
            section.title,
            f'  Mean: {section.mean}',
            f'  Std Dev: {section.std_dev}',
            f'  Max: {section.max_value}',
            f'  Min: {section.min_value}',
        ))
    return '\n'.join(lines)


def _measurements_to_summary(measurements: Iterable[Measurement]) -> str:
    """Summarise *measurements* as a multi-line statistics report.

    The report always contains one section per `_SummarySection` member, in
    declaration order. Memory values are converted to megabytes first. A
    section without any data is rendered with ``N/A`` placeholders.

    Args:
        measurements: Samples to summarise; may be empty.

    Returns:
        The summary text produced by `_create_summary`.
    """
    # Unit suffix appended to every formatted value of a section.
    units = {
        _SummarySection.CPU_USAGE: '%',
        _SummarySection.RUN_TIME: ' sec',
        _SummarySection.MEMORY_ALLOCATED: ' mb',
        _SummarySection.TOTAL_MEMORY: ' mb',
    }

    # NOTE: Every section is pre-seeded, so that an empty input still yields
    #       all the sections instead of a bare header.
    title_to_data: dict[_SummarySection, list[float]] = {section: [] for section in _SummarySection}
    for measurement in measurements:
        title_to_data[_SummarySection.CPU_USAGE].append(measurement.cpu_usage)
        title_to_data[_SummarySection.RUN_TIME].append(measurement.run_time_seconds)
        title_to_data[_SummarySection.MEMORY_ALLOCATED].append(_bytes_to_mb(measurement.memory_allocated_bytes))
        title_to_data[_SummarySection.TOTAL_MEMORY].append(_bytes_to_mb(measurement.total_memory_bytes))

    sections = [
        StatisticSummarySection.from_statistic(
            title=title,
            statistic=Statistic.from_data(data),
            units=units[title],
        ) if data else StatisticSummarySection.empty(title)
        for title, data in title_to_data.items()
    ]
    return _create_summary(sections)


# Summary shown when there is nothing to summarise: every section is present,
# but all of its values are `N/A`.
# NOTE: Sections are titled with the enum values ('CPU Usage'), not the member
#       names ('CPU_USAGE'), to match the titles of a regular summary.
_empty_summary = _create_summary(StatisticSummarySection.empty(s.value) for s in _SummarySection)


# Type of the value provided by the `profiled` fixture: a zero-argument callable
# returning a context manager, i.e. something usable as `with profiled(): ...`.
Profiled: TypeAlias = Callable[[], ContextManager[None]]


def _get_uss_memory(process: psutil.Process) -> int:
    """Return the `uss` value of the full memory info of *process*."""
    # NOTE(vlebedev): On the background of why USS is used, have a look at
    #                 https://gmpy.dev/blog/2016/real-process-memory-and-environ-in-python
    attribute = 'memory_full_info'
    snapshot = process.as_dict(attrs=[attribute])
    return snapshot[attribute].uss


def _bytes_to_mb(bytes_: int) -> float:
    """Convert a number of bytes to megabytes, with 1 mb being 1,000,000 bytes."""
    bytes_per_mb = 1_000_000
    return bytes_ / bytes_per_mb


class _Profiler:
    """Factory of context managers, each recording one `Measurement`.

    Calling an instance returns a context manager; when its ``with`` block is
    left, a `Measurement` of that block is appended to the list supplied to
    the constructor.
    """

    def __init__(self, measurements: list[Measurement]) -> None:
        """Remember the current process id and the list collecting the results.

        Args:
            measurements: List every new `Measurement` gets appended to.
        """
        self._pid = os.getpid()
        self._measurements = measurements

    @contextlib.contextmanager
    def __call__(self) -> Generator[None, None, None]:
        """Measure run time, CPU usage and memory of the enclosed ``with`` block.

        The measurement is recorded even when the block raises; the exception
        then propagates as usual. The garbage collector is switched off for
        the duration of the block. Afterwards it is put back into the state
        it was in on entry and one collection is run.
        """
        process = psutil.Process(self._pid)

        # NOTE: Remember the collector state so it can be restored rather than
        #       unconditionally enabled: the caller may have disabled it.
        gc_was_enabled = gc.isenabled()
        gc.disable()
        try:
            start_time = time.perf_counter()
            memory_before = _get_uss_memory(process)
            # The result is deliberately discarded: this call only marks the
            # start of the interval that the second `cpu_percent()` call below
            # reports on (see psutil documentation of `Process.cpu_percent`).
            process.cpu_percent()
            try:
                yield
            finally:
                end_time = time.perf_counter()
                memory_after = _get_uss_memory(process)
                cpu_usage_after = process.cpu_percent()
                memory_delta_bytes = memory_after - memory_before
                measurement = Measurement(
                    timestamp=time.time(),
                    cpu_usage=cpu_usage_after,
                    run_time_seconds=end_time - start_time,
                    memory_allocated_bytes=memory_delta_bytes,
                    total_memory_bytes=memory_after,
                )
                self._measurements.append(measurement)
        finally:
            # NOTE: The outer `finally` guarantees the collector is restored
            #       even if one of the memory/CPU probes above raises.
            if gc_was_enabled:
                gc.enable()
            gc.collect()


class _WriteMeasurements(Protocol):
    """Structural type of the callable provided by the `_write_measurements` fixture."""

    def __call__(self, measurements: Iterable[Measurement]) -> None:
        """Consume the measurements collected during one test."""
        ...


# Name under which the list of measurements is recorded via `record_property`
# and later looked up in `report.user_properties`.
_MEASUREMENTS_KEY = 'measurements'
# Command-line flag that turns on collecting the measurements.
_MEASUREMENTS_ENABLED_FLAG = '--with-measurements'
# Command-line flag that additionally stores the measurements in the database.
_SAVE_MEASUREMENTS_FLAG = '--save-measurements'


class PytestProfilerPlugin:
    """Pytest plugin providing the `profiled` fixture and a measurements report.

    Without `--with-measurements` the `profiled` fixture is a no-op stub.
    With it, every use of `profiled()` records a `Measurement`; the collected
    measurements are summarised in the terminal in verbose mode, and stored
    in an SQLite database when `--save-measurements` is given as well.
    """

    def pytest_configure(self, config: pytest.Config) -> None:
        """Create the fixtures matching the command-line options.

        The fixtures are attached to the plugin instance (see `_fixture`).
        """
        if not config.getoption(_MEASUREMENTS_ENABLED_FLAG):
            @self._fixture(scope='function', name='profiled')
            def create_profiler_stub() -> Profiled:
                @contextlib.contextmanager
                def stub() -> Generator[None, None, None]:
                    yield
                return stub
            return

        if config.getoption(_SAVE_MEASUREMENTS_FLAG):
            @self._fixture(scope='function', name='_write_measurements')
            def create_measurements_writer(
                request: pytest.FixtureRequest,
            ) -> Generator[_WriteMeasurements, None, None]:
                # The database file lives next to the requesting test file;
                # rows are keyed by the node id of the test.
                db_file = Path(request.path).parent / 'measurements.db'
                sequence_id = request.node.nodeid

                with ExitStack() as defer:
                    engine = sa.create_engine(f'sqlite+pysqlite:///{db_file}', echo=__debug__)
                    defer.callback(engine.dispose)

                    conn = defer.enter_context(engine.connect())

                    _db_metadata.create_all(conn)

                    def write_measurements(measurements: Iterable[Measurement]) -> None:
                        rows = [{
                            'sequence_id': sequence_id,
                            **m._asdict(),
                        } for m in measurements]
                        if not rows:
                            # NOTE: Nothing was measured (e.g. the test never
                            #       entered `profiled()`), so there is nothing
                            #       to store; do not issue an INSERT built from
                            #       an empty list of values.
                            return
                        stmt = _measurements_table.insert(None).values(rows)
                        conn.execute(stmt)

                    yield write_measurements
        else:
            @self._fixture(scope='function', name='_write_measurements')
            def create_measurements_writer_stub() -> _WriteMeasurements:
                def stub(measurements: Iterable[Measurement]) -> None:
                    # NOTE(vlebedev): Make pylance happy by "using" all arguments.
                    del measurements
                return stub

        @self._fixture(scope='function', name='profiled')
        def profiler_created(
            record_property: Callable[[str, object], None],
            _write_measurements: _WriteMeasurements,
        ) -> Generator[Profiled, None, None]:
            measurements: list[Measurement] = []
            # The very same list object is recorded as a test property and
            # filled by the profiler afterwards.
            record_property(_MEASUREMENTS_KEY, measurements)
            yield _Profiler(measurements)
            _write_measurements(measurements)

    def pytest_addoption(self, parser: pytest.Parser) -> None:
        """Register the `--with-measurements` and `--save-measurements` flags."""
        parser.addoption(
            _MEASUREMENTS_ENABLED_FLAG,
            action='store_true',
            default=False,
            help='Enable measurements collection during each test',
        )
        parser.addoption(
            _SAVE_MEASUREMENTS_FLAG,
            action='store_true',
            default=False,
            help='Save measurements to the database',
        )

    def pytest_report_teststatus(self, report: pytest.TestReport, config: pytest.Config) -> None:
        """Print the measurement summary of a test once its teardown is reported.

        Nothing is printed unless measurements are enabled, verbosity is at
        least 1, the terminal reporter plugin is available and the test has
        recorded measurements as a property. Always returns ``None``.
        """
        if not (config.getoption(_MEASUREMENTS_ENABLED_FLAG) and config.option.verbose >= 1):
            return
        if report.when != 'teardown':
            return

        tr: TerminalReporter = config.pluginmanager.get_plugin('terminalreporter')
        if tr is None:
            return

        try:
            measurements = typing.cast(
                list[Measurement],
                next(v for k, v in report.user_properties if k == _MEASUREMENTS_KEY),
            )
        except StopIteration:
            # The test did not use the `profiled` fixture.
            return

        if not measurements:
            # NOTE: Checked explicitly instead of relying on an exception,
            #       as summarising an empty list does not necessarily raise.
            summary = _empty_summary
        else:
            try:
                summary = _measurements_to_summary(measurements)
            except ValueError:
                summary = _empty_summary

        tr.ensure_newline()
        tr.write_line(summary)

    def _fixture(self, *args, **kwargs):
        """Return a decorator working like `pytest.fixture(*args, **kwargs)`.

        In addition, the created fixture is stored as an attribute of the
        plugin instance, named after the `name` keyword argument or, when it
        is absent, after the decorated function.
        """
        def wrapper(func):
            fixture = pytest.fixture(*args, **kwargs)(func)
            # NOTE(vlebedev): Without assigning fixture to some property,
            #                 pytest won't be able to find and use it.
            setattr(self, kwargs.get('name', func.__name__), fixture)
            return fixture
        return wrapper


Current_dir [ NOT WRITEABLE ] Document_root [ WRITEABLE ]


[ Back ]
NAME
SIZE
LAST TOUCH
USER
CAN-I?
FUNCTIONS
..
--
5 Mar 2026 11.53 PM
root / root
0755
__pycache__
--
5 Mar 2026 11.53 PM
root / root
0755
storage
--
5 Mar 2026 11.53 PM
root / root
0755
__init__.py
8.529 KB
17 Feb 2026 11.00 AM
root / root
0644
_logs.py
0.583 KB
17 Feb 2026 11.00 AM
root / root
0644
adjust.py
7.362 KB
17 Feb 2026 11.00 AM
root / root
0644
common.py
4.559 KB
17 Feb 2026 11.00 AM
root / root
0644
config.py
7.96 KB
17 Feb 2026 11.00 AM
root / root
0644
history.py
9.795 KB
17 Feb 2026 11.00 AM
root / root
0644
lve_sm.py
18.402 KB
17 Feb 2026 11.00 AM
root / root
0644
lves_tracker.py
8.925 KB
17 Feb 2026 11.00 AM
root / root
0644
notify.py
1.64 KB
17 Feb 2026 11.00 AM
root / root
0644
overload.py
3.231 KB
17 Feb 2026 11.00 AM
root / root
0644
profiler.py
10.299 KB
17 Feb 2026 11.00 AM
root / root
0644
utils.py
1.626 KB
17 Feb 2026 11.00 AM
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2025 CONTACT ME
Static GIF