watchdog.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #!/usr/bin/env python
  2. # Copyright (c) 2020 Intel Corporation
  3. #
  4. # This work is licensed under the terms of the MIT license.
  5. # For a copy, see <https://opensource.org/licenses/MIT>.
  6. """
  7. This module provides a simple watchdog timer to detect timeouts
  8. It is for example used in the ScenarioManager
  9. """
  10. from __future__ import print_function
  11. import simple_watchdog_timer as swt
  12. try:
  13. import thread
  14. except ImportError:
  15. import _thread as thread
  16. class Watchdog(object):
  17. """
  18. Simple watchdog timer to detect timeouts
  19. Args:
  20. timeout (float): Timeout value of the watchdog [seconds]. If triggered, raises a KeyboardInterrupt.
  21. interval (float): Time between timeout checks [seconds]. Defaults to 1% of the timeout.
  22. Attributes:
  23. _timeout (float): Timeout value of the watchdog [seconds].
  24. _interval (float): Time between timeout checks [seconds].
  25. _failed (bool): True if watchdog exception occured, false otherwise
  26. """
  27. def __init__(self, timeout=1.0, interval=None):
  28. """Class constructor"""
  29. self._watchdog = None
  30. self._timeout = timeout + 1.0
  31. self._interval = min(interval if interval is not None else self._timeout / 100, 1.0)
  32. self._failed = False
  33. self._watchdog_stopped = False
  34. def start(self):
  35. """Start the watchdog"""
  36. self._watchdog = swt.WDT(
  37. check_interval_sec=self._interval,
  38. trigger_delta_sec=self._timeout,
  39. callback=self._callback
  40. )
  41. def stop(self):
  42. """Stop the watchdog"""
  43. if self._watchdog is not None and not self._watchdog_stopped:
  44. self.resume() # If not resumed, the stop will block. Does nothing if already resumed
  45. self._watchdog.stop()
  46. self._watchdog_stopped = True
  47. def pause(self):
  48. """Pause the watchdog"""
  49. if self._watchdog is not None:
  50. self._watchdog.pause()
  51. def resume(self):
  52. """Resume the watchdog."""
  53. if self._watchdog is not None:
  54. self._watchdog.resume()
  55. def update(self):
  56. """Reset the watchdog."""
  57. if self._watchdog_stopped:
  58. return
  59. if self._watchdog is not None:
  60. self._watchdog.update()
  61. def _callback(self, watchdog):
  62. """Method called when the timer triggers. Raises a KeyboardInterrupt on
  63. the main thread and stops the watchdog."""
  64. self.pause() # Good practice to stop it after the event occurs
  65. print('Watchdog exception - Timeout of {} seconds occured'.format(self._timeout))
  66. self._failed = True
  67. thread.interrupt_main()
  68. def get_status(self):
  69. """returns False if watchdog exception occured, True otherwise"""
  70. return not self._failed