Skip to content

Commit

Permalink
added network_manager and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Mar 25, 2024
1 parent 53544b3 commit e7b8284
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 21 deletions.
113 changes: 103 additions & 10 deletions docker/activity_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@

LISTEN_PORT: Final[int] = 19597

CHECK_INTERVAL_S: Final[float] = 5
KERNEL_CHECK_INTERVAL_S: Final[float] = 5
CHECK_INTERVAL_S: Final[float] = 1
THREAD_EXECUTOR_WORKERS: Final[int] = 10

BUSY_USAGE_THRESHOLD_CPU: Final[float] = 5 # percent in range [0, 100]
BUSY_USAGE_THRESHOLD_DISK_READ: Final[int] = 0 # in bytes
BUSY_USAGE_THRESHOLD_DISK_WRITE: Final[int] = 0 # in bytes
BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED: Final[int] = 0 # in bytes
BUSY_USAGE_THRESHOLD_NETWORK_SENT: Final[int] = 0 # in bytes


# Utilities
Expand Down Expand Up @@ -152,7 +155,7 @@ def __init__(self, poll_interval: float, *, busy_threshold: float):
self._last_sample: dict[ProcessID, tuple[TimeSeconds, PercentCPU]] = (
self._sample_total_cpu_usage()
)
self.total_cpu_usage: float = 0
self.total_cpu_usage: PercentCPU = 0

@staticmethod
def _sample_cpu_usage(
Expand Down Expand Up @@ -210,8 +213,8 @@ def __init__(
self,
poll_interval: float,
*,
read_usage_threshold: float,
write_usage_threshold: float,
read_usage_threshold: int,
write_usage_threshold: int,
):
super().__init__(poll_interval=poll_interval)
self.read_usage_threshold = read_usage_threshold
Expand All @@ -221,8 +224,8 @@ def __init__(
ProcessID, tuple[TimeSeconds, BytesRead, BytesWrite]
] = self._sample_total_disk_usage()

self.total_bytes_read: int = 0
self.total_bytes_write: int = 0
self.total_bytes_read: BytesRead = 0
self.total_bytes_write: BytesWrite = 0

@staticmethod
def _sample_disk_usage(
Expand Down Expand Up @@ -284,16 +287,92 @@ def _check_if_busy(self) -> bool:
)


InterfaceName: TypeAlias = str
BytesReceived: TypeAlias = int
BytesSent: TypeAlias = int


class NetworkUsageMonitor(AbstractIsBusyMonitor):
_EXCLUDE_INTERFACES: set[InterfaceName] = {
"lo",
}

def __init__(
self,
poll_interval: float,
*,
received_usage_threshold: int,
sent_usage_threshold: int,
):
super().__init__(poll_interval=poll_interval)
self.received_usage_threshold = received_usage_threshold
self.sent_usage_threshold = sent_usage_threshold

self._last_sample: tuple[TimeSeconds, BytesReceived, BytesSent] = (
self._sample_total_network_usage()
)
self.bytes_received: BytesReceived = 0
self.bytes_sent: BytesSent = 0

def _sample_total_network_usage(
self,
) -> tuple[TimeSeconds, BytesReceived, BytesSent]:
net_io_counters = psutil.net_io_counters(pernic=True)

total_bytes_received: int = 0
total_bytes_sent: int = 0
for nic, stats in net_io_counters.items():
if nic in self._EXCLUDE_INTERFACES:
continue

total_bytes_received += stats.bytes_recv
total_bytes_sent += stats.bytes_sent

return time.time(), total_bytes_received, total_bytes_sent

@staticmethod
def _get_bytes_over_one_second(
last: tuple[TimeSeconds, BytesReceived, BytesSent],
current: tuple[TimeSeconds, BytesReceived, BytesSent],
) -> tuple[BytesReceived, BytesSent]:
interval = current[0] - last[0]
measured_bytes_received_in_interval = current[1] - last[1]
measured_bytes_sent_in_interval = current[2] - last[2]

# bytes_*_1_second[%] = 1[s] * measured_bytes_*_in_interval[%] / interval[s]
bytes_received_over_1_second = int(
measured_bytes_received_in_interval / interval
)
bytes_sent_over_1_second = int(measured_bytes_sent_in_interval / interval)
return bytes_received_over_1_second, bytes_sent_over_1_second

def _update_total_network_usage(self) -> None:
current_sample = self._sample_total_network_usage()

bytes_received, bytes_sent = self._get_bytes_over_one_second(
self._last_sample, current_sample
)

self._last_sample = current_sample # replace

self.bytes_received = bytes_received
self.bytes_sent = bytes_sent

def _check_if_busy(self) -> bool:
self._update_total_network_usage()
return (
self.bytes_received > self.received_usage_threshold
or self.bytes_sent > self.sent_usage_threshold
)


class ActivityManager:
def __init__(self, interval: float) -> None:
self.interval = interval
self.last_idle: datetime | None = None

self.jupyter_kernel_monitor = JupyterKernelMonitor(CHECK_INTERVAL_S)
self.jupyter_kernel_monitor = JupyterKernelMonitor(KERNEL_CHECK_INTERVAL_S)
self.cpu_usage_monitor = CPUUsageMonitor(
# TODO: interval could be 1 second now
CHECK_INTERVAL_S,
busy_threshold=BUSY_USAGE_THRESHOLD_CPU,
)
Expand All @@ -302,12 +381,18 @@ def __init__(self, interval: float) -> None:
read_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_READ,
write_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_WRITE,
)
self.network_monitor = NetworkUsageMonitor(
CHECK_INTERVAL_S,
received_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED,
sent_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_SENT,
)

def check(self):
is_busy = (
self.jupyter_kernel_monitor.is_busy
or self.cpu_usage_monitor.is_busy
or self.disk_usage_monitor.is_busy
or self.network_monitor.is_busy
)

if is_busy:
Expand All @@ -327,6 +412,7 @@ async def run(self):
self.jupyter_kernel_monitor.start()
self.cpu_usage_monitor.start()
self.disk_usage_monitor.start()
self.network_monitor.start()
while True:
with suppress(Exception):
self.check()
Expand All @@ -350,8 +436,15 @@ async def get(self):
"disk_usage": {
"is_busy": self.activity_manager.disk_usage_monitor.is_busy,
"total": {
"bytes_read": self.activity_manager.disk_usage_monitor.total_bytes_read,
"bytes_write": self.activity_manager.disk_usage_monitor.total_bytes_write,
"bytes_read_per_second": self.activity_manager.disk_usage_monitor.total_bytes_read,
"bytes_write_per_second": self.activity_manager.disk_usage_monitor.total_bytes_write,
},
},
"network_usage": {
"is_busy": self.activity_manager.network_monitor.is_busy,
"total": {
"bytes_received_per_second": self.activity_manager.network_monitor.bytes_received,
"bytes_sent_per_second": self.activity_manager.network_monitor.bytes_sent,
},
},
"kernel_monitor": {
Expand Down
75 changes: 64 additions & 11 deletions tests/test_activity_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import requests_mock

from queue import Queue
from typing import Callable, Iterable, TYPE_CHECKING
from typing import Callable, Final, Iterable, TYPE_CHECKING
from pytest_mock import MockFixture
from tenacity import AsyncRetrying
from tenacity.stop import stop_after_delay
Expand Down Expand Up @@ -127,6 +127,52 @@ async def test_disk_usage_monitor_still_busy(
assert disk_usage_monitor.is_busy is True


async def test_network_usage_monitor_not_busy(
socket_server: None,
mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]],
create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator],
):
activity_generator = create_activity_generator(network=False, cpu=False, disk=False)
mock__get_sibling_processes([activity_generator.get_pid()])

with activity_monitor.NetworkUsageMonitor(
0.5, received_usage_threshold=0, sent_usage_threshold=0
) as network_usage_monitor:
async for attempt in AsyncRetrying(
stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True
):
with attempt:
assert network_usage_monitor.bytes_received == 0
assert network_usage_monitor.bytes_sent == 0
assert network_usage_monitor.is_busy is False


@pytest.fixture
def mock_network_monitor_exclude_interfaces(mocker: MockFixture) -> None:
mocker.patch("activity_monitor.NetworkUsageMonitor._EXCLUDE_INTERFACES", new=set())
assert activity_monitor.NetworkUsageMonitor._EXCLUDE_INTERFACES == set()


async def test_network_usage_monitor_still_busy(
mock_network_monitor_exclude_interfaces: None,
socket_server: None,
mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]],
create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator],
):
activity_generator = create_activity_generator(network=True, cpu=False, disk=False)
mock__get_sibling_processes([activity_generator.get_pid()])

with activity_monitor.NetworkUsageMonitor(
0.5, received_usage_threshold=0, sent_usage_threshold=0
) as network_usage_monitor:
# wait for monitor to trigger
await asyncio.sleep(1)

assert network_usage_monitor.bytes_received > 0
assert network_usage_monitor.bytes_sent > 0
assert network_usage_monitor.is_busy is True


@pytest.fixture
def mock_jupyter_kernel_monitor(are_kernels_busy: bool) -> Iterable[None]:
with requests_mock.Mocker(real_http=True) as m:
Expand Down Expand Up @@ -199,23 +245,29 @@ def _queue_stopper() -> None:
requests.get(f"{server_url}/", timeout=1)


@pytest.fixture
def mock_check_interval(mocker: MockFixture) -> None:
mocker.patch("activity_monitor.CHECK_INTERVAL_S", new=1)
assert activity_monitor.CHECK_INTERVAL_S == 1


@pytest.mark.parametrize("are_kernels_busy", [False])
async def test_tornado_server_ok(
mock_check_interval: None, tornado_server: None, server_url: str
):
async def test_tornado_server_ok(tornado_server: None, server_url: str):
result = requests.get(f"{server_url}/", timeout=5)
assert result.status_code == 200


_BIG_THRESHOLD: Final[int] = int(1e10)


@pytest.fixture
def mock_activity_manager_config(mocker: MockFixture) -> None:
mocker.patch("activity_monitor.CHECK_INTERVAL_S", 1)
mocker.patch("activity_monitor.KERNEL_CHECK_INTERVAL_S", 1)

mocker.patch(
"activity_monitor.BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED", _BIG_THRESHOLD
)
mocker.patch("activity_monitor.BUSY_USAGE_THRESHOLD_NETWORK_SENT", _BIG_THRESHOLD)


@pytest.mark.parametrize("are_kernels_busy", [False])
async def test_activity_monitor_becomes_not_busy(
mock_check_interval: None,
mock_activity_manager_config: None,
socket_server: None,
mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]],
create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator],
Expand All @@ -236,6 +288,7 @@ async def test_activity_monitor_becomes_not_busy(
assert debug_response["cpu_usage"]["is_busy"] is False
assert debug_response["disk_usage"]["is_busy"] is False
assert debug_response["kernel_monitor"]["is_busy"] is False
assert debug_response["network_usage"]["is_busy"] is False

result = requests.get(f"{server_url}/", timeout=2)
assert result.status_code == 200
Expand Down

0 comments on commit e7b8284

Please sign in to comment.