network_traffic.py 3.5KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. from i3pystatus import IntervalModule
  2. from i3pystatus.core.util import round_dict
  3. import psutil
  4. class NetworkTraffic(IntervalModule):
  5. """
  6. Network traffic for all interfaces, i.e.,
  7. packets/bytes sent/received per second.
  8. Requires the PyPI packages `psutil`.
  9. .. rubric:: Available formatters
  10. * `{interface}` — the configured network interface
  11. * `{bytes_sent}` — bytes sent per second (divided by divisor)
  12. * `{bytes_recv}` — bytes received per second (divided by divisor)
  13. * `{packets_sent}` — bytes sent per second (divided by divisor)
  14. * `{packets_recv}` — bytes received per second (divided by divisor)
  15. """
  16. interval = 1
  17. settings = (
  18. ("format", "format string"),
  19. ("interfaces", "network interfaces"),
  20. ("divisor", "divide all byte values by this value"),
  21. ("round_size", "defines number of digits in round"),
  22. ("color", "the default color"),
  23. )
  24. format = "All: \u2197{bytes_sent}kB/s \u2198{bytes_recv}kB/s"
  25. interfaces = ["eth0"]
  26. divisor = 1024
  27. round_size = None
  28. color = "#FFFFFF"
  29. pnic = None
  30. pnic_before = None
  31. def update_counters(self):
  32. self.pnic_before = self.pnic
  33. counters = psutil.net_io_counters(pernic=True)
  34. self.pnic = counters
  35. def get_bytes_sent(self, interface):
  36. if interface not in self.pnic or interface not in self.pnic_before:
  37. return 0
  38. return (self.pnic[interface].bytes_sent -
  39. self.pnic_before[interface].bytes_sent) / self.divisor
  40. def get_bytes_received(self, interface):
  41. if interface not in self.pnic or interface not in self.pnic_before:
  42. return 0
  43. return (self.pnic[interface].bytes_recv -
  44. self.pnic_before[interface].bytes_recv) / self.divisor
  45. def get_packets_sent(self, interface):
  46. if interface not in self.pnic or interface not in self.pnic_before:
  47. return 0
  48. return (self.pnic[interface].packets_sent -
  49. self.pnic_before[interface].packets_sent) / self.divisor
  50. def get_packets_received(self, interface):
  51. if interface not in self.pnic or interface not in self.pnic_before:
  52. return 0
  53. return (self.pnic[interface].packets_recv -
  54. self.pnic_before[interface].packets_recv) / self.divisor
  55. def sysfs_interface_up(self, interface):
  56. try:
  57. sysfs = "/sys/class/net/{}/operstate".format(interface)
  58. with open(sysfs) as operstate:
  59. status = operstate.read().strip()
  60. return status == "up" or status == "unknown"
  61. except FileNotFoundError:
  62. return False
  63. def run(self):
  64. self.update_counters()
  65. cdict = {
  66. "bytes_sent": float(0),
  67. "bytes_recv": float(0),
  68. "packets_sent": float(0),
  69. "packets_recv": float(0),
  70. }
  71. for interface in self.interfaces:
  72. if self.sysfs_interface_up(interface):
  73. if not self.pnic_before or interface not in self.pnic_before:
  74. continue
  75. cdict["bytes_sent"] = self.get_bytes_sent(interface)
  76. cdict["bytes_recv"] = self.get_bytes_received(interface)
  77. cdict["packets_sent"] = self.get_packets_sent(interface)
  78. cdict["packets_recv"] = self.get_packets_received(interface)
  79. round_dict(cdict, self.round_size)
  80. self.output = {
  81. "full_text": self.format.format(**cdict),
  82. "color": self.color
  83. }