1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from i3pystatus import IntervalModule
- from i3pystatus.core.util import round_dict
- import psutil
-
-
- class NetworkTraffic(IntervalModule):
- """
- Network traffic for all interfaces, i.e.,
- packets/bytes sent/received per second.
-
- Requires the PyPI packages `psutil`.
-
- .. rubric:: Available formatters
-
- * `{interface}` — the configured network interface
- * `{bytes_sent}` — bytes sent per second (divided by divisor)
- * `{bytes_recv}` — bytes received per second (divided by divisor)
- * `{packets_sent}` — bytes sent per second (divided by divisor)
- * `{packets_recv}` — bytes received per second (divided by divisor)
- """
-
- interval = 1
- settings = (
- ("format", "format string"),
- ("interfaces", "network interfaces"),
- ("divisor", "divide all byte values by this value"),
- ("round_size", "defines number of digits in round"),
- ("color", "the default color"),
- )
-
- format = "All: \u2197{bytes_sent}kB/s \u2198{bytes_recv}kB/s"
- interfaces = ["eth0"]
- divisor = 1024
- round_size = None
- color = "#FFFFFF"
-
- pnic = None
- pnic_before = None
-
- def update_counters(self):
- self.pnic_before = self.pnic
- counters = psutil.net_io_counters(pernic=True)
- self.pnic = counters
-
- def get_bytes_sent(self, interface):
- if interface not in self.pnic or interface not in self.pnic_before:
- return 0
- return (self.pnic[interface].bytes_sent -
- self.pnic_before[interface].bytes_sent) / self.divisor
-
- def get_bytes_received(self, interface):
- if interface not in self.pnic or interface not in self.pnic_before:
- return 0
- return (self.pnic[interface].bytes_recv -
- self.pnic_before[interface].bytes_recv) / self.divisor
-
- def get_packets_sent(self, interface):
- if interface not in self.pnic or interface not in self.pnic_before:
- return 0
- return (self.pnic[interface].packets_sent -
- self.pnic_before[interface].packets_sent) / self.divisor
-
- def get_packets_received(self, interface):
- if interface not in self.pnic or interface not in self.pnic_before:
- return 0
- return (self.pnic[interface].packets_recv -
- self.pnic_before[interface].packets_recv) / self.divisor
-
- def sysfs_interface_up(self, interface):
- try:
- sysfs = "/sys/class/net/{}/operstate".format(interface)
- with open(sysfs) as operstate:
- status = operstate.read().strip()
- return status == "up" or status == "unknown"
- except FileNotFoundError:
- return False
-
- def run(self):
- self.update_counters()
- cdict = {
- "bytes_sent": float(0),
- "bytes_recv": float(0),
- "packets_sent": float(0),
- "packets_recv": float(0),
- }
- for interface in self.interfaces:
- if self.sysfs_interface_up(interface):
- if not self.pnic_before or interface not in self.pnic_before:
- continue
- cdict["bytes_sent"] = self.get_bytes_sent(interface)
- cdict["bytes_recv"] = self.get_bytes_received(interface)
- cdict["packets_sent"] = self.get_packets_sent(interface)
- cdict["packets_recv"] = self.get_packets_received(interface)
- round_dict(cdict, self.round_size)
- self.output = {
- "full_text": self.format.format(**cdict),
- "color": self.color
- }
|