289 lines
9.4 KiB
Python
289 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from argparse import ArgumentParser, Namespace
|
|
import re
|
|
from typing import Any, List, Dict, Tuple, Optional
|
|
import shelve
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
__PREV_DATA__ = '{}/.ethMonCache'.format(Path.home().as_posix())
|
|
__total__ = 'tot_{}'
|
|
__old__ = 'old_{}'
|
|
|
|
__scale__ = {
|
|
'KB': lambda x: x / 1000,
|
|
'Kb': lambda x: (x*8) / 1000,
|
|
'MB': lambda x: x / 1000**2,
|
|
'Mb': lambda x: (x*8) / 1000**2,
|
|
'GB': lambda x: x / 1000**3,
|
|
'Gb': lambda x: (x*8) / 1000**3
|
|
}
|
|
|
|
__to_bytes__ = {
|
|
'KB': lambda x: x * 1000,
|
|
'Kb': lambda x: (x * 1000) / 8,
|
|
'MB': lambda x: x * 1000**2,
|
|
'Mb': lambda x: (x * 1000**2) / 8,
|
|
'GB': lambda x: x * 1000**3,
|
|
'Gb': lambda x: (x * 1000**3) / 8
|
|
}
|
|
|
|
|
|
def options_parser() -> Namespace:
|
|
_scalers = [i for i in __scale__.keys()]
|
|
parser = ArgumentParser('ethMon')
|
|
parser.add_argument('-i', '--interface', required=True, type=str, help='Network interface')
|
|
parser.add_argument('-w', '--warning', type=str, required=True, help='Warning threshold')
|
|
parser.add_argument('-c', '--critical', type=str, required=True, help='Critical threshold')
|
|
parser.add_argument('-s', '--scale', choices=_scalers, help='Scaled results {}'.format(_scalers))
|
|
parser.add_argument('--interval', type=int, help='Interval between the checks (in seconds)')
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def threshold_spec(threshold: str) -> bool:
|
|
"""
|
|
Check for ranges specified in the warning/critical options
|
|
"""
|
|
try:
|
|
x, y = threshold.split(':')
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
def to_int(x: str) -> int:
|
|
"""
|
|
Try to convert a value to integer. Always returns zero on unsuccessfull conversion
|
|
|
|
:param x: Any
|
|
"""
|
|
try:
|
|
return int(x)
|
|
except:
|
|
return 0
|
|
|
|
def threshold_extract(threshold: str) -> Tuple[int, int, bool]:
|
|
"""
|
|
Extract the range and the inner/outer parameter
|
|
|
|
|
|
:return: upper (int), lower (int), inner (bool)
|
|
"""
|
|
lower, upper = threshold.split(':')
|
|
if lower == '':
|
|
lower = 0
|
|
if upper == '':
|
|
""" Set the upper limit to a very large number if not specified """
|
|
upper = 10*10**256
|
|
if lower.startswith('@'):
|
|
lower = lower.lstrip('@')
|
|
inner = True
|
|
else:
|
|
inner = False
|
|
|
|
if to_int(lower) > to_int(upper):
|
|
raise ValueError('Invalid value in threshold specification. lower > upper')
|
|
return to_int(lower), to_int(upper), inner
|
|
|
|
|
|
def get_old_data(iface: str) -> Tuple[int, int, int, int]:
|
|
"""
|
|
Get the old interface data from the storage (if available)
|
|
|
|
:param iface: The name of the interface
|
|
:return: RX bytes, TX bytes, RX Total, TX Total values from the previous run
|
|
"""
|
|
storage = shelve.open(__PREV_DATA__)
|
|
rx, tx = storage.get(__old__.format(iface), (0, 0))
|
|
rx_total, tx_total = storage.get(__total__.format(iface), (0, 0))
|
|
storage.close()
|
|
return rx, tx, rx_total, tx_total
|
|
|
|
|
|
def update_stats(iface: str, rx_bytes: int, tx_bytes: int) -> None:
|
|
"""
|
|
Store the data from the current run
|
|
|
|
:param iface: The name of the interface
|
|
:param rx_bytes: RX bytes as read from the net/dev file
|
|
:param tx_bytes: TX bytes values as read from the net/dev file
|
|
:return:
|
|
"""
|
|
storage = shelve.open(__PREV_DATA__)
|
|
rx_total, tx_total = storage.get(__total__.format(iface), (0, 0))
|
|
storage[__total__.format(iface)] = (rx_total + rx_bytes, tx_total + tx_bytes)
|
|
storage[__old__.format(iface)] = (rx_bytes, tx_bytes)
|
|
storage.close()
|
|
return
|
|
|
|
|
|
def get_iface_stats(iface: str) -> Tuple[int, int]:
|
|
"""
|
|
Extract the interface statistics
|
|
|
|
:param iface:
|
|
:return:
|
|
"""
|
|
"""
|
|
RX/TX slots
|
|
bytes packets errs drop fifo frame compressed multicast
|
|
"""
|
|
_rx_bytes = 0
|
|
_tx_bytes = 0
|
|
slots = []
|
|
with open('/proc/net/dev', 'r') as stat:
|
|
for line in stat:
|
|
if iface not in line:
|
|
continue
|
|
line = line.rstrip()
|
|
line = re.sub(r'\s\s+', ' ', line)
|
|
line = line.lstrip(' ')
|
|
slots = line.split(' ')
|
|
if len(slots) > 10:
|
|
_rx_bytes = int(slots[1])
|
|
_tx_bytes = int(slots[9])
|
|
return _rx_bytes, _tx_bytes
|
|
|
|
|
|
def speed_calc(old_data: tuple, current_data: tuple) -> Tuple[int, int]:
|
|
"""
|
|
Calculate the changes between the old and new data
|
|
|
|
:param data:
|
|
:param current_data:
|
|
:return:
|
|
"""
|
|
old_rx, old_tx = old_data[:2]
|
|
cur_rx, cur_tx = current_data
|
|
|
|
return cur_rx - old_rx, cur_tx - old_tx
|
|
|
|
|
|
def speed_scaler(val: int, scaler: str) -> str:
|
|
"""
|
|
available ['KB', 'Kb', 'MB', 'Mb', 'GB', 'Gb']
|
|
"""
|
|
if scaler not in __scale__.keys():
|
|
raise
|
|
return str(round(__scale__[scaler](val), 3))
|
|
|
|
|
|
def speed_normalizer(val: int, scaler: str) -> int:
|
|
"""
|
|
available ['KB', 'Kb', 'MB', 'Mb', 'GB', 'Gb']
|
|
"""
|
|
if scaler not in __to_bytes__.keys():
|
|
raise
|
|
return int(__to_bytes__[scaler](val))
|
|
|
|
|
|
def final_string(rx_s: int, tx_s: int, warning_s: int, crit_s: int, code: int):
|
|
status = 'OK'
|
|
if code == 2:
|
|
status = 'CRITICAL'
|
|
elif code == 1:
|
|
status = 'WARNING'
|
|
|
|
return '{} bandwidth utilization | rx={}B;{}B;{}B tx={}B;{}B;{}B'.format(
|
|
status, rx_s, warning_s, crit_s, tx_s, warning_s, crit_s
|
|
)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
options = options_parser()
|
|
old = get_old_data(options.interface)
|
|
exit_c = 0
|
|
if threshold_spec(options.warning) or threshold_spec(options.critical):
|
|
use_thresholds = True
|
|
try:
|
|
warn_lower, warn_upper, warn_inner = threshold_extract(options.warning)
|
|
crit_lower, crit_upper, crit_inner = threshold_extract(options.critical)
|
|
except ValueError:
|
|
print('Invalid range specification.')
|
|
sys.exit(100)
|
|
else:
|
|
use_thresholds = False
|
|
|
|
current = get_iface_stats(options.interface)
|
|
rx_speed, tx_speed = speed_calc(old, current)
|
|
|
|
if options.interval:
|
|
interval_scaler = lambda x: int(x / options.interval)
|
|
rx_speed = interval_scaler(rx_speed)
|
|
tx_speed = interval_scaler(tx_speed)
|
|
|
|
update_stats(options.interface, *current)
|
|
|
|
if options.scale:
|
|
_suffix = ' {}'.format(options.scale)
|
|
c_0 = speed_scaler(current[0], options.scale) + _suffix
|
|
c_1 = speed_scaler(current[1], options.scale) + _suffix
|
|
rx_s = speed_scaler(rx_speed, options.scale) + _suffix
|
|
tx_s = speed_scaler(tx_speed, options.scale) + _suffix
|
|
|
|
if use_thresholds is False:
|
|
warning_s = speed_normalizer(to_int(options.warning), options.scale)
|
|
if warning_s < rx_speed or warning_s < tx_speed:
|
|
exit_c = 1
|
|
critical_s = speed_normalizer(to_int(options.critical), options.scale)
|
|
if critical_s < rx_speed or critical_s < tx_speed:
|
|
exit_c = 2
|
|
else:
|
|
warn_s_low = speed_normalizer(warn_lower, options.scale)
|
|
warn_s_up = speed_normalizer(warn_upper, options.scale)
|
|
crit_s_low = speed_normalizer(crit_lower, options.scale)
|
|
crit_s_up = speed_normalizer(crit_upper, options.scale)
|
|
|
|
if warn_inner is True:
|
|
if (warn_s_low <= rx_speed <= warn_s_up) or (warn_s_low <= tx_speed <= warn_s_up):
|
|
exit_c = 1
|
|
else:
|
|
if (warn_s_low >= rx_speed or warn_s_up <= rx_speed) or (warn_s_low >= tx_speed or warn_s_up <= tx_speed):
|
|
exit_c = 1
|
|
if crit_inner is True:
|
|
if (crit_s_low <= rx_speed <= crit_s_up) or (crit_s_low <= tx_speed <= crit_s_up):
|
|
exit_c = 2
|
|
else:
|
|
if (crit_s_low >= rx_speed or crit_s_up <= rx_speed) or (crit_s_low >= tx_speed or crit_s_up <= tx_speed):
|
|
exit_c = 2
|
|
warning_s = warn_s_low
|
|
critical_s = crit_s_low
|
|
|
|
|
|
print('RX {}: {}, TX {}: {}; RX speed: {} TX speed: {}'.format(
|
|
options.scale, c_0, options.scale, c_1, rx_s, tx_s
|
|
), end='; ')
|
|
print(final_string(rx_speed, tx_speed, warning_s, critical_s, exit_c))
|
|
|
|
else:
|
|
print('RX bytes: {}, TX bytes: {}; RX speed: {}, TX speed {}'.format(
|
|
current[0], rx_speed, current[1], tx_speed
|
|
), end='; ')
|
|
if use_thresholds is False:
|
|
warning_s = to_int(options.warning)
|
|
if warning_s < rx_speed or warning_s < tx_speed:
|
|
exit_c = 1
|
|
critical_s = to_int(options.critical)
|
|
if critical_s < rx_speed or critical_s <= tx_speed:
|
|
exit_c = 2
|
|
else:
|
|
if crit_inner is True:
|
|
if (warn_lower <= rx_speed <= warn_upper) or (warn_lower <= tx_speed <= warn_upper):
|
|
exit_c = 1
|
|
else:
|
|
if (warn_lower >= rx_speed or warn_upper <= rx_speed) or (warn_lower >= tx_speed or warn_upper <= rx_speed):
|
|
exit_c = 1
|
|
if crit_inner is True:
|
|
if (crit_lower <= rx_speed <= crit_upper) or (crit_lower <= tx_speed <= crit_upper):
|
|
exit_c = 2
|
|
else:
|
|
if (crit_lower >= rx_speed or crit_upper <= rx_speed) or (crit_lower >= tx_speed or crit_upper <= tx_speed):
|
|
exit_c = 2
|
|
warning_s = warn_lower
|
|
critical_s = crit_lower
|
|
|
|
print(final_string(rx_speed, tx_speed, warning_s, critical_s, exit_c))
|
|
|
|
sys.exit(exit_c)
|